Whamcloud - gitweb
LU-9679 general: avoid bare return; at end of void function
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
51
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
55
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59                               const char *status, int locks, int debug_level);
60
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
64
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 spin_lock(&type->obd_type_lock);
140                 type->typ_refcnt++;
141                 try_module_get(type->typ_dt_ops->o_owner);
142                 spin_unlock(&type->obd_type_lock);
143                 /* class_search_type() returned a counted reference,
144                  * but we don't need that count any more as
145                  * we have one through typ_refcnt.
146                  */
147                 kobject_put(&type->typ_kobj);
148         }
149         return type;
150 }
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         if (type->typ_md_ops)
176                 OBD_FREE_PTR(type->typ_md_ops);
177         if (type->typ_dt_ops)
178                 OBD_FREE_PTR(type->typ_dt_ops);
179
180         OBD_FREE(type, sizeof(*type));
181 }
182
183 static struct kobj_type class_ktype = {
184         .sysfs_ops      = &lustre_sysfs_ops,
185         .release        = class_sysfs_release,
186 };
187
188 #ifdef HAVE_SERVER_SUPPORT
189 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
190 {
191         struct dentry *symlink;
192         struct obd_type *type;
193         int rc;
194
195         type = class_search_type(name);
196         if (type) {
197                 kobject_put(&type->typ_kobj);
198                 return ERR_PTR(-EEXIST);
199         }
200
201         OBD_ALLOC(type, sizeof(*type));
202         if (!type)
203                 return ERR_PTR(-ENOMEM);
204
205         type->typ_kobj.kset = lustre_kset;
206         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
207                                   &lustre_kset->kobj, "%s", name);
208         if (rc)
209                 return ERR_PTR(rc);
210
211         symlink = debugfs_create_dir(name, debugfs_lustre_root);
212         if (IS_ERR_OR_NULL(symlink)) {
213                 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
214                 kobject_put(&type->typ_kobj);
215                 return ERR_PTR(rc);
216         }
217         type->typ_debugfs_entry = symlink;
218         type->typ_sym_filter = true;
219
220         if (enable_proc) {
221                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
222                                                       NULL, NULL);
223                 if (IS_ERR(type->typ_procroot)) {
224                         CERROR("%s: can't create compat proc entry: %d\n",
225                                name, (int)PTR_ERR(type->typ_procroot));
226                         type->typ_procroot = NULL;
227                 }
228         }
229
230         return type;
231 }
232 EXPORT_SYMBOL(class_add_symlinks);
233 #endif /* HAVE_SERVER_SUPPORT */
234
235 #define CLASS_MAX_NAME 1024
236
237 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
238                         bool enable_proc, struct lprocfs_vars *vars,
239                         const char *name, struct lu_device_type *ldt)
240 {
241         struct obd_type *type;
242         int rc;
243
244         ENTRY;
245         /* sanity check */
246         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
247
248         type = class_search_type(name);
249         if (type) {
250 #ifdef HAVE_SERVER_SUPPORT
251                 if (type->typ_sym_filter)
252                         goto dir_exist;
253 #endif /* HAVE_SERVER_SUPPORT */
254                 kobject_put(&type->typ_kobj);
255                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
256                 RETURN(-EEXIST);
257         }
258
259         OBD_ALLOC(type, sizeof(*type));
260         if (type == NULL)
261                 RETURN(-ENOMEM);
262
263         type->typ_kobj.kset = lustre_kset;
264         kobject_init(&type->typ_kobj, &class_ktype);
265 #ifdef HAVE_SERVER_SUPPORT
266 dir_exist:
267 #endif /* HAVE_SERVER_SUPPORT */
268         OBD_ALLOC_PTR(type->typ_dt_ops);
269         OBD_ALLOC_PTR(type->typ_md_ops);
270
271         if (type->typ_dt_ops == NULL ||
272             type->typ_md_ops == NULL)
273                 GOTO (failed, rc = -ENOMEM);
274
275         *(type->typ_dt_ops) = *dt_ops;
276         /* md_ops is optional */
277         if (md_ops)
278                 *(type->typ_md_ops) = *md_ops;
279         spin_lock_init(&type->obd_type_lock);
280
281 #ifdef HAVE_SERVER_SUPPORT
282         if (type->typ_sym_filter) {
283                 type->typ_sym_filter = false;
284                 kobject_put(&type->typ_kobj);
285                 goto setup_ldt;
286         }
287 #endif
288 #ifdef CONFIG_PROC_FS
289         if (enable_proc && !type->typ_procroot) {
290                 type->typ_procroot = lprocfs_register(name,
291                                                       proc_lustre_root,
292                                                       NULL, type);
293                 if (IS_ERR(type->typ_procroot)) {
294                         rc = PTR_ERR(type->typ_procroot);
295                         type->typ_procroot = NULL;
296                         GOTO(failed, rc);
297                 }
298         }
299 #endif
300         type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
301                                                     vars, type);
302         if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
303                 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
304                                              : -ENOMEM;
305                 type->typ_debugfs_entry = NULL;
306                 GOTO(failed, rc);
307         }
308
309         rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
310         if (rc)
311                 GOTO(failed, rc);
312 #ifdef HAVE_SERVER_SUPPORT
313 setup_ldt:
314 #endif
315         if (ldt) {
316                 type->typ_lu = ldt;
317                 rc = lu_device_type_init(ldt);
318                 if (rc)
319                         GOTO(failed, rc);
320         }
321
322         RETURN(0);
323
324 failed:
325         kobject_put(&type->typ_kobj);
326
327         RETURN(rc);
328 }
329 EXPORT_SYMBOL(class_register_type);
330
331 int class_unregister_type(const char *name)
332 {
333         struct obd_type *type = class_search_type(name);
334         int rc = 0;
335         ENTRY;
336
337         if (!type) {
338                 CERROR("unknown obd type\n");
339                 RETURN(-EINVAL);
340         }
341
342         if (type->typ_refcnt) {
343                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
344                 /* This is a bad situation, let's make the best of it */
345                 /* Remove ops, but leave the name for debugging */
346                 OBD_FREE_PTR(type->typ_dt_ops);
347                 OBD_FREE_PTR(type->typ_md_ops);
348                 GOTO(out_put, rc = -EBUSY);
349         }
350
351         /* Put the final ref */
352         kobject_put(&type->typ_kobj);
353 out_put:
354         /* Put the ref returned by class_search_type() */
355         kobject_put(&type->typ_kobj);
356
357         RETURN(rc);
358 } /* class_unregister_type */
359 EXPORT_SYMBOL(class_unregister_type);
360
361 /**
362  * Create a new obd device.
363  *
364  * Allocate the new obd_device and initialize it.
365  *
366  * \param[in] type_name obd device type string.
367  * \param[in] name      obd device name.
368  * \param[in] uuid      obd device UUID
369  *
370  * \retval newdev         pointer to created obd_device
371  * \retval ERR_PTR(errno) on error
372  */
373 struct obd_device *class_newdev(const char *type_name, const char *name,
374                                 const char *uuid)
375 {
376         struct obd_device *newdev;
377         struct obd_type *type = NULL;
378         ENTRY;
379
380         if (strlen(name) >= MAX_OBD_NAME) {
381                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
382                 RETURN(ERR_PTR(-EINVAL));
383         }
384
385         type = class_get_type(type_name);
386         if (type == NULL){
387                 CERROR("OBD: unknown type: %s\n", type_name);
388                 RETURN(ERR_PTR(-ENODEV));
389         }
390
391         newdev = obd_device_alloc();
392         if (newdev == NULL) {
393                 class_put_type(type);
394                 RETURN(ERR_PTR(-ENOMEM));
395         }
396         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
397         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
398         newdev->obd_type = type;
399         newdev->obd_minor = -1;
400
401         rwlock_init(&newdev->obd_pool_lock);
402         newdev->obd_pool_limit = 0;
403         newdev->obd_pool_slv = 0;
404
405         INIT_LIST_HEAD(&newdev->obd_exports);
406         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
407         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
408         INIT_LIST_HEAD(&newdev->obd_exports_timed);
409         INIT_LIST_HEAD(&newdev->obd_nid_stats);
410         spin_lock_init(&newdev->obd_nid_lock);
411         spin_lock_init(&newdev->obd_dev_lock);
412         mutex_init(&newdev->obd_dev_mutex);
413         spin_lock_init(&newdev->obd_osfs_lock);
414         /* newdev->obd_osfs_age must be set to a value in the distant
415          * past to guarantee a fresh statfs is fetched on mount. */
416         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
417
418         /* XXX belongs in setup not attach  */
419         init_rwsem(&newdev->obd_observer_link_sem);
420         /* recovery data */
421         spin_lock_init(&newdev->obd_recovery_task_lock);
422         init_waitqueue_head(&newdev->obd_next_transno_waitq);
423         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
424         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
425         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
426         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
427         INIT_LIST_HEAD(&newdev->obd_evict_list);
428         INIT_LIST_HEAD(&newdev->obd_lwp_list);
429
430         llog_group_init(&newdev->obd_olg);
431         /* Detach drops this */
432         atomic_set(&newdev->obd_refcount, 1);
433         lu_ref_init(&newdev->obd_reference);
434         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
435
436         newdev->obd_conn_inprogress = 0;
437
438         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
439
440         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
441                newdev->obd_name, newdev);
442
443         return newdev;
444 }
445
446 /**
447  * Free obd device.
448  *
449  * \param[in] obd obd_device to be freed
450  *
451  * \retval none
452  */
453 void class_free_dev(struct obd_device *obd)
454 {
455         struct obd_type *obd_type = obd->obd_type;
456
457         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
458                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
460                  "obd %p != obd_devs[%d] %p\n",
461                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
462         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
463                  "obd_refcount should be 0, not %d\n",
464                  atomic_read(&obd->obd_refcount));
465         LASSERT(obd_type != NULL);
466
467         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
468                obd->obd_name, obd->obd_type->typ_name);
469
470         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
471                          obd->obd_name, obd->obd_uuid.uuid);
472         if (obd->obd_stopping) {
473                 int err;
474
475                 /* If we're not stopping, we were never set up */
476                 err = obd_cleanup(obd);
477                 if (err)
478                         CERROR("Cleanup %s returned %d\n",
479                                 obd->obd_name, err);
480         }
481
482         obd_device_free(obd);
483
484         class_put_type(obd_type);
485 }
486
487 /**
488  * Unregister obd device.
489  *
490  * Free slot in obd_dev[] used by \a obd.
491  *
492  * \param[in] new_obd obd_device to be unregistered
493  *
494  * \retval none
495  */
496 void class_unregister_device(struct obd_device *obd)
497 {
498         write_lock(&obd_dev_lock);
499         if (obd->obd_minor >= 0) {
500                 LASSERT(obd_devs[obd->obd_minor] == obd);
501                 obd_devs[obd->obd_minor] = NULL;
502                 obd->obd_minor = -1;
503         }
504         write_unlock(&obd_dev_lock);
505 }
506
507 /**
508  * Register obd device.
509  *
510  * Find free slot in obd_devs[], fills it with \a new_obd.
511  *
512  * \param[in] new_obd obd_device to be registered
513  *
514  * \retval 0          success
515  * \retval -EEXIST    device with this name is registered
516  * \retval -EOVERFLOW obd_devs[] is full
517  */
518 int class_register_device(struct obd_device *new_obd)
519 {
520         int ret = 0;
521         int i;
522         int new_obd_minor = 0;
523         bool minor_assign = false;
524         bool retried = false;
525
526 again:
527         write_lock(&obd_dev_lock);
528         for (i = 0; i < class_devno_max(); i++) {
529                 struct obd_device *obd = class_num2obd(i);
530
531                 if (obd != NULL &&
532                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
533
534                         if (!retried) {
535                                 write_unlock(&obd_dev_lock);
536
537                                 /* the obd_device could be waited to be
538                                  * destroyed by the "obd_zombie_impexp_thread".
539                                  */
540                                 obd_zombie_barrier();
541                                 retried = true;
542                                 goto again;
543                         }
544
545                         CERROR("%s: already exists, won't add\n",
546                                obd->obd_name);
547                         /* in case we found a free slot before duplicate */
548                         minor_assign = false;
549                         ret = -EEXIST;
550                         break;
551                 }
552                 if (!minor_assign && obd == NULL) {
553                         new_obd_minor = i;
554                         minor_assign = true;
555                 }
556         }
557
558         if (minor_assign) {
559                 new_obd->obd_minor = new_obd_minor;
560                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
561                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
562                 obd_devs[new_obd_minor] = new_obd;
563         } else {
564                 if (ret == 0) {
565                         ret = -EOVERFLOW;
566                         CERROR("%s: all %u/%u devices used, increase "
567                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
568                                i, class_devno_max(), ret);
569                 }
570         }
571         write_unlock(&obd_dev_lock);
572
573         RETURN(ret);
574 }
575
576 static int class_name2dev_nolock(const char *name)
577 {
578         int i;
579
580         if (!name)
581                 return -1;
582
583         for (i = 0; i < class_devno_max(); i++) {
584                 struct obd_device *obd = class_num2obd(i);
585
586                 if (obd && strcmp(name, obd->obd_name) == 0) {
587                         /* Make sure we finished attaching before we give
588                            out any references */
589                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
590                         if (obd->obd_attached) {
591                                 return i;
592                         }
593                         break;
594                 }
595         }
596
597         return -1;
598 }
599
600 int class_name2dev(const char *name)
601 {
602         int i;
603
604         if (!name)
605                 return -1;
606
607         read_lock(&obd_dev_lock);
608         i = class_name2dev_nolock(name);
609         read_unlock(&obd_dev_lock);
610
611         return i;
612 }
613 EXPORT_SYMBOL(class_name2dev);
614
615 struct obd_device *class_name2obd(const char *name)
616 {
617         int dev = class_name2dev(name);
618
619         if (dev < 0 || dev > class_devno_max())
620                 return NULL;
621         return class_num2obd(dev);
622 }
623 EXPORT_SYMBOL(class_name2obd);
624
625 int class_uuid2dev_nolock(struct obd_uuid *uuid)
626 {
627         int i;
628
629         for (i = 0; i < class_devno_max(); i++) {
630                 struct obd_device *obd = class_num2obd(i);
631
632                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
633                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
634                         return i;
635                 }
636         }
637
638         return -1;
639 }
640
641 int class_uuid2dev(struct obd_uuid *uuid)
642 {
643         int i;
644
645         read_lock(&obd_dev_lock);
646         i = class_uuid2dev_nolock(uuid);
647         read_unlock(&obd_dev_lock);
648
649         return i;
650 }
651 EXPORT_SYMBOL(class_uuid2dev);
652
653 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
654 {
655         int dev = class_uuid2dev(uuid);
656         if (dev < 0)
657                 return NULL;
658         return class_num2obd(dev);
659 }
660 EXPORT_SYMBOL(class_uuid2obd);
661
662 /**
663  * Get obd device from ::obd_devs[]
664  *
665  * \param num [in] array index
666  *
667  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
668  *         otherwise return the obd device there.
669  */
670 struct obd_device *class_num2obd(int num)
671 {
672         struct obd_device *obd = NULL;
673
674         if (num < class_devno_max()) {
675                 obd = obd_devs[num];
676                 if (obd == NULL)
677                         return NULL;
678
679                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
680                          "%p obd_magic %08x != %08x\n",
681                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
682                 LASSERTF(obd->obd_minor == num,
683                          "%p obd_minor %0d != %0d\n",
684                          obd, obd->obd_minor, num);
685         }
686
687         return obd;
688 }
689
690 /**
691  * Find obd in obd_dev[] by name or uuid.
692  *
693  * Increment obd's refcount if found.
694  *
695  * \param[in] str obd name or uuid
696  *
697  * \retval NULL    if not found
698  * \retval target  pointer to found obd_device
699  */
700 struct obd_device *class_dev_by_str(const char *str)
701 {
702         struct obd_device *target = NULL;
703         struct obd_uuid tgtuuid;
704         int rc;
705
706         obd_str2uuid(&tgtuuid, str);
707
708         read_lock(&obd_dev_lock);
709         rc = class_uuid2dev_nolock(&tgtuuid);
710         if (rc < 0)
711                 rc = class_name2dev_nolock(str);
712
713         if (rc >= 0)
714                 target = class_num2obd(rc);
715
716         if (target != NULL)
717                 class_incref(target, "find", current);
718         read_unlock(&obd_dev_lock);
719
720         RETURN(target);
721 }
722 EXPORT_SYMBOL(class_dev_by_str);
723
724 /**
725  * Get obd devices count. Device in any
726  *    state are counted
727  * \retval obd device count
728  */
729 int get_devices_count(void)
730 {
731         int index, max_index = class_devno_max(), dev_count = 0;
732
733         read_lock(&obd_dev_lock);
734         for (index = 0; index <= max_index; index++) {
735                 struct obd_device *obd = class_num2obd(index);
736                 if (obd != NULL)
737                         dev_count++;
738         }
739         read_unlock(&obd_dev_lock);
740
741         return dev_count;
742 }
743 EXPORT_SYMBOL(get_devices_count);
744
745 void class_obd_list(void)
746 {
747         char *status;
748         int i;
749
750         read_lock(&obd_dev_lock);
751         for (i = 0; i < class_devno_max(); i++) {
752                 struct obd_device *obd = class_num2obd(i);
753
754                 if (obd == NULL)
755                         continue;
756                 if (obd->obd_stopping)
757                         status = "ST";
758                 else if (obd->obd_set_up)
759                         status = "UP";
760                 else if (obd->obd_attached)
761                         status = "AT";
762                 else
763                         status = "--";
764                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
765                          i, status, obd->obd_type->typ_name,
766                          obd->obd_name, obd->obd_uuid.uuid,
767                          atomic_read(&obd->obd_refcount));
768         }
769         read_unlock(&obd_dev_lock);
770 }
771
772 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
773    specified, then only the client with that uuid is returned,
774    otherwise any client connected to the tgt is returned. */
775 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
776                                           const char *type_name,
777                                           struct obd_uuid *grp_uuid)
778 {
779         int i;
780
781         read_lock(&obd_dev_lock);
782         for (i = 0; i < class_devno_max(); i++) {
783                 struct obd_device *obd = class_num2obd(i);
784
785                 if (obd == NULL)
786                         continue;
787                 if ((strncmp(obd->obd_type->typ_name, type_name,
788                              strlen(type_name)) == 0)) {
789                         if (obd_uuid_equals(tgt_uuid,
790                                             &obd->u.cli.cl_target_uuid) &&
791                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
792                                                          &obd->obd_uuid) : 1)) {
793                                 read_unlock(&obd_dev_lock);
794                                 return obd;
795                         }
796                 }
797         }
798         read_unlock(&obd_dev_lock);
799
800         return NULL;
801 }
802 EXPORT_SYMBOL(class_find_client_obd);
803
804 /* Iterate the obd_device list looking devices have grp_uuid. Start
805    searching at *next, and if a device is found, the next index to look
806    at is saved in *next. If next is NULL, then the first matching device
807    will always be returned. */
808 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
809 {
810         int i;
811
812         if (next == NULL)
813                 i = 0;
814         else if (*next >= 0 && *next < class_devno_max())
815                 i = *next;
816         else
817                 return NULL;
818
819         read_lock(&obd_dev_lock);
820         for (; i < class_devno_max(); i++) {
821                 struct obd_device *obd = class_num2obd(i);
822
823                 if (obd == NULL)
824                         continue;
825                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
826                         if (next != NULL)
827                                 *next = i+1;
828                         read_unlock(&obd_dev_lock);
829                         return obd;
830                 }
831         }
832         read_unlock(&obd_dev_lock);
833
834         return NULL;
835 }
836 EXPORT_SYMBOL(class_devices_in_group);
837
838 /**
839  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
840  * adjust sptlrpc settings accordingly.
841  */
842 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
843 {
844         struct obd_device  *obd;
845         const char         *type;
846         int                 i, rc = 0, rc2;
847
848         LASSERT(namelen > 0);
849
850         read_lock(&obd_dev_lock);
851         for (i = 0; i < class_devno_max(); i++) {
852                 obd = class_num2obd(i);
853
854                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
855                         continue;
856
857                 /* only notify mdc, osc, osp, lwp, mdt, ost
858                  * because only these have a -sptlrpc llog */
859                 type = obd->obd_type->typ_name;
860                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
861                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
862                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
863                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
864                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
865                     strcmp(type, LUSTRE_OST_NAME) != 0)
866                         continue;
867
868                 if (strncmp(obd->obd_name, fsname, namelen))
869                         continue;
870
871                 class_incref(obd, __FUNCTION__, obd);
872                 read_unlock(&obd_dev_lock);
873                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
874                                          sizeof(KEY_SPTLRPC_CONF),
875                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
876                 rc = rc ? rc : rc2;
877                 class_decref(obd, __FUNCTION__, obd);
878                 read_lock(&obd_dev_lock);
879         }
880         read_unlock(&obd_dev_lock);
881         return rc;
882 }
883 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
884
885 void obd_cleanup_caches(void)
886 {
887         ENTRY;
888         if (obd_device_cachep) {
889                 kmem_cache_destroy(obd_device_cachep);
890                 obd_device_cachep = NULL;
891         }
892
893         EXIT;
894 }
895
896 int obd_init_caches(void)
897 {
898         int rc;
899         ENTRY;
900
901         LASSERT(obd_device_cachep == NULL);
902         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
903                                 sizeof(struct obd_device),
904                                 0, 0, 0, sizeof(struct obd_device), NULL);
905         if (!obd_device_cachep)
906                 GOTO(out, rc = -ENOMEM);
907
908         RETURN(0);
909 out:
910         obd_cleanup_caches();
911         RETURN(rc);
912 }
913
914 static struct portals_handle_ops export_handle_ops;
915
916 /* map connection to client */
917 struct obd_export *class_conn2export(struct lustre_handle *conn)
918 {
919         struct obd_export *export;
920         ENTRY;
921
922         if (!conn) {
923                 CDEBUG(D_CACHE, "looking for null handle\n");
924                 RETURN(NULL);
925         }
926
927         if (conn->cookie == -1) {  /* this means assign a new connection */
928                 CDEBUG(D_CACHE, "want a new connection\n");
929                 RETURN(NULL);
930         }
931
932         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
933         export = class_handle2object(conn->cookie, &export_handle_ops);
934         RETURN(export);
935 }
936 EXPORT_SYMBOL(class_conn2export);
937
938 struct obd_device *class_exp2obd(struct obd_export *exp)
939 {
940         if (exp)
941                 return exp->exp_obd;
942         return NULL;
943 }
944 EXPORT_SYMBOL(class_exp2obd);
945
946 struct obd_import *class_exp2cliimp(struct obd_export *exp)
947 {
948         struct obd_device *obd = exp->exp_obd;
949         if (obd == NULL)
950                 return NULL;
951         return obd->u.cli.cl_import;
952 }
953 EXPORT_SYMBOL(class_exp2cliimp);
954
955 /* Export management functions */
956 static void class_export_destroy(struct obd_export *exp)
957 {
958         struct obd_device *obd = exp->exp_obd;
959         ENTRY;
960
961         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
962         LASSERT(obd != NULL);
963
964         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
965                exp->exp_client_uuid.uuid, obd->obd_name);
966
967         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
968         if (exp->exp_connection)
969                 ptlrpc_put_connection_superhack(exp->exp_connection);
970
971         LASSERT(list_empty(&exp->exp_outstanding_replies));
972         LASSERT(list_empty(&exp->exp_uncommitted_replies));
973         LASSERT(list_empty(&exp->exp_req_replay_queue));
974         LASSERT(list_empty(&exp->exp_hp_rpcs));
975         obd_destroy_export(exp);
976         /* self export doesn't hold a reference to an obd, although it
977          * exists until freeing of the obd */
978         if (exp != obd->obd_self_export)
979                 class_decref(obd, "export", exp);
980
981         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
982         EXIT;
983 }
984
/* hop_addref callback for export handles: take one export reference. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
989
990 static struct portals_handle_ops export_handle_ops = {
991         .hop_addref = export_handle_addref,
992         .hop_free   = NULL,
993 };
994
995 struct obd_export *class_export_get(struct obd_export *exp)
996 {
997         atomic_inc(&exp->exp_refcount);
998         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
999                atomic_read(&exp->exp_refcount));
1000         return exp;
1001 }
1002 EXPORT_SYMBOL(class_export_get);
1003
1004 void class_export_put(struct obd_export *exp)
1005 {
1006         LASSERT(exp != NULL);
1007         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1008         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1009                atomic_read(&exp->exp_refcount) - 1);
1010
1011         if (atomic_dec_and_test(&exp->exp_refcount)) {
1012                 struct obd_device *obd = exp->exp_obd;
1013
1014                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1015                        exp, exp->exp_client_uuid.uuid);
1016
1017                 /* release nid stat refererence */
1018                 lprocfs_exp_cleanup(exp);
1019
1020                 if (exp == obd->obd_self_export) {
1021                         /* self export should be destroyed without
1022                          * zombie thread as it doesn't hold a
1023                          * reference to obd and doesn't hold any
1024                          * resources */
1025                         class_export_destroy(exp);
1026                         /* self export is destroyed, no class
1027                          * references exist and it is safe to free
1028                          * obd */
1029                         class_free_dev(obd);
1030                 } else {
1031                         LASSERT(!list_empty(&exp->exp_obd_chain));
1032                         obd_zombie_export_add(exp);
1033                 }
1034
1035         }
1036 }
1037 EXPORT_SYMBOL(class_export_put);
1038
1039 static void obd_zombie_exp_cull(struct work_struct *ws)
1040 {
1041         struct obd_export *export;
1042
1043         export = container_of(ws, struct obd_export, exp_zombie_work);
1044         class_export_destroy(export);
1045 }
1046
1047 /* Creates a new export, adds it to the hash table, and returns a
1048  * pointer to it. The refcount is 2: one for the hash reference, and
1049  * one for the pointer returned by this function. */
1050 struct obd_export *__class_new_export(struct obd_device *obd,
1051                                       struct obd_uuid *cluuid, bool is_self)
1052 {
1053         struct obd_export *export;
1054         struct cfs_hash *hash = NULL;
1055         int rc = 0;
1056         ENTRY;
1057
1058         OBD_ALLOC_PTR(export);
1059         if (!export)
1060                 return ERR_PTR(-ENOMEM);
1061
1062         export->exp_conn_cnt = 0;
1063         export->exp_lock_hash = NULL;
1064         export->exp_flock_hash = NULL;
1065         /* 2 = class_handle_hash + last */
1066         atomic_set(&export->exp_refcount, 2);
1067         atomic_set(&export->exp_rpc_count, 0);
1068         atomic_set(&export->exp_cb_count, 0);
1069         atomic_set(&export->exp_locks_count, 0);
1070 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1071         INIT_LIST_HEAD(&export->exp_locks_list);
1072         spin_lock_init(&export->exp_locks_list_guard);
1073 #endif
1074         atomic_set(&export->exp_replay_count, 0);
1075         export->exp_obd = obd;
1076         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1077         spin_lock_init(&export->exp_uncommitted_replies_lock);
1078         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1079         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1080         INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1081         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1082         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1083         class_handle_hash(&export->exp_handle, &export_handle_ops);
1084         export->exp_last_request_time = ktime_get_real_seconds();
1085         spin_lock_init(&export->exp_lock);
1086         spin_lock_init(&export->exp_rpc_lock);
1087         INIT_HLIST_NODE(&export->exp_uuid_hash);
1088         INIT_HLIST_NODE(&export->exp_nid_hash);
1089         INIT_HLIST_NODE(&export->exp_gen_hash);
1090         spin_lock_init(&export->exp_bl_list_lock);
1091         INIT_LIST_HEAD(&export->exp_bl_list);
1092         INIT_LIST_HEAD(&export->exp_stale_list);
1093         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1094
1095         export->exp_sp_peer = LUSTRE_SP_ANY;
1096         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1097         export->exp_client_uuid = *cluuid;
1098         obd_init_export(export);
1099
1100         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1101                 spin_lock(&obd->obd_dev_lock);
1102                 /* shouldn't happen, but might race */
1103                 if (obd->obd_stopping)
1104                         GOTO(exit_unlock, rc = -ENODEV);
1105
1106                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1107                 if (hash == NULL)
1108                         GOTO(exit_unlock, rc = -ENODEV);
1109                 spin_unlock(&obd->obd_dev_lock);
1110
1111                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1112                 if (rc != 0) {
1113                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1114                                       obd->obd_name, cluuid->uuid, rc);
1115                         GOTO(exit_err, rc = -EALREADY);
1116                 }
1117         }
1118
1119         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1120         spin_lock(&obd->obd_dev_lock);
1121         if (obd->obd_stopping) {
1122                 if (hash)
1123                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1124                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1125         }
1126
1127         if (!is_self) {
1128                 class_incref(obd, "export", export);
1129                 list_add_tail(&export->exp_obd_chain_timed,
1130                               &obd->obd_exports_timed);
1131                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1132                 obd->obd_num_exports++;
1133         } else {
1134                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1135                 INIT_LIST_HEAD(&export->exp_obd_chain);
1136         }
1137         spin_unlock(&obd->obd_dev_lock);
1138         if (hash)
1139                 cfs_hash_putref(hash);
1140         RETURN(export);
1141
1142 exit_unlock:
1143         spin_unlock(&obd->obd_dev_lock);
1144 exit_err:
1145         if (hash)
1146                 cfs_hash_putref(hash);
1147         class_handle_unhash(&export->exp_handle);
1148         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1149         obd_destroy_export(export);
1150         OBD_FREE_PTR(export);
1151         return ERR_PTR(rc);
1152 }
1153
1154 struct obd_export *class_new_export(struct obd_device *obd,
1155                                     struct obd_uuid *uuid)
1156 {
1157         return __class_new_export(obd, uuid, false);
1158 }
1159 EXPORT_SYMBOL(class_new_export);
1160
1161 struct obd_export *class_new_export_self(struct obd_device *obd,
1162                                          struct obd_uuid *uuid)
1163 {
1164         return __class_new_export(obd, uuid, true);
1165 }
1166
1167 void class_unlink_export(struct obd_export *exp)
1168 {
1169         class_handle_unhash(&exp->exp_handle);
1170
1171         if (exp->exp_obd->obd_self_export == exp) {
1172                 class_export_put(exp);
1173                 return;
1174         }
1175
1176         spin_lock(&exp->exp_obd->obd_dev_lock);
1177         /* delete an uuid-export hashitem from hashtables */
1178         if (!hlist_unhashed(&exp->exp_uuid_hash))
1179                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1180                              &exp->exp_client_uuid,
1181                              &exp->exp_uuid_hash);
1182
1183 #ifdef HAVE_SERVER_SUPPORT
1184         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1185                 struct tg_export_data   *ted = &exp->exp_target_data;
1186                 struct cfs_hash         *hash;
1187
1188                 /* Because obd_gen_hash will not be released until
1189                  * class_cleanup(), so hash should never be NULL here */
1190                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1191                 LASSERT(hash != NULL);
1192                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1193                              &exp->exp_gen_hash);
1194                 cfs_hash_putref(hash);
1195         }
1196 #endif /* HAVE_SERVER_SUPPORT */
1197
1198         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1199         list_del_init(&exp->exp_obd_chain_timed);
1200         exp->exp_obd->obd_num_exports--;
1201         spin_unlock(&exp->exp_obd->obd_dev_lock);
1202         atomic_inc(&obd_stale_export_num);
1203
1204         /* A reference is kept by obd_stale_exports list */
1205         obd_stale_export_put(exp);
1206 }
1207 EXPORT_SYMBOL(class_unlink_export);
1208
1209 /* Import management functions */
1210 static void obd_zombie_import_free(struct obd_import *imp)
1211 {
1212         ENTRY;
1213
1214         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1215                 imp->imp_obd->obd_name);
1216
1217         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1218
1219         ptlrpc_put_connection_superhack(imp->imp_connection);
1220
1221         while (!list_empty(&imp->imp_conn_list)) {
1222                 struct obd_import_conn *imp_conn;
1223
1224                 imp_conn = list_entry(imp->imp_conn_list.next,
1225                                       struct obd_import_conn, oic_item);
1226                 list_del_init(&imp_conn->oic_item);
1227                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1228                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1229         }
1230
1231         LASSERT(imp->imp_sec == NULL);
1232         class_decref(imp->imp_obd, "import", imp);
1233         OBD_FREE_PTR(imp);
1234         EXIT;
1235 }
1236
1237 struct obd_import *class_import_get(struct obd_import *import)
1238 {
1239         atomic_inc(&import->imp_refcount);
1240         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1241                atomic_read(&import->imp_refcount),
1242                import->imp_obd->obd_name);
1243         return import;
1244 }
1245 EXPORT_SYMBOL(class_import_get);
1246
1247 void class_import_put(struct obd_import *imp)
1248 {
1249         ENTRY;
1250
1251         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1252
1253         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1254                atomic_read(&imp->imp_refcount) - 1,
1255                imp->imp_obd->obd_name);
1256
1257         if (atomic_dec_and_test(&imp->imp_refcount)) {
1258                 CDEBUG(D_INFO, "final put import %p\n", imp);
1259                 obd_zombie_import_add(imp);
1260         }
1261
1262         /* catch possible import put race */
1263         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1264         EXIT;
1265 }
1266 EXPORT_SYMBOL(class_import_put);
1267
1268 static void init_imp_at(struct imp_at *at) {
1269         int i;
1270         at_init(&at->iat_net_latency, 0, 0);
1271         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1272                 /* max service estimates are tracked on the server side, so
1273                    don't use the AT history here, just use the last reported
1274                    val. (But keep hist for proc histogram, worst_ever) */
1275                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1276                         AT_FLG_NOHIST);
1277         }
1278 }
1279
1280 static void obd_zombie_imp_cull(struct work_struct *ws)
1281 {
1282         struct obd_import *import;
1283
1284         import = container_of(ws, struct obd_import, imp_zombie_work);
1285         obd_zombie_import_free(import);
1286 }
1287
1288 struct obd_import *class_new_import(struct obd_device *obd)
1289 {
1290         struct obd_import *imp;
1291         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1292
1293         OBD_ALLOC(imp, sizeof(*imp));
1294         if (imp == NULL)
1295                 return NULL;
1296
1297         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1298         INIT_LIST_HEAD(&imp->imp_replay_list);
1299         INIT_LIST_HEAD(&imp->imp_sending_list);
1300         INIT_LIST_HEAD(&imp->imp_delayed_list);
1301         INIT_LIST_HEAD(&imp->imp_committed_list);
1302         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1303         imp->imp_known_replied_xid = 0;
1304         imp->imp_replay_cursor = &imp->imp_committed_list;
1305         spin_lock_init(&imp->imp_lock);
1306         imp->imp_last_success_conn = 0;
1307         imp->imp_state = LUSTRE_IMP_NEW;
1308         imp->imp_obd = class_incref(obd, "import", imp);
1309         rwlock_init(&imp->imp_sec_lock);
1310         init_waitqueue_head(&imp->imp_recovery_waitq);
1311         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1312
1313         if (curr_pid_ns->child_reaper)
1314                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1315         else
1316                 imp->imp_sec_refpid = 1;
1317
1318         atomic_set(&imp->imp_refcount, 2);
1319         atomic_set(&imp->imp_unregistering, 0);
1320         atomic_set(&imp->imp_inflight, 0);
1321         atomic_set(&imp->imp_replay_inflight, 0);
1322         atomic_set(&imp->imp_inval_count, 0);
1323         INIT_LIST_HEAD(&imp->imp_conn_list);
1324         init_imp_at(&imp->imp_at);
1325
1326         /* the default magic is V2, will be used in connect RPC, and
1327          * then adjusted according to the flags in request/reply. */
1328         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1329
1330         return imp;
1331 }
1332 EXPORT_SYMBOL(class_new_import);
1333
1334 void class_destroy_import(struct obd_import *import)
1335 {
1336         LASSERT(import != NULL);
1337         LASSERT(import != LP_POISON);
1338
1339         spin_lock(&import->imp_lock);
1340         import->imp_generation++;
1341         spin_unlock(&import->imp_lock);
1342         class_import_put(import);
1343 }
1344 EXPORT_SYMBOL(class_destroy_import);
1345
1346 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1347
1348 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1349 {
1350         spin_lock(&exp->exp_locks_list_guard);
1351
1352         LASSERT(lock->l_exp_refs_nr >= 0);
1353
1354         if (lock->l_exp_refs_target != NULL &&
1355             lock->l_exp_refs_target != exp) {
1356                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1357                               exp, lock, lock->l_exp_refs_target);
1358         }
1359         if ((lock->l_exp_refs_nr ++) == 0) {
1360                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1361                 lock->l_exp_refs_target = exp;
1362         }
1363         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1364                lock, exp, lock->l_exp_refs_nr);
1365         spin_unlock(&exp->exp_locks_list_guard);
1366 }
1367 EXPORT_SYMBOL(__class_export_add_lock_ref);
1368
1369 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1370 {
1371         spin_lock(&exp->exp_locks_list_guard);
1372         LASSERT(lock->l_exp_refs_nr > 0);
1373         if (lock->l_exp_refs_target != exp) {
1374                 LCONSOLE_WARN("lock %p, "
1375                               "mismatching export pointers: %p, %p\n",
1376                               lock, lock->l_exp_refs_target, exp);
1377         }
1378         if (-- lock->l_exp_refs_nr == 0) {
1379                 list_del_init(&lock->l_exp_refs_link);
1380                 lock->l_exp_refs_target = NULL;
1381         }
1382         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1383                lock, exp, lock->l_exp_refs_nr);
1384         spin_unlock(&exp->exp_locks_list_guard);
1385 }
1386 EXPORT_SYMBOL(__class_export_del_lock_ref);
1387 #endif
1388
1389 /* A connection defines an export context in which preallocation can
1390    be managed. This releases the export pointer reference, and returns
1391    the export handle, so the export refcount is 1 when this function
1392    returns. */
1393 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1394                   struct obd_uuid *cluuid)
1395 {
1396         struct obd_export *export;
1397         LASSERT(conn != NULL);
1398         LASSERT(obd != NULL);
1399         LASSERT(cluuid != NULL);
1400         ENTRY;
1401
1402         export = class_new_export(obd, cluuid);
1403         if (IS_ERR(export))
1404                 RETURN(PTR_ERR(export));
1405
1406         conn->cookie = export->exp_handle.h_cookie;
1407         class_export_put(export);
1408
1409         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1410                cluuid->uuid, conn->cookie);
1411         RETURN(0);
1412 }
1413 EXPORT_SYMBOL(class_connect);
1414
1415 /* if export is involved in recovery then clean up related things */
1416 static void class_export_recovery_cleanup(struct obd_export *exp)
1417 {
1418         struct obd_device *obd = exp->exp_obd;
1419
1420         spin_lock(&obd->obd_recovery_task_lock);
1421         if (obd->obd_recovering) {
1422                 if (exp->exp_in_recovery) {
1423                         spin_lock(&exp->exp_lock);
1424                         exp->exp_in_recovery = 0;
1425                         spin_unlock(&exp->exp_lock);
1426                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1427                         atomic_dec(&obd->obd_connected_clients);
1428                 }
1429
1430                 /* if called during recovery then should update
1431                  * obd_stale_clients counter,
1432                  * lightweight exports are not counted */
1433                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1434                         exp->exp_obd->obd_stale_clients++;
1435         }
1436         spin_unlock(&obd->obd_recovery_task_lock);
1437
1438         spin_lock(&exp->exp_lock);
1439         /** Cleanup req replay fields */
1440         if (exp->exp_req_replay_needed) {
1441                 exp->exp_req_replay_needed = 0;
1442
1443                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1444                 atomic_dec(&obd->obd_req_replay_clients);
1445         }
1446
1447         /** Cleanup lock replay data */
1448         if (exp->exp_lock_replay_needed) {
1449                 exp->exp_lock_replay_needed = 0;
1450
1451                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1452                 atomic_dec(&obd->obd_lock_replay_clients);
1453         }
1454         spin_unlock(&exp->exp_lock);
1455 }
1456
1457 /* This function removes 1-3 references from the export:
1458  * 1 - for export pointer passed
1459  * and if disconnect really need
1460  * 2 - removing from hash
1461  * 3 - in client_unlink_export
1462  * The export pointer passed to this function can destroyed */
1463 int class_disconnect(struct obd_export *export)
1464 {
1465         int already_disconnected;
1466         ENTRY;
1467
1468         if (export == NULL) {
1469                 CWARN("attempting to free NULL export %p\n", export);
1470                 RETURN(-EINVAL);
1471         }
1472
1473         spin_lock(&export->exp_lock);
1474         already_disconnected = export->exp_disconnected;
1475         export->exp_disconnected = 1;
1476         /*  We hold references of export for uuid hash
1477          *  and nid_hash and export link at least. So
1478          *  it is safe to call cfs_hash_del in there.  */
1479         if (!hlist_unhashed(&export->exp_nid_hash))
1480                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1481                              &export->exp_connection->c_peer.nid,
1482                              &export->exp_nid_hash);
1483         spin_unlock(&export->exp_lock);
1484
1485         /* class_cleanup(), abort_recovery(), and class_fail_export()
1486          * all end up in here, and if any of them race we shouldn't
1487          * call extra class_export_puts(). */
1488         if (already_disconnected) {
1489                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1490                 GOTO(no_disconn, already_disconnected);
1491         }
1492
1493         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1494                export->exp_handle.h_cookie);
1495
1496         class_export_recovery_cleanup(export);
1497         class_unlink_export(export);
1498 no_disconn:
1499         class_export_put(export);
1500         RETURN(0);
1501 }
1502 EXPORT_SYMBOL(class_disconnect);
1503
1504 /* Return non-zero for a fully connected export */
1505 int class_connected_export(struct obd_export *exp)
1506 {
1507         int connected = 0;
1508
1509         if (exp) {
1510                 spin_lock(&exp->exp_lock);
1511                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1512                 spin_unlock(&exp->exp_lock);
1513         }
1514         return connected;
1515 }
1516 EXPORT_SYMBOL(class_connected_export);
1517
1518 static void class_disconnect_export_list(struct list_head *list,
1519                                          enum obd_option flags)
1520 {
1521         int rc;
1522         struct obd_export *exp;
1523         ENTRY;
1524
1525         /* It's possible that an export may disconnect itself, but
1526          * nothing else will be added to this list. */
1527         while (!list_empty(list)) {
1528                 exp = list_entry(list->next, struct obd_export,
1529                                  exp_obd_chain);
1530                 /* need for safe call CDEBUG after obd_disconnect */
1531                 class_export_get(exp);
1532
1533                 spin_lock(&exp->exp_lock);
1534                 exp->exp_flags = flags;
1535                 spin_unlock(&exp->exp_lock);
1536
1537                 if (obd_uuid_equals(&exp->exp_client_uuid,
1538                                     &exp->exp_obd->obd_uuid)) {
1539                         CDEBUG(D_HA,
1540                                "exp %p export uuid == obd uuid, don't discon\n",
1541                                exp);
1542                         /* Need to delete this now so we don't end up pointing
1543                          * to work_list later when this export is cleaned up. */
1544                         list_del_init(&exp->exp_obd_chain);
1545                         class_export_put(exp);
1546                         continue;
1547                 }
1548
1549                 class_export_get(exp);
1550                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1551                        "last request at %lld\n",
1552                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1553                        exp, exp->exp_last_request_time);
1554                 /* release one export reference anyway */
1555                 rc = obd_disconnect(exp);
1556
1557                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1558                        obd_export_nid2str(exp), exp, rc);
1559                 class_export_put(exp);
1560         }
1561         EXIT;
1562 }
1563
1564 void class_disconnect_exports(struct obd_device *obd)
1565 {
1566         struct list_head work_list;
1567         ENTRY;
1568
1569         /* Move all of the exports from obd_exports to a work list, en masse. */
1570         INIT_LIST_HEAD(&work_list);
1571         spin_lock(&obd->obd_dev_lock);
1572         list_splice_init(&obd->obd_exports, &work_list);
1573         list_splice_init(&obd->obd_delayed_exports, &work_list);
1574         spin_unlock(&obd->obd_dev_lock);
1575
1576         if (!list_empty(&work_list)) {
1577                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1578                        "disconnecting them\n", obd->obd_minor, obd);
1579                 class_disconnect_export_list(&work_list,
1580                                              exp_flags_from_obd(obd));
1581         } else
1582                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1583                        obd->obd_minor, obd);
1584         EXIT;
1585 }
1586 EXPORT_SYMBOL(class_disconnect_exports);
1587
1588 /* Remove exports that have not completed recovery.
1589  */
1590 void class_disconnect_stale_exports(struct obd_device *obd,
1591                                     int (*test_export)(struct obd_export *))
1592 {
1593         struct list_head work_list;
1594         struct obd_export *exp, *n;
1595         int evicted = 0;
1596         ENTRY;
1597
1598         INIT_LIST_HEAD(&work_list);
1599         spin_lock(&obd->obd_dev_lock);
1600         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1601                                  exp_obd_chain) {
1602                 /* don't count self-export as client */
1603                 if (obd_uuid_equals(&exp->exp_client_uuid,
1604                                     &exp->exp_obd->obd_uuid))
1605                         continue;
1606
1607                 /* don't evict clients which have no slot in last_rcvd
1608                  * (e.g. lightweight connection) */
1609                 if (exp->exp_target_data.ted_lr_idx == -1)
1610                         continue;
1611
1612                 spin_lock(&exp->exp_lock);
1613                 if (exp->exp_failed || test_export(exp)) {
1614                         spin_unlock(&exp->exp_lock);
1615                         continue;
1616                 }
1617                 exp->exp_failed = 1;
1618                 spin_unlock(&exp->exp_lock);
1619
1620                 list_move(&exp->exp_obd_chain, &work_list);
1621                 evicted++;
1622                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1623                        obd->obd_name, exp->exp_client_uuid.uuid,
1624                        obd_export_nid2str(exp));
1625                 print_export_data(exp, "EVICTING", 0, D_HA);
1626         }
1627         spin_unlock(&obd->obd_dev_lock);
1628
1629         if (evicted)
1630                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1631                               obd->obd_name, evicted);
1632
1633         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1634                                                  OBD_OPT_ABORT_RECOV);
1635         EXIT;
1636 }
1637 EXPORT_SYMBOL(class_disconnect_stale_exports);
1638
1639 void class_fail_export(struct obd_export *exp)
1640 {
1641         int rc, already_failed;
1642
1643         spin_lock(&exp->exp_lock);
1644         already_failed = exp->exp_failed;
1645         exp->exp_failed = 1;
1646         spin_unlock(&exp->exp_lock);
1647
1648         if (already_failed) {
1649                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1650                        exp, exp->exp_client_uuid.uuid);
1651                 return;
1652         }
1653
1654         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1655                exp, exp->exp_client_uuid.uuid);
1656
1657         if (obd_dump_on_timeout)
1658                 libcfs_debug_dumplog();
1659
1660         /* need for safe call CDEBUG after obd_disconnect */
1661         class_export_get(exp);
1662
1663         /* Most callers into obd_disconnect are removing their own reference
1664          * (request, for example) in addition to the one from the hash table.
1665          * We don't have such a reference here, so make one. */
1666         class_export_get(exp);
1667         rc = obd_disconnect(exp);
1668         if (rc)
1669                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1670         else
1671                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1672                        exp, exp->exp_client_uuid.uuid);
1673         class_export_put(exp);
1674 }
1675 EXPORT_SYMBOL(class_fail_export);
1676
1677 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1678 {
1679         struct cfs_hash *nid_hash;
1680         struct obd_export *doomed_exp = NULL;
1681         int exports_evicted = 0;
1682
1683         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1684
1685         spin_lock(&obd->obd_dev_lock);
1686         /* umount has run already, so evict thread should leave
1687          * its task to umount thread now */
1688         if (obd->obd_stopping) {
1689                 spin_unlock(&obd->obd_dev_lock);
1690                 return exports_evicted;
1691         }
1692         nid_hash = obd->obd_nid_hash;
1693         cfs_hash_getref(nid_hash);
1694         spin_unlock(&obd->obd_dev_lock);
1695
1696         do {
1697                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1698                 if (doomed_exp == NULL)
1699                         break;
1700
1701                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1702                          "nid %s found, wanted nid %s, requested nid %s\n",
1703                          obd_export_nid2str(doomed_exp),
1704                          libcfs_nid2str(nid_key), nid);
1705                 LASSERTF(doomed_exp != obd->obd_self_export,
1706                          "self-export is hashed by NID?\n");
1707                 exports_evicted++;
1708                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1709                               "request\n", obd->obd_name,
1710                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1711                               obd_export_nid2str(doomed_exp));
1712                 class_fail_export(doomed_exp);
1713                 class_export_put(doomed_exp);
1714         } while (1);
1715
1716         cfs_hash_putref(nid_hash);
1717
1718         if (!exports_evicted)
1719                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1720                        obd->obd_name, nid);
1721         return exports_evicted;
1722 }
1723 EXPORT_SYMBOL(obd_export_evict_by_nid);
1724
1725 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1726 {
1727         struct cfs_hash *uuid_hash;
1728         struct obd_export *doomed_exp = NULL;
1729         struct obd_uuid doomed_uuid;
1730         int exports_evicted = 0;
1731
1732         spin_lock(&obd->obd_dev_lock);
1733         if (obd->obd_stopping) {
1734                 spin_unlock(&obd->obd_dev_lock);
1735                 return exports_evicted;
1736         }
1737         uuid_hash = obd->obd_uuid_hash;
1738         cfs_hash_getref(uuid_hash);
1739         spin_unlock(&obd->obd_dev_lock);
1740
1741         obd_str2uuid(&doomed_uuid, uuid);
1742         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1743                 CERROR("%s: can't evict myself\n", obd->obd_name);
1744                 cfs_hash_putref(uuid_hash);
1745                 return exports_evicted;
1746         }
1747
1748         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1749
1750         if (doomed_exp == NULL) {
1751                 CERROR("%s: can't disconnect %s: no exports found\n",
1752                        obd->obd_name, uuid);
1753         } else {
1754                 CWARN("%s: evicting %s at adminstrative request\n",
1755                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1756                 class_fail_export(doomed_exp);
1757                 class_export_put(doomed_exp);
1758                 exports_evicted++;
1759         }
1760         cfs_hash_putref(uuid_hash);
1761
1762         return exports_evicted;
1763 }
1764
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback. print_export_data() calls it when it
 * is non-NULL and the caller passed a non-zero "locks" argument. As a
 * file-scope object it starts out as NULL. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1769
1770 static void print_export_data(struct obd_export *exp, const char *status,
1771                               int locks, int debug_level)
1772 {
1773         struct ptlrpc_reply_state *rs;
1774         struct ptlrpc_reply_state *first_reply = NULL;
1775         int nreplies = 0;
1776
1777         spin_lock(&exp->exp_lock);
1778         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1779                             rs_exp_list) {
1780                 if (nreplies == 0)
1781                         first_reply = rs;
1782                 nreplies++;
1783         }
1784         spin_unlock(&exp->exp_lock);
1785
1786         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1787                "%p %s %llu stale:%d\n",
1788                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1789                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1790                atomic_read(&exp->exp_rpc_count),
1791                atomic_read(&exp->exp_cb_count),
1792                atomic_read(&exp->exp_locks_count),
1793                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1794                nreplies, first_reply, nreplies > 3 ? "..." : "",
1795                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1796 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1797         if (locks && class_export_dump_hook != NULL)
1798                 class_export_dump_hook(exp);
1799 #endif
1800 }
1801
1802 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1803 {
1804         struct obd_export *exp;
1805
1806         spin_lock(&obd->obd_dev_lock);
1807         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1808                 print_export_data(exp, "ACTIVE", locks, debug_level);
1809         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1810                 print_export_data(exp, "UNLINKED", locks, debug_level);
1811         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1812                 print_export_data(exp, "DELAYED", locks, debug_level);
1813         spin_unlock(&obd->obd_dev_lock);
1814 }
1815
1816 void obd_exports_barrier(struct obd_device *obd)
1817 {
1818         int waited = 2;
1819         LASSERT(list_empty(&obd->obd_exports));
1820         spin_lock(&obd->obd_dev_lock);
1821         while (!list_empty(&obd->obd_unlinked_exports)) {
1822                 spin_unlock(&obd->obd_dev_lock);
1823                 set_current_state(TASK_UNINTERRUPTIBLE);
1824                 schedule_timeout(cfs_time_seconds(waited));
1825                 if (waited > 5 && is_power_of_2(waited)) {
1826                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1827                                       "more than %d seconds. "
1828                                       "The obd refcount = %d. Is it stuck?\n",
1829                                       obd->obd_name, waited,
1830                                       atomic_read(&obd->obd_refcount));
1831                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1832                 }
1833                 waited *= 2;
1834                 spin_lock(&obd->obd_dev_lock);
1835         }
1836         spin_unlock(&obd->obd_dev_lock);
1837 }
1838 EXPORT_SYMBOL(obd_exports_barrier);
1839
1840 /**
1841  * Add export to the obd_zombe thread and notify it.
1842  */
1843 static void obd_zombie_export_add(struct obd_export *exp) {
1844         atomic_dec(&obd_stale_export_num);
1845         spin_lock(&exp->exp_obd->obd_dev_lock);
1846         LASSERT(!list_empty(&exp->exp_obd_chain));
1847         list_del_init(&exp->exp_obd_chain);
1848         spin_unlock(&exp->exp_obd->obd_dev_lock);
1849
1850         queue_work(zombie_wq, &exp->exp_zombie_work);
1851 }
1852
1853 /**
1854  * Add import to the obd_zombe thread and notify it.
1855  */
1856 static void obd_zombie_import_add(struct obd_import *imp) {
1857         LASSERT(imp->imp_sec == NULL);
1858
1859         queue_work(zombie_wq, &imp->imp_zombie_work);
1860 }
1861
1862 /**
1863  * wait when obd_zombie import/export queues become empty
1864  */
1865 void obd_zombie_barrier(void)
1866 {
1867         flush_workqueue(zombie_wq);
1868 }
1869 EXPORT_SYMBOL(obd_zombie_barrier);
1870
1871
1872 struct obd_export *obd_stale_export_get(void)
1873 {
1874         struct obd_export *exp = NULL;
1875         ENTRY;
1876
1877         spin_lock(&obd_stale_export_lock);
1878         if (!list_empty(&obd_stale_exports)) {
1879                 exp = list_entry(obd_stale_exports.next,
1880                                  struct obd_export, exp_stale_list);
1881                 list_del_init(&exp->exp_stale_list);
1882         }
1883         spin_unlock(&obd_stale_export_lock);
1884
1885         if (exp) {
1886                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1887                        atomic_read(&obd_stale_export_num));
1888         }
1889         RETURN(exp);
1890 }
1891 EXPORT_SYMBOL(obd_stale_export_get);
1892
1893 void obd_stale_export_put(struct obd_export *exp)
1894 {
1895         ENTRY;
1896
1897         LASSERT(list_empty(&exp->exp_stale_list));
1898         if (exp->exp_lock_hash &&
1899             atomic_read(&exp->exp_lock_hash->hs_count)) {
1900                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1901                        atomic_read(&obd_stale_export_num));
1902
1903                 spin_lock_bh(&exp->exp_bl_list_lock);
1904                 spin_lock(&obd_stale_export_lock);
1905                 /* Add to the tail if there is no blocked locks,
1906                  * to the head otherwise. */
1907                 if (list_empty(&exp->exp_bl_list))
1908                         list_add_tail(&exp->exp_stale_list,
1909                                       &obd_stale_exports);
1910                 else
1911                         list_add(&exp->exp_stale_list,
1912                                  &obd_stale_exports);
1913
1914                 spin_unlock(&obd_stale_export_lock);
1915                 spin_unlock_bh(&exp->exp_bl_list_lock);
1916         } else {
1917                 class_export_put(exp);
1918         }
1919         EXIT;
1920 }
1921 EXPORT_SYMBOL(obd_stale_export_put);
1922
1923 /**
1924  * Adjust the position of the export in the stale list,
1925  * i.e. move to the head of the list if is needed.
1926  **/
1927 void obd_stale_export_adjust(struct obd_export *exp)
1928 {
1929         LASSERT(exp != NULL);
1930         spin_lock_bh(&exp->exp_bl_list_lock);
1931         spin_lock(&obd_stale_export_lock);
1932
1933         if (!list_empty(&exp->exp_stale_list) &&
1934             !list_empty(&exp->exp_bl_list))
1935                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1936
1937         spin_unlock(&obd_stale_export_lock);
1938         spin_unlock_bh(&exp->exp_bl_list_lock);
1939 }
1940 EXPORT_SYMBOL(obd_stale_export_adjust);
1941
1942 /**
1943  * start destroy zombie import/export thread
1944  */
1945 int obd_zombie_impexp_init(void)
1946 {
1947         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1948         if (!zombie_wq)
1949                 return -ENOMEM;
1950
1951         return 0;
1952 }
1953
1954 /**
1955  * stop destroy zombie import/export thread
1956  */
1957 void obd_zombie_impexp_stop(void)
1958 {
1959         destroy_workqueue(zombie_wq);
1960         LASSERT(list_empty(&obd_stale_exports));
1961 }
1962
1963 /***** Kernel-userspace comm helpers *******/
1964
1965 /* Get length of entire message, including header */
1966 int kuc_len(int payload_len)
1967 {
1968         return sizeof(struct kuc_hdr) + payload_len;
1969 }
1970 EXPORT_SYMBOL(kuc_len);
1971
1972 /* Get a pointer to kuc header, given a ptr to the payload
1973  * @param p Pointer to payload area
1974  * @returns Pointer to kuc header
1975  */
1976 struct kuc_hdr * kuc_ptr(void *p)
1977 {
1978         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1979         LASSERT(lh->kuc_magic == KUC_MAGIC);
1980         return lh;
1981 }
1982 EXPORT_SYMBOL(kuc_ptr);
1983
1984 /* Alloc space for a message, and fill in header
1985  * @return Pointer to payload area
1986  */
1987 void *kuc_alloc(int payload_len, int transport, int type)
1988 {
1989         struct kuc_hdr *lh;
1990         int len = kuc_len(payload_len);
1991
1992         OBD_ALLOC(lh, len);
1993         if (lh == NULL)
1994                 return ERR_PTR(-ENOMEM);
1995
1996         lh->kuc_magic = KUC_MAGIC;
1997         lh->kuc_transport = transport;
1998         lh->kuc_msgtype = type;
1999         lh->kuc_msglen = len;
2000
2001         return (void *)(lh + 1);
2002 }
2003 EXPORT_SYMBOL(kuc_alloc);
2004
/* Free a message obtained from kuc_alloc().
 * @param p           Pointer to the payload area (not to the header)
 * @param payload_len Payload length that was used for the allocation
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
2012
2013 struct obd_request_slot_waiter {
2014         struct list_head        orsw_entry;
2015         wait_queue_head_t       orsw_waitq;
2016         bool                    orsw_signaled;
2017 };
2018
2019 static bool obd_request_slot_avail(struct client_obd *cli,
2020                                    struct obd_request_slot_waiter *orsw)
2021 {
2022         bool avail;
2023
2024         spin_lock(&cli->cl_loi_list_lock);
2025         avail = !!list_empty(&orsw->orsw_entry);
2026         spin_unlock(&cli->cl_loi_list_lock);
2027
2028         return avail;
2029 };
2030
2031 /*
2032  * For network flow control, the RPC sponsor needs to acquire a credit
2033  * before sending the RPC. The credits count for a connection is defined
2034  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2035  * the subsequent RPC sponsors need to wait until others released their
2036  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2037  */
2038 int obd_get_request_slot(struct client_obd *cli)
2039 {
2040         struct obd_request_slot_waiter   orsw;
2041         struct l_wait_info               lwi;
2042         int                              rc;
2043
2044         spin_lock(&cli->cl_loi_list_lock);
2045         if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2046                 cli->cl_rpcs_in_flight++;
2047                 spin_unlock(&cli->cl_loi_list_lock);
2048                 return 0;
2049         }
2050
2051         init_waitqueue_head(&orsw.orsw_waitq);
2052         list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2053         orsw.orsw_signaled = false;
2054         spin_unlock(&cli->cl_loi_list_lock);
2055
2056         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2057         rc = l_wait_event(orsw.orsw_waitq,
2058                           obd_request_slot_avail(cli, &orsw) ||
2059                           orsw.orsw_signaled,
2060                           &lwi);
2061
2062         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2063          * freed but other (such as obd_put_request_slot) is using it. */
2064         spin_lock(&cli->cl_loi_list_lock);
2065         if (rc != 0) {
2066                 if (!orsw.orsw_signaled) {
2067                         if (list_empty(&orsw.orsw_entry))
2068                                 cli->cl_rpcs_in_flight--;
2069                         else
2070                                 list_del(&orsw.orsw_entry);
2071                 }
2072         }
2073
2074         if (orsw.orsw_signaled) {
2075                 LASSERT(list_empty(&orsw.orsw_entry));
2076
2077                 rc = -EINTR;
2078         }
2079         spin_unlock(&cli->cl_loi_list_lock);
2080
2081         return rc;
2082 }
2083 EXPORT_SYMBOL(obd_get_request_slot);
2084
2085 void obd_put_request_slot(struct client_obd *cli)
2086 {
2087         struct obd_request_slot_waiter *orsw;
2088
2089         spin_lock(&cli->cl_loi_list_lock);
2090         cli->cl_rpcs_in_flight--;
2091
2092         /* If there is free slot, wakeup the first waiter. */
2093         if (!list_empty(&cli->cl_flight_waiters) &&
2094             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2095                 orsw = list_entry(cli->cl_flight_waiters.next,
2096                                   struct obd_request_slot_waiter, orsw_entry);
2097                 list_del_init(&orsw->orsw_entry);
2098                 cli->cl_rpcs_in_flight++;
2099                 wake_up(&orsw->orsw_waitq);
2100         }
2101         spin_unlock(&cli->cl_loi_list_lock);
2102 }
2103 EXPORT_SYMBOL(obd_put_request_slot);
2104
2105 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2106 {
2107         return cli->cl_max_rpcs_in_flight;
2108 }
2109 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2110
2111 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2112 {
2113         struct obd_request_slot_waiter *orsw;
2114         __u32                           old;
2115         int                             diff;
2116         int                             i;
2117         const char *type_name;
2118         int                             rc;
2119
2120         if (max > OBD_MAX_RIF_MAX || max < 1)
2121                 return -ERANGE;
2122
2123         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2124         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2125                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2126                  * strictly lower that max_rpcs_in_flight */
2127                 if (max < 2) {
2128                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2129                                "because it must be higher than "
2130                                "max_mod_rpcs_in_flight value",
2131                                cli->cl_import->imp_obd->obd_name);
2132                         return -ERANGE;
2133                 }
2134                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2135                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2136                         if (rc != 0)
2137                                 return rc;
2138                 }
2139         }
2140
2141         spin_lock(&cli->cl_loi_list_lock);
2142         old = cli->cl_max_rpcs_in_flight;
2143         cli->cl_max_rpcs_in_flight = max;
2144         client_adjust_max_dirty(cli);
2145
2146         diff = max - old;
2147
2148         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2149         for (i = 0; i < diff; i++) {
2150                 if (list_empty(&cli->cl_flight_waiters))
2151                         break;
2152
2153                 orsw = list_entry(cli->cl_flight_waiters.next,
2154                                   struct obd_request_slot_waiter, orsw_entry);
2155                 list_del_init(&orsw->orsw_entry);
2156                 cli->cl_rpcs_in_flight++;
2157                 wake_up(&orsw->orsw_waitq);
2158         }
2159         spin_unlock(&cli->cl_loi_list_lock);
2160
2161         return 0;
2162 }
2163 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2164
2165 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2166 {
2167         return cli->cl_max_mod_rpcs_in_flight;
2168 }
2169 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2170
2171 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2172 {
2173         struct obd_connect_data *ocd;
2174         __u16 maxmodrpcs;
2175         __u16 prev;
2176
2177         if (max > OBD_MAX_RIF_MAX || max < 1)
2178                 return -ERANGE;
2179
2180         /* cannot exceed or equal max_rpcs_in_flight */
2181         if (max >= cli->cl_max_rpcs_in_flight) {
2182                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2183                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2184                        cli->cl_import->imp_obd->obd_name,
2185                        max, cli->cl_max_rpcs_in_flight);
2186                 return -ERANGE;
2187         }
2188
2189         /* cannot exceed max modify RPCs in flight supported by the server */
2190         ocd = &cli->cl_import->imp_connect_data;
2191         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2192                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2193         else
2194                 maxmodrpcs = 1;
2195         if (max > maxmodrpcs) {
2196                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2197                        "higher than max_mod_rpcs_per_client value (%hu) "
2198                        "returned by the server at connection\n",
2199                        cli->cl_import->imp_obd->obd_name,
2200                        max, maxmodrpcs);
2201                 return -ERANGE;
2202         }
2203
2204         spin_lock(&cli->cl_mod_rpcs_lock);
2205
2206         prev = cli->cl_max_mod_rpcs_in_flight;
2207         cli->cl_max_mod_rpcs_in_flight = max;
2208
2209         /* wakeup waiters if limit has been increased */
2210         if (cli->cl_max_mod_rpcs_in_flight > prev)
2211                 wake_up(&cli->cl_mod_rpcs_waitq);
2212
2213         spin_unlock(&cli->cl_mod_rpcs_lock);
2214
2215         return 0;
2216 }
2217 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2218
2219 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2220                                struct seq_file *seq)
2221 {
2222         unsigned long mod_tot = 0, mod_cum;
2223         struct timespec64 now;
2224         int i;
2225
2226         ktime_get_real_ts64(&now);
2227
2228         spin_lock(&cli->cl_mod_rpcs_lock);
2229
2230         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2231                    (s64)now.tv_sec, now.tv_nsec);
2232         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2233                    cli->cl_mod_rpcs_in_flight);
2234
2235         seq_printf(seq, "\n\t\t\tmodify\n");
2236         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2237
2238         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2239
2240         mod_cum = 0;
2241         for (i = 0; i < OBD_HIST_MAX; i++) {
2242                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2243                 mod_cum += mod;
2244                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2245                            i, mod, pct(mod, mod_tot),
2246                            pct(mod_cum, mod_tot));
2247                 if (mod_cum == mod_tot)
2248                         break;
2249         }
2250
2251         spin_unlock(&cli->cl_mod_rpcs_lock);
2252
2253         return 0;
2254 }
2255 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2256
2257 /* The number of modify RPCs sent in parallel is limited
2258  * because the server has a finite number of slots per client to
2259  * store request result and ensure reply reconstruction when needed.
2260  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2261  * that takes into account server limit and cl_max_rpcs_in_flight
2262  * value.
2263  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2264  * one close request is allowed above the maximum.
2265  */
2266 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2267                                                  bool close_req)
2268 {
2269         bool avail;
2270
2271         /* A slot is available if
2272          * - number of modify RPCs in flight is less than the max
2273          * - it's a close RPC and no other close request is in flight
2274          */
2275         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2276                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2277
2278         return avail;
2279 }
2280
2281 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2282                                          bool close_req)
2283 {
2284         bool avail;
2285
2286         spin_lock(&cli->cl_mod_rpcs_lock);
2287         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2288         spin_unlock(&cli->cl_mod_rpcs_lock);
2289         return avail;
2290 }
2291
2292 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2293 {
2294         if (it != NULL &&
2295             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2296              it->it_op == IT_READDIR ||
2297              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2298                         return true;
2299         return false;
2300 }
2301
2302 /* Get a modify RPC slot from the obd client @cli according
2303  * to the kind of operation @opc that is going to be sent
2304  * and the intent @it of the operation if it applies.
2305  * If the maximum number of modify RPCs in flight is reached
2306  * the thread is put to sleep.
2307  * Returns the tag to be set in the request message. Tag 0
2308  * is reserved for non-modifying requests.
2309  */
2310 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2311                            struct lookup_intent *it)
2312 {
2313         bool                    close_req = false;
2314         __u16                   i, max;
2315
2316         /* read-only metadata RPCs don't consume a slot on MDT
2317          * for reply reconstruction
2318          */
2319         if (obd_skip_mod_rpc_slot(it))
2320                 return 0;
2321
2322         if (opc == MDS_CLOSE)
2323                 close_req = true;
2324
2325         do {
2326                 spin_lock(&cli->cl_mod_rpcs_lock);
2327                 max = cli->cl_max_mod_rpcs_in_flight;
2328                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2329                         /* there is a slot available */
2330                         cli->cl_mod_rpcs_in_flight++;
2331                         if (close_req)
2332                                 cli->cl_close_rpcs_in_flight++;
2333                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2334                                          cli->cl_mod_rpcs_in_flight);
2335                         /* find a free tag */
2336                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2337                                                 max + 1);
2338                         LASSERT(i < OBD_MAX_RIF_MAX);
2339                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2340                         spin_unlock(&cli->cl_mod_rpcs_lock);
2341                         /* tag 0 is reserved for non-modify RPCs */
2342
2343                         CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2344                                "opc %u, max %hu\n",
2345                                cli->cl_import->imp_obd->obd_name,
2346                                i + 1, opc, max);
2347
2348                         return i + 1;
2349                 }
2350                 spin_unlock(&cli->cl_mod_rpcs_lock);
2351
2352                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2353                        "opc %u, max %hu\n",
2354                        cli->cl_import->imp_obd->obd_name, opc, max);
2355
2356                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2357                                           obd_mod_rpc_slot_avail(cli,
2358                                                                  close_req));
2359         } while (true);
2360 }
2361 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2362
2363 /* Put a modify RPC slot from the obd client @cli according
2364  * to the kind of operation @opc that has been sent and the
2365  * intent @it of the operation if it applies.
2366  */
2367 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2368                           struct lookup_intent *it, __u16 tag)
2369 {
2370         bool                    close_req = false;
2371
2372         if (obd_skip_mod_rpc_slot(it))
2373                 return;
2374
2375         if (opc == MDS_CLOSE)
2376                 close_req = true;
2377
2378         spin_lock(&cli->cl_mod_rpcs_lock);
2379         cli->cl_mod_rpcs_in_flight--;
2380         if (close_req)
2381                 cli->cl_close_rpcs_in_flight--;
2382         /* release the tag in the bitmap */
2383         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2384         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2385         spin_unlock(&cli->cl_mod_rpcs_lock);
2386         wake_up(&cli->cl_mod_rpcs_waitq);
2387 }
2388 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2389