Whamcloud - gitweb
LU-12542 handle: move refcount into the lustre_handle.
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
48
/* Guards obd_devs[]: lookups in this file take it for read, while
 * class_register_device()/class_unregister_device() take it for write. */
DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices; a device sits at index obd->obd_minor. */
static struct obd_device *obd_devs[MAX_OBD_DEVICES];

/* Slab cache used by obd_device_alloc()/obd_device_free(); created in
 * obd_init_caches() and destroyed in obd_cleanup_caches(). */
static struct kmem_cache *obd_device_cachep;
/* Forward declaration; the definition follows class_sysfs_release(). */
static struct kobj_type class_ktype;
/* NOTE(review): presumably the workqueue that reaps zombie exports and
 * imports; its users are not in this part of the file -- confirm. */
static struct workqueue_struct *zombie_wq;

/* Forward declarations of helpers defined later in this file. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks, int debug_level);

/* NOTE(review): stale-export bookkeeping (list, its lock and an element
 * counter, judging by the names); users are outside this view -- confirm. */
static LIST_HEAD(obd_stale_exports);
static DEFINE_SPINLOCK(obd_stale_export_lock);
static atomic_t obd_stale_export_num = ATOMIC_INIT(0);

/* Hook called by class_export_destroy() to release an export's
 * exp_connection; it is only declared here, so it must be assigned
 * elsewhere before an export with a connection is destroyed. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67
68 /*
69  * support functions: we could use inter-module communication, but this
70  * is more portable to other OS's
71  */
72 static struct obd_device *obd_device_alloc(void)
73 {
74         struct obd_device *obd;
75
76         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
77         if (obd != NULL) {
78                 obd->obd_magic = OBD_DEVICE_MAGIC;
79         }
80         return obd;
81 }
82
83 static void obd_device_free(struct obd_device *obd)
84 {
85         LASSERT(obd != NULL);
86         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88         if (obd->obd_namespace != NULL) {
89                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90                        obd, obd->obd_namespace, obd->obd_force);
91                 LBUG();
92         }
93         lu_ref_fini(&obd->obd_reference);
94         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 }
96
97 struct obd_type *class_search_type(const char *name)
98 {
99         struct kobject *kobj = kset_find_obj(lustre_kset, name);
100
101         if (kobj && kobj->ktype == &class_ktype)
102                 return container_of(kobj, struct obd_type, typ_kobj);
103
104         kobject_put(kobj);
105         return NULL;
106 }
107 EXPORT_SYMBOL(class_search_type);
108
109 struct obd_type *class_get_type(const char *name)
110 {
111         struct obd_type *type;
112
113         type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
115         if (!type) {
116                 const char *modname = name;
117
118 #ifdef HAVE_SERVER_SUPPORT
119                 if (strcmp(modname, "obdfilter") == 0)
120                         modname = "ofd";
121
122                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123                         modname = LUSTRE_OSP_NAME;
124
125                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126                         modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
128
129                 if (!request_module("%s", modname)) {
130                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131                         type = class_search_type(name);
132                 } else {
133                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134                                            modname);
135                 }
136         }
137 #endif
138         if (type) {
139                 spin_lock(&type->obd_type_lock);
140                 type->typ_refcnt++;
141                 try_module_get(type->typ_dt_ops->o_owner);
142                 spin_unlock(&type->obd_type_lock);
143                 /* class_search_type() returned a counted reference,
144                  * but we don't need that count any more as
145                  * we have one through typ_refcnt.
146                  */
147                 kobject_put(&type->typ_kobj);
148         }
149         return type;
150 }
151
152 void class_put_type(struct obd_type *type)
153 {
154         LASSERT(type);
155         spin_lock(&type->obd_type_lock);
156         type->typ_refcnt--;
157         module_put(type->typ_dt_ops->o_owner);
158         spin_unlock(&type->obd_type_lock);
159 }
160
161 static void class_sysfs_release(struct kobject *kobj)
162 {
163         struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
164
165         debugfs_remove_recursive(type->typ_debugfs_entry);
166         type->typ_debugfs_entry = NULL;
167
168         if (type->typ_lu)
169                 lu_device_type_fini(type->typ_lu);
170
171 #ifdef CONFIG_PROC_FS
172         if (type->typ_name && type->typ_procroot)
173                 remove_proc_subtree(type->typ_name, proc_lustre_root);
174 #endif
175         if (type->typ_md_ops)
176                 OBD_FREE_PTR(type->typ_md_ops);
177         if (type->typ_dt_ops)
178                 OBD_FREE_PTR(type->typ_dt_ops);
179
180         OBD_FREE(type, sizeof(*type));
181 }
182
/*
 * kobject type shared by all obd types.  class_search_type() compares a
 * kobject's ktype against this to recognise obd types, and dropping the
 * last kobject reference ends up in class_sysfs_release().
 */
static struct kobj_type class_ktype = {
        .sysfs_ops      = &lustre_sysfs_ops,
        .release        = class_sysfs_release,
};
187
188 #ifdef HAVE_SERVER_SUPPORT
189 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
190 {
191         struct dentry *symlink;
192         struct obd_type *type;
193         int rc;
194
195         type = class_search_type(name);
196         if (type) {
197                 kobject_put(&type->typ_kobj);
198                 return ERR_PTR(-EEXIST);
199         }
200
201         OBD_ALLOC(type, sizeof(*type));
202         if (!type)
203                 return ERR_PTR(-ENOMEM);
204
205         type->typ_kobj.kset = lustre_kset;
206         rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
207                                   &lustre_kset->kobj, "%s", name);
208         if (rc)
209                 return ERR_PTR(rc);
210
211         symlink = debugfs_create_dir(name, debugfs_lustre_root);
212         if (IS_ERR_OR_NULL(symlink)) {
213                 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
214                 kobject_put(&type->typ_kobj);
215                 return ERR_PTR(rc);
216         }
217         type->typ_debugfs_entry = symlink;
218         type->typ_sym_filter = true;
219
220         if (enable_proc) {
221                 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
222                                                       NULL, NULL);
223                 if (IS_ERR(type->typ_procroot)) {
224                         CERROR("%s: can't create compat proc entry: %d\n",
225                                name, (int)PTR_ERR(type->typ_procroot));
226                         type->typ_procroot = NULL;
227                 }
228         }
229
230         return type;
231 }
232 EXPORT_SYMBOL(class_add_symlinks);
233 #endif /* HAVE_SERVER_SUPPORT */
234
/* Upper bound (exclusive) on the length of an obd type name. */
#define CLASS_MAX_NAME 1024

/**
 * Register a new obd type.
 *
 * Allocates an obd_type, stores private copies of \a dt_ops and \a md_ops
 * in it, creates its procfs/debugfs entries, adds its kobject to
 * lustre_kset and initialises \a ldt when one is given.  With server
 * support, a placeholder type flagged typ_sym_filter is adopted instead
 * of being treated as a duplicate.
 *
 * \param[in] dt_ops       obd operations, copied into the type
 * \param[in] md_ops       metadata operations, copied when non-NULL
 * \param[in] enable_proc  create the procfs directory for the type
 * \param[in] vars         passed to ldebugfs_register()
 * \param[in] name         type name, shorter than CLASS_MAX_NAME
 * \param[in] ldt          optional lu_device_type to initialise
 *
 * \retval 0        success
 * \retval -EEXIST  a type with this name is already registered
 * \retval -ENOMEM  allocation failed
 * \retval -errno   procfs, debugfs, kobject or ldt setup failed
 */
int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
                        bool enable_proc, struct lprocfs_vars *vars,
                        const char *name, struct lu_device_type *ldt)
{
        struct obd_type *type;
        int rc;

        ENTRY;
        /* sanity check */
        LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);

        type = class_search_type(name);
        if (type) {
#ifdef HAVE_SERVER_SUPPORT
                /* placeholder type: reuse it, still holding the
                 * reference returned by class_search_type() */
                if (type->typ_sym_filter)
                        goto dir_exist;
#endif /* HAVE_SERVER_SUPPORT */
                kobject_put(&type->typ_kobj);
                CDEBUG(D_IOCTL, "Type %s already registered\n", name);
                RETURN(-EEXIST);
        }

        OBD_ALLOC(type, sizeof(*type));
        if (type == NULL)
                RETURN(-ENOMEM);

        /* initialise only; the kobject is added once proc/debugfs exist */
        type->typ_kobj.kset = lustre_kset;
        kobject_init(&type->typ_kobj, &class_ktype);
#ifdef HAVE_SERVER_SUPPORT
dir_exist:
#endif /* HAVE_SERVER_SUPPORT */
        OBD_ALLOC_PTR(type->typ_dt_ops);
        OBD_ALLOC_PTR(type->typ_md_ops);

        /* a partially allocated pair is freed by class_sysfs_release()
         * once the kobject_put() under "failed" drops the last reference */
        if (type->typ_dt_ops == NULL ||
            type->typ_md_ops == NULL)
                GOTO (failed, rc = -ENOMEM);

        *(type->typ_dt_ops) = *dt_ops;
        /* md_ops is optional */
        if (md_ops)
                *(type->typ_md_ops) = *md_ops;
        spin_lock_init(&type->obd_type_lock);

#ifdef HAVE_SERVER_SUPPORT
        /* Adopted placeholder: clear the flag, drop the reference from
         * class_search_type() and skip proc/debugfs/kobject setup, which
         * this path assumes already exists for the placeholder.
         * NOTE(review): if the allocation check above fails on this path,
         * the placeholder stays registered with typ_sym_filter still set
         * and any ops already allocated -- confirm a later retry does
         * not leak them. */
        if (type->typ_sym_filter) {
                type->typ_sym_filter = false;
                kobject_put(&type->typ_kobj);
                goto setup_ldt;
        }
#endif
#ifdef CONFIG_PROC_FS
        if (enable_proc && !type->typ_procroot) {
                type->typ_procroot = lprocfs_register(name,
                                                      proc_lustre_root,
                                                      NULL, type);
                if (IS_ERR(type->typ_procroot)) {
                        rc = PTR_ERR(type->typ_procroot);
                        /* reset so the release callback does not treat
                         * the error pointer as a live proc entry */
                        type->typ_procroot = NULL;
                        GOTO(failed, rc);
                }
        }
#endif
        type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
                                                    vars, type);
        if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
                rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
                                             : -ENOMEM;
                type->typ_debugfs_entry = NULL;
                GOTO(failed, rc);
        }

        rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
        if (rc)
                GOTO(failed, rc);
#ifdef HAVE_SERVER_SUPPORT
setup_ldt:
#endif
        if (ldt) {
                /* NOTE(review): typ_lu is set before lu_device_type_init()
                 * runs, so on failure class_sysfs_release() will call
                 * lu_device_type_fini() on it -- confirm that is safe. */
                type->typ_lu = ldt;
                rc = lu_device_type_init(ldt);
                if (rc)
                        GOTO(failed, rc);
        }

        RETURN(0);

failed:
        /* drop our reference; teardown happens in class_sysfs_release()
         * when this is the last one */
        kobject_put(&type->typ_kobj);

        RETURN(rc);
}
EXPORT_SYMBOL(class_register_type);
330
331 int class_unregister_type(const char *name)
332 {
333         struct obd_type *type = class_search_type(name);
334         int rc = 0;
335         ENTRY;
336
337         if (!type) {
338                 CERROR("unknown obd type\n");
339                 RETURN(-EINVAL);
340         }
341
342         if (type->typ_refcnt) {
343                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
344                 /* This is a bad situation, let's make the best of it */
345                 /* Remove ops, but leave the name for debugging */
346                 OBD_FREE_PTR(type->typ_dt_ops);
347                 OBD_FREE_PTR(type->typ_md_ops);
348                 GOTO(out_put, rc = -EBUSY);
349         }
350
351         /* Put the final ref */
352         kobject_put(&type->typ_kobj);
353 out_put:
354         /* Put the ref returned by class_search_type() */
355         kobject_put(&type->typ_kobj);
356
357         RETURN(rc);
358 } /* class_unregister_type */
359 EXPORT_SYMBOL(class_unregister_type);
360
361 /**
362  * Create a new obd device.
363  *
364  * Allocate the new obd_device and initialize it.
365  *
366  * \param[in] type_name obd device type string.
367  * \param[in] name      obd device name.
368  * \param[in] uuid      obd device UUID
369  *
370  * \retval newdev         pointer to created obd_device
371  * \retval ERR_PTR(errno) on error
372  */
373 struct obd_device *class_newdev(const char *type_name, const char *name,
374                                 const char *uuid)
375 {
376         struct obd_device *newdev;
377         struct obd_type *type = NULL;
378         ENTRY;
379
380         if (strlen(name) >= MAX_OBD_NAME) {
381                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
382                 RETURN(ERR_PTR(-EINVAL));
383         }
384
385         type = class_get_type(type_name);
386         if (type == NULL){
387                 CERROR("OBD: unknown type: %s\n", type_name);
388                 RETURN(ERR_PTR(-ENODEV));
389         }
390
391         newdev = obd_device_alloc();
392         if (newdev == NULL) {
393                 class_put_type(type);
394                 RETURN(ERR_PTR(-ENOMEM));
395         }
396         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
397         strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
398         newdev->obd_type = type;
399         newdev->obd_minor = -1;
400
401         rwlock_init(&newdev->obd_pool_lock);
402         newdev->obd_pool_limit = 0;
403         newdev->obd_pool_slv = 0;
404
405         INIT_LIST_HEAD(&newdev->obd_exports);
406         INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
407         INIT_LIST_HEAD(&newdev->obd_delayed_exports);
408         INIT_LIST_HEAD(&newdev->obd_exports_timed);
409         INIT_LIST_HEAD(&newdev->obd_nid_stats);
410         spin_lock_init(&newdev->obd_nid_lock);
411         spin_lock_init(&newdev->obd_dev_lock);
412         mutex_init(&newdev->obd_dev_mutex);
413         spin_lock_init(&newdev->obd_osfs_lock);
414         /* newdev->obd_osfs_age must be set to a value in the distant
415          * past to guarantee a fresh statfs is fetched on mount. */
416         newdev->obd_osfs_age = ktime_get_seconds() - 1000;
417
418         /* XXX belongs in setup not attach  */
419         init_rwsem(&newdev->obd_observer_link_sem);
420         /* recovery data */
421         spin_lock_init(&newdev->obd_recovery_task_lock);
422         init_waitqueue_head(&newdev->obd_next_transno_waitq);
423         init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
424         INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
425         INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
426         INIT_LIST_HEAD(&newdev->obd_final_req_queue);
427         INIT_LIST_HEAD(&newdev->obd_evict_list);
428         INIT_LIST_HEAD(&newdev->obd_lwp_list);
429
430         llog_group_init(&newdev->obd_olg);
431         /* Detach drops this */
432         atomic_set(&newdev->obd_refcount, 1);
433         lu_ref_init(&newdev->obd_reference);
434         lu_ref_add(&newdev->obd_reference, "newdev", newdev);
435
436         newdev->obd_conn_inprogress = 0;
437
438         strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
439
440         CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
441                newdev->obd_name, newdev);
442
443         return newdev;
444 }
445
446 /**
447  * Free obd device.
448  *
449  * \param[in] obd obd_device to be freed
450  *
451  * \retval none
452  */
453 void class_free_dev(struct obd_device *obd)
454 {
455         struct obd_type *obd_type = obd->obd_type;
456
457         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
458                  "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459         LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
460                  "obd %p != obd_devs[%d] %p\n",
461                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
462         LASSERTF(atomic_read(&obd->obd_refcount) == 0,
463                  "obd_refcount should be 0, not %d\n",
464                  atomic_read(&obd->obd_refcount));
465         LASSERT(obd_type != NULL);
466
467         CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
468                obd->obd_name, obd->obd_type->typ_name);
469
470         CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
471                          obd->obd_name, obd->obd_uuid.uuid);
472         if (obd->obd_stopping) {
473                 int err;
474
475                 /* If we're not stopping, we were never set up */
476                 err = obd_cleanup(obd);
477                 if (err)
478                         CERROR("Cleanup %s returned %d\n",
479                                 obd->obd_name, err);
480         }
481
482         obd_device_free(obd);
483
484         class_put_type(obd_type);
485 }
486
487 /**
488  * Unregister obd device.
489  *
490  * Free slot in obd_dev[] used by \a obd.
491  *
492  * \param[in] new_obd obd_device to be unregistered
493  *
494  * \retval none
495  */
496 void class_unregister_device(struct obd_device *obd)
497 {
498         write_lock(&obd_dev_lock);
499         if (obd->obd_minor >= 0) {
500                 LASSERT(obd_devs[obd->obd_minor] == obd);
501                 obd_devs[obd->obd_minor] = NULL;
502                 obd->obd_minor = -1;
503         }
504         write_unlock(&obd_dev_lock);
505 }
506
507 /**
508  * Register obd device.
509  *
510  * Find free slot in obd_devs[], fills it with \a new_obd.
511  *
512  * \param[in] new_obd obd_device to be registered
513  *
514  * \retval 0          success
515  * \retval -EEXIST    device with this name is registered
516  * \retval -EOVERFLOW obd_devs[] is full
517  */
518 int class_register_device(struct obd_device *new_obd)
519 {
520         int ret = 0;
521         int i;
522         int new_obd_minor = 0;
523         bool minor_assign = false;
524         bool retried = false;
525
526 again:
527         write_lock(&obd_dev_lock);
528         for (i = 0; i < class_devno_max(); i++) {
529                 struct obd_device *obd = class_num2obd(i);
530
531                 if (obd != NULL &&
532                     (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
533
534                         if (!retried) {
535                                 write_unlock(&obd_dev_lock);
536
537                                 /* the obd_device could be waited to be
538                                  * destroyed by the "obd_zombie_impexp_thread".
539                                  */
540                                 obd_zombie_barrier();
541                                 retried = true;
542                                 goto again;
543                         }
544
545                         CERROR("%s: already exists, won't add\n",
546                                obd->obd_name);
547                         /* in case we found a free slot before duplicate */
548                         minor_assign = false;
549                         ret = -EEXIST;
550                         break;
551                 }
552                 if (!minor_assign && obd == NULL) {
553                         new_obd_minor = i;
554                         minor_assign = true;
555                 }
556         }
557
558         if (minor_assign) {
559                 new_obd->obd_minor = new_obd_minor;
560                 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
561                          "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
562                 obd_devs[new_obd_minor] = new_obd;
563         } else {
564                 if (ret == 0) {
565                         ret = -EOVERFLOW;
566                         CERROR("%s: all %u/%u devices used, increase "
567                                "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
568                                i, class_devno_max(), ret);
569                 }
570         }
571         write_unlock(&obd_dev_lock);
572
573         RETURN(ret);
574 }
575
576 static int class_name2dev_nolock(const char *name)
577 {
578         int i;
579
580         if (!name)
581                 return -1;
582
583         for (i = 0; i < class_devno_max(); i++) {
584                 struct obd_device *obd = class_num2obd(i);
585
586                 if (obd && strcmp(name, obd->obd_name) == 0) {
587                         /* Make sure we finished attaching before we give
588                            out any references */
589                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
590                         if (obd->obd_attached) {
591                                 return i;
592                         }
593                         break;
594                 }
595         }
596
597         return -1;
598 }
599
600 int class_name2dev(const char *name)
601 {
602         int i;
603
604         if (!name)
605                 return -1;
606
607         read_lock(&obd_dev_lock);
608         i = class_name2dev_nolock(name);
609         read_unlock(&obd_dev_lock);
610
611         return i;
612 }
613 EXPORT_SYMBOL(class_name2dev);
614
615 struct obd_device *class_name2obd(const char *name)
616 {
617         int dev = class_name2dev(name);
618
619         if (dev < 0 || dev > class_devno_max())
620                 return NULL;
621         return class_num2obd(dev);
622 }
623 EXPORT_SYMBOL(class_name2obd);
624
625 int class_uuid2dev_nolock(struct obd_uuid *uuid)
626 {
627         int i;
628
629         for (i = 0; i < class_devno_max(); i++) {
630                 struct obd_device *obd = class_num2obd(i);
631
632                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
633                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
634                         return i;
635                 }
636         }
637
638         return -1;
639 }
640
641 int class_uuid2dev(struct obd_uuid *uuid)
642 {
643         int i;
644
645         read_lock(&obd_dev_lock);
646         i = class_uuid2dev_nolock(uuid);
647         read_unlock(&obd_dev_lock);
648
649         return i;
650 }
651 EXPORT_SYMBOL(class_uuid2dev);
652
653 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
654 {
655         int dev = class_uuid2dev(uuid);
656         if (dev < 0)
657                 return NULL;
658         return class_num2obd(dev);
659 }
660 EXPORT_SYMBOL(class_uuid2obd);
661
662 /**
663  * Get obd device from ::obd_devs[]
664  *
665  * \param num [in] array index
666  *
667  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
668  *         otherwise return the obd device there.
669  */
670 struct obd_device *class_num2obd(int num)
671 {
672         struct obd_device *obd = NULL;
673
674         if (num < class_devno_max()) {
675                 obd = obd_devs[num];
676                 if (obd == NULL)
677                         return NULL;
678
679                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
680                          "%p obd_magic %08x != %08x\n",
681                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
682                 LASSERTF(obd->obd_minor == num,
683                          "%p obd_minor %0d != %0d\n",
684                          obd, obd->obd_minor, num);
685         }
686
687         return obd;
688 }
689
690 /**
691  * Find obd in obd_dev[] by name or uuid.
692  *
693  * Increment obd's refcount if found.
694  *
695  * \param[in] str obd name or uuid
696  *
697  * \retval NULL    if not found
698  * \retval target  pointer to found obd_device
699  */
700 struct obd_device *class_dev_by_str(const char *str)
701 {
702         struct obd_device *target = NULL;
703         struct obd_uuid tgtuuid;
704         int rc;
705
706         obd_str2uuid(&tgtuuid, str);
707
708         read_lock(&obd_dev_lock);
709         rc = class_uuid2dev_nolock(&tgtuuid);
710         if (rc < 0)
711                 rc = class_name2dev_nolock(str);
712
713         if (rc >= 0)
714                 target = class_num2obd(rc);
715
716         if (target != NULL)
717                 class_incref(target, "find", current);
718         read_unlock(&obd_dev_lock);
719
720         RETURN(target);
721 }
722 EXPORT_SYMBOL(class_dev_by_str);
723
724 /**
725  * Get obd devices count. Device in any
726  *    state are counted
727  * \retval obd device count
728  */
729 int get_devices_count(void)
730 {
731         int index, max_index = class_devno_max(), dev_count = 0;
732
733         read_lock(&obd_dev_lock);
734         for (index = 0; index <= max_index; index++) {
735                 struct obd_device *obd = class_num2obd(index);
736                 if (obd != NULL)
737                         dev_count++;
738         }
739         read_unlock(&obd_dev_lock);
740
741         return dev_count;
742 }
743 EXPORT_SYMBOL(get_devices_count);
744
745 void class_obd_list(void)
746 {
747         char *status;
748         int i;
749
750         read_lock(&obd_dev_lock);
751         for (i = 0; i < class_devno_max(); i++) {
752                 struct obd_device *obd = class_num2obd(i);
753
754                 if (obd == NULL)
755                         continue;
756                 if (obd->obd_stopping)
757                         status = "ST";
758                 else if (obd->obd_set_up)
759                         status = "UP";
760                 else if (obd->obd_attached)
761                         status = "AT";
762                 else
763                         status = "--";
764                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
765                          i, status, obd->obd_type->typ_name,
766                          obd->obd_name, obd->obd_uuid.uuid,
767                          atomic_read(&obd->obd_refcount));
768         }
769         read_unlock(&obd_dev_lock);
770 }
771
772 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
773    specified, then only the client with that uuid is returned,
774    otherwise any client connected to the tgt is returned. */
775 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
776                                           const char *type_name,
777                                           struct obd_uuid *grp_uuid)
778 {
779         int i;
780
781         read_lock(&obd_dev_lock);
782         for (i = 0; i < class_devno_max(); i++) {
783                 struct obd_device *obd = class_num2obd(i);
784
785                 if (obd == NULL)
786                         continue;
787                 if ((strncmp(obd->obd_type->typ_name, type_name,
788                              strlen(type_name)) == 0)) {
789                         if (obd_uuid_equals(tgt_uuid,
790                                             &obd->u.cli.cl_target_uuid) &&
791                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
792                                                          &obd->obd_uuid) : 1)) {
793                                 read_unlock(&obd_dev_lock);
794                                 return obd;
795                         }
796                 }
797         }
798         read_unlock(&obd_dev_lock);
799
800         return NULL;
801 }
802 EXPORT_SYMBOL(class_find_client_obd);
803
804 /* Iterate the obd_device list looking devices have grp_uuid. Start
805    searching at *next, and if a device is found, the next index to look
806    at is saved in *next. If next is NULL, then the first matching device
807    will always be returned. */
808 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
809 {
810         int i;
811
812         if (next == NULL)
813                 i = 0;
814         else if (*next >= 0 && *next < class_devno_max())
815                 i = *next;
816         else
817                 return NULL;
818
819         read_lock(&obd_dev_lock);
820         for (; i < class_devno_max(); i++) {
821                 struct obd_device *obd = class_num2obd(i);
822
823                 if (obd == NULL)
824                         continue;
825                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
826                         if (next != NULL)
827                                 *next = i+1;
828                         read_unlock(&obd_dev_lock);
829                         return obd;
830                 }
831         }
832         read_unlock(&obd_dev_lock);
833
834         return NULL;
835 }
836 EXPORT_SYMBOL(class_devices_in_group);
837
838 /**
839  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
840  * adjust sptlrpc settings accordingly.
841  */
842 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
843 {
844         struct obd_device  *obd;
845         const char         *type;
846         int                 i, rc = 0, rc2;
847
848         LASSERT(namelen > 0);
849
850         read_lock(&obd_dev_lock);
851         for (i = 0; i < class_devno_max(); i++) {
852                 obd = class_num2obd(i);
853
854                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
855                         continue;
856
857                 /* only notify mdc, osc, osp, lwp, mdt, ost
858                  * because only these have a -sptlrpc llog */
859                 type = obd->obd_type->typ_name;
860                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
861                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
862                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
863                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
864                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
865                     strcmp(type, LUSTRE_OST_NAME) != 0)
866                         continue;
867
868                 if (strncmp(obd->obd_name, fsname, namelen))
869                         continue;
870
871                 class_incref(obd, __FUNCTION__, obd);
872                 read_unlock(&obd_dev_lock);
873                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
874                                          sizeof(KEY_SPTLRPC_CONF),
875                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
876                 rc = rc ? rc : rc2;
877                 class_decref(obd, __FUNCTION__, obd);
878                 read_lock(&obd_dev_lock);
879         }
880         read_unlock(&obd_dev_lock);
881         return rc;
882 }
883 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
884
885 void obd_cleanup_caches(void)
886 {
887         ENTRY;
888         if (obd_device_cachep) {
889                 kmem_cache_destroy(obd_device_cachep);
890                 obd_device_cachep = NULL;
891         }
892
893         EXIT;
894 }
895
896 int obd_init_caches(void)
897 {
898         int rc;
899         ENTRY;
900
901         LASSERT(obd_device_cachep == NULL);
902         obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
903                                 sizeof(struct obd_device),
904                                 0, 0, 0, sizeof(struct obd_device), NULL);
905         if (!obd_device_cachep)
906                 GOTO(out, rc = -ENOMEM);
907
908         RETURN(0);
909 out:
910         obd_cleanup_caches();
911         RETURN(rc);
912 }
913
914 static struct portals_handle_ops export_handle_ops;
915
916 /* map connection to client */
917 struct obd_export *class_conn2export(struct lustre_handle *conn)
918 {
919         struct obd_export *export;
920         ENTRY;
921
922         if (!conn) {
923                 CDEBUG(D_CACHE, "looking for null handle\n");
924                 RETURN(NULL);
925         }
926
927         if (conn->cookie == -1) {  /* this means assign a new connection */
928                 CDEBUG(D_CACHE, "want a new connection\n");
929                 RETURN(NULL);
930         }
931
932         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
933         export = class_handle2object(conn->cookie, &export_handle_ops);
934         RETURN(export);
935 }
936 EXPORT_SYMBOL(class_conn2export);
937
938 struct obd_device *class_exp2obd(struct obd_export *exp)
939 {
940         if (exp)
941                 return exp->exp_obd;
942         return NULL;
943 }
944 EXPORT_SYMBOL(class_exp2obd);
945
946 struct obd_import *class_exp2cliimp(struct obd_export *exp)
947 {
948         struct obd_device *obd = exp->exp_obd;
949         if (obd == NULL)
950                 return NULL;
951         return obd->u.cli.cl_import;
952 }
953 EXPORT_SYMBOL(class_exp2cliimp);
954
955 /* Export management functions */
956 static void class_export_destroy(struct obd_export *exp)
957 {
958         struct obd_device *obd = exp->exp_obd;
959         ENTRY;
960
961         LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
962         LASSERT(obd != NULL);
963
964         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
965                exp->exp_client_uuid.uuid, obd->obd_name);
966
967         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
968         if (exp->exp_connection)
969                 ptlrpc_put_connection_superhack(exp->exp_connection);
970
971         LASSERT(list_empty(&exp->exp_outstanding_replies));
972         LASSERT(list_empty(&exp->exp_uncommitted_replies));
973         LASSERT(list_empty(&exp->exp_req_replay_queue));
974         LASSERT(list_empty(&exp->exp_hp_rpcs));
975         obd_destroy_export(exp);
976         /* self export doesn't hold a reference to an obd, although it
977          * exists until freeing of the obd */
978         if (exp != obd->obd_self_export)
979                 class_decref(obd, "export", exp);
980
981         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
982         EXIT;
983 }
984
985 static struct portals_handle_ops export_handle_ops = {
986         .hop_free   = NULL,
987         .hop_type       = "export",
988 };
989
990 struct obd_export *class_export_get(struct obd_export *exp)
991 {
992         refcount_inc(&exp->exp_handle.h_ref);
993         CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
994                refcount_read(&exp->exp_handle.h_ref));
995         return exp;
996 }
997 EXPORT_SYMBOL(class_export_get);
998
999 void class_export_put(struct obd_export *exp)
1000 {
1001         LASSERT(exp != NULL);
1002         LASSERT(refcount_read(&exp->exp_handle.h_ref) >  0);
1003         LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
1004         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1005                refcount_read(&exp->exp_handle.h_ref) - 1);
1006
1007         if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
1008                 struct obd_device *obd = exp->exp_obd;
1009
1010                 CDEBUG(D_IOCTL, "final put %p/%s\n",
1011                        exp, exp->exp_client_uuid.uuid);
1012
1013                 /* release nid stat refererence */
1014                 lprocfs_exp_cleanup(exp);
1015
1016                 if (exp == obd->obd_self_export) {
1017                         /* self export should be destroyed without
1018                          * zombie thread as it doesn't hold a
1019                          * reference to obd and doesn't hold any
1020                          * resources */
1021                         class_export_destroy(exp);
1022                         /* self export is destroyed, no class
1023                          * references exist and it is safe to free
1024                          * obd */
1025                         class_free_dev(obd);
1026                 } else {
1027                         LASSERT(!list_empty(&exp->exp_obd_chain));
1028                         obd_zombie_export_add(exp);
1029                 }
1030
1031         }
1032 }
1033 EXPORT_SYMBOL(class_export_put);
1034
1035 static void obd_zombie_exp_cull(struct work_struct *ws)
1036 {
1037         struct obd_export *export;
1038
1039         export = container_of(ws, struct obd_export, exp_zombie_work);
1040         class_export_destroy(export);
1041 }
1042
1043 /* Creates a new export, adds it to the hash table, and returns a
1044  * pointer to it. The refcount is 2: one for the hash reference, and
1045  * one for the pointer returned by this function. */
1046 struct obd_export *__class_new_export(struct obd_device *obd,
1047                                       struct obd_uuid *cluuid, bool is_self)
1048 {
1049         struct obd_export *export;
1050         struct cfs_hash *hash = NULL;
1051         int rc = 0;
1052         ENTRY;
1053
1054         OBD_ALLOC_PTR(export);
1055         if (!export)
1056                 return ERR_PTR(-ENOMEM);
1057
1058         export->exp_conn_cnt = 0;
1059         export->exp_lock_hash = NULL;
1060         export->exp_flock_hash = NULL;
1061         /* 2 = class_handle_hash + last */
1062         refcount_set(&export->exp_handle.h_ref, 2);
1063         atomic_set(&export->exp_rpc_count, 0);
1064         atomic_set(&export->exp_cb_count, 0);
1065         atomic_set(&export->exp_locks_count, 0);
1066 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1067         INIT_LIST_HEAD(&export->exp_locks_list);
1068         spin_lock_init(&export->exp_locks_list_guard);
1069 #endif
1070         atomic_set(&export->exp_replay_count, 0);
1071         export->exp_obd = obd;
1072         INIT_LIST_HEAD(&export->exp_outstanding_replies);
1073         spin_lock_init(&export->exp_uncommitted_replies_lock);
1074         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1075         INIT_LIST_HEAD(&export->exp_req_replay_queue);
1076         INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1077         INIT_LIST_HEAD(&export->exp_hp_rpcs);
1078         INIT_LIST_HEAD(&export->exp_reg_rpcs);
1079         class_handle_hash(&export->exp_handle, &export_handle_ops);
1080         export->exp_last_request_time = ktime_get_real_seconds();
1081         spin_lock_init(&export->exp_lock);
1082         spin_lock_init(&export->exp_rpc_lock);
1083         INIT_HLIST_NODE(&export->exp_uuid_hash);
1084         INIT_HLIST_NODE(&export->exp_nid_hash);
1085         INIT_HLIST_NODE(&export->exp_gen_hash);
1086         spin_lock_init(&export->exp_bl_list_lock);
1087         INIT_LIST_HEAD(&export->exp_bl_list);
1088         INIT_LIST_HEAD(&export->exp_stale_list);
1089         INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1090
1091         export->exp_sp_peer = LUSTRE_SP_ANY;
1092         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1093         export->exp_client_uuid = *cluuid;
1094         obd_init_export(export);
1095
1096         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1097                 spin_lock(&obd->obd_dev_lock);
1098                 /* shouldn't happen, but might race */
1099                 if (obd->obd_stopping)
1100                         GOTO(exit_unlock, rc = -ENODEV);
1101
1102                 hash = cfs_hash_getref(obd->obd_uuid_hash);
1103                 if (hash == NULL)
1104                         GOTO(exit_unlock, rc = -ENODEV);
1105                 spin_unlock(&obd->obd_dev_lock);
1106
1107                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1108                 if (rc != 0) {
1109                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1110                                       obd->obd_name, cluuid->uuid, rc);
1111                         GOTO(exit_err, rc = -EALREADY);
1112                 }
1113         }
1114
1115         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1116         spin_lock(&obd->obd_dev_lock);
1117         if (obd->obd_stopping) {
1118                 if (hash)
1119                         cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1120                 GOTO(exit_unlock, rc = -ESHUTDOWN);
1121         }
1122
1123         if (!is_self) {
1124                 class_incref(obd, "export", export);
1125                 list_add_tail(&export->exp_obd_chain_timed,
1126                               &obd->obd_exports_timed);
1127                 list_add(&export->exp_obd_chain, &obd->obd_exports);
1128                 obd->obd_num_exports++;
1129         } else {
1130                 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1131                 INIT_LIST_HEAD(&export->exp_obd_chain);
1132         }
1133         spin_unlock(&obd->obd_dev_lock);
1134         if (hash)
1135                 cfs_hash_putref(hash);
1136         RETURN(export);
1137
1138 exit_unlock:
1139         spin_unlock(&obd->obd_dev_lock);
1140 exit_err:
1141         if (hash)
1142                 cfs_hash_putref(hash);
1143         class_handle_unhash(&export->exp_handle);
1144         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1145         obd_destroy_export(export);
1146         OBD_FREE_PTR(export);
1147         return ERR_PTR(rc);
1148 }
1149
1150 struct obd_export *class_new_export(struct obd_device *obd,
1151                                     struct obd_uuid *uuid)
1152 {
1153         return __class_new_export(obd, uuid, false);
1154 }
1155 EXPORT_SYMBOL(class_new_export);
1156
1157 struct obd_export *class_new_export_self(struct obd_device *obd,
1158                                          struct obd_uuid *uuid)
1159 {
1160         return __class_new_export(obd, uuid, true);
1161 }
1162
1163 void class_unlink_export(struct obd_export *exp)
1164 {
1165         class_handle_unhash(&exp->exp_handle);
1166
1167         if (exp->exp_obd->obd_self_export == exp) {
1168                 class_export_put(exp);
1169                 return;
1170         }
1171
1172         spin_lock(&exp->exp_obd->obd_dev_lock);
1173         /* delete an uuid-export hashitem from hashtables */
1174         if (!hlist_unhashed(&exp->exp_uuid_hash))
1175                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1176                              &exp->exp_client_uuid,
1177                              &exp->exp_uuid_hash);
1178
1179 #ifdef HAVE_SERVER_SUPPORT
1180         if (!hlist_unhashed(&exp->exp_gen_hash)) {
1181                 struct tg_export_data   *ted = &exp->exp_target_data;
1182                 struct cfs_hash         *hash;
1183
1184                 /* Because obd_gen_hash will not be released until
1185                  * class_cleanup(), so hash should never be NULL here */
1186                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1187                 LASSERT(hash != NULL);
1188                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1189                              &exp->exp_gen_hash);
1190                 cfs_hash_putref(hash);
1191         }
1192 #endif /* HAVE_SERVER_SUPPORT */
1193
1194         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1195         list_del_init(&exp->exp_obd_chain_timed);
1196         exp->exp_obd->obd_num_exports--;
1197         spin_unlock(&exp->exp_obd->obd_dev_lock);
1198         atomic_inc(&obd_stale_export_num);
1199
1200         /* A reference is kept by obd_stale_exports list */
1201         obd_stale_export_put(exp);
1202 }
1203 EXPORT_SYMBOL(class_unlink_export);
1204
1205 /* Import management functions */
1206 static void obd_zombie_import_free(struct obd_import *imp)
1207 {
1208         ENTRY;
1209
1210         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1211                 imp->imp_obd->obd_name);
1212
1213         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1214
1215         ptlrpc_put_connection_superhack(imp->imp_connection);
1216
1217         while (!list_empty(&imp->imp_conn_list)) {
1218                 struct obd_import_conn *imp_conn;
1219
1220                 imp_conn = list_entry(imp->imp_conn_list.next,
1221                                       struct obd_import_conn, oic_item);
1222                 list_del_init(&imp_conn->oic_item);
1223                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1224                 OBD_FREE(imp_conn, sizeof(*imp_conn));
1225         }
1226
1227         LASSERT(imp->imp_sec == NULL);
1228         class_decref(imp->imp_obd, "import", imp);
1229         OBD_FREE_PTR(imp);
1230         EXIT;
1231 }
1232
1233 struct obd_import *class_import_get(struct obd_import *import)
1234 {
1235         atomic_inc(&import->imp_refcount);
1236         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1237                atomic_read(&import->imp_refcount),
1238                import->imp_obd->obd_name);
1239         return import;
1240 }
1241 EXPORT_SYMBOL(class_import_get);
1242
1243 void class_import_put(struct obd_import *imp)
1244 {
1245         ENTRY;
1246
1247         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1248
1249         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1250                atomic_read(&imp->imp_refcount) - 1,
1251                imp->imp_obd->obd_name);
1252
1253         if (atomic_dec_and_test(&imp->imp_refcount)) {
1254                 CDEBUG(D_INFO, "final put import %p\n", imp);
1255                 obd_zombie_import_add(imp);
1256         }
1257
1258         /* catch possible import put race */
1259         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1260         EXIT;
1261 }
1262 EXPORT_SYMBOL(class_import_put);
1263
1264 static void init_imp_at(struct imp_at *at) {
1265         int i;
1266         at_init(&at->iat_net_latency, 0, 0);
1267         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1268                 /* max service estimates are tracked on the server side, so
1269                    don't use the AT history here, just use the last reported
1270                    val. (But keep hist for proc histogram, worst_ever) */
1271                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1272                         AT_FLG_NOHIST);
1273         }
1274 }
1275
1276 static void obd_zombie_imp_cull(struct work_struct *ws)
1277 {
1278         struct obd_import *import;
1279
1280         import = container_of(ws, struct obd_import, imp_zombie_work);
1281         obd_zombie_import_free(import);
1282 }
1283
1284 struct obd_import *class_new_import(struct obd_device *obd)
1285 {
1286         struct obd_import *imp;
1287         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1288
1289         OBD_ALLOC(imp, sizeof(*imp));
1290         if (imp == NULL)
1291                 return NULL;
1292
1293         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1294         INIT_LIST_HEAD(&imp->imp_replay_list);
1295         INIT_LIST_HEAD(&imp->imp_sending_list);
1296         INIT_LIST_HEAD(&imp->imp_delayed_list);
1297         INIT_LIST_HEAD(&imp->imp_committed_list);
1298         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1299         imp->imp_known_replied_xid = 0;
1300         imp->imp_replay_cursor = &imp->imp_committed_list;
1301         spin_lock_init(&imp->imp_lock);
1302         imp->imp_last_success_conn = 0;
1303         imp->imp_state = LUSTRE_IMP_NEW;
1304         imp->imp_obd = class_incref(obd, "import", imp);
1305         rwlock_init(&imp->imp_sec_lock);
1306         init_waitqueue_head(&imp->imp_recovery_waitq);
1307         INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1308
1309         if (curr_pid_ns->child_reaper)
1310                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1311         else
1312                 imp->imp_sec_refpid = 1;
1313
1314         atomic_set(&imp->imp_refcount, 2);
1315         atomic_set(&imp->imp_unregistering, 0);
1316         atomic_set(&imp->imp_inflight, 0);
1317         atomic_set(&imp->imp_replay_inflight, 0);
1318         atomic_set(&imp->imp_inval_count, 0);
1319         INIT_LIST_HEAD(&imp->imp_conn_list);
1320         init_imp_at(&imp->imp_at);
1321
1322         /* the default magic is V2, will be used in connect RPC, and
1323          * then adjusted according to the flags in request/reply. */
1324         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1325
1326         return imp;
1327 }
1328 EXPORT_SYMBOL(class_new_import);
1329
1330 void class_destroy_import(struct obd_import *import)
1331 {
1332         LASSERT(import != NULL);
1333         LASSERT(import != LP_POISON);
1334
1335         spin_lock(&import->imp_lock);
1336         import->imp_generation++;
1337         spin_unlock(&import->imp_lock);
1338         class_import_put(import);
1339 }
1340 EXPORT_SYMBOL(class_destroy_import);
1341
1342 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1343
1344 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1345 {
1346         spin_lock(&exp->exp_locks_list_guard);
1347
1348         LASSERT(lock->l_exp_refs_nr >= 0);
1349
1350         if (lock->l_exp_refs_target != NULL &&
1351             lock->l_exp_refs_target != exp) {
1352                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1353                               exp, lock, lock->l_exp_refs_target);
1354         }
1355         if ((lock->l_exp_refs_nr ++) == 0) {
1356                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1357                 lock->l_exp_refs_target = exp;
1358         }
1359         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1360                lock, exp, lock->l_exp_refs_nr);
1361         spin_unlock(&exp->exp_locks_list_guard);
1362 }
1363 EXPORT_SYMBOL(__class_export_add_lock_ref);
1364
1365 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1366 {
1367         spin_lock(&exp->exp_locks_list_guard);
1368         LASSERT(lock->l_exp_refs_nr > 0);
1369         if (lock->l_exp_refs_target != exp) {
1370                 LCONSOLE_WARN("lock %p, "
1371                               "mismatching export pointers: %p, %p\n",
1372                               lock, lock->l_exp_refs_target, exp);
1373         }
1374         if (-- lock->l_exp_refs_nr == 0) {
1375                 list_del_init(&lock->l_exp_refs_link);
1376                 lock->l_exp_refs_target = NULL;
1377         }
1378         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1379                lock, exp, lock->l_exp_refs_nr);
1380         spin_unlock(&exp->exp_locks_list_guard);
1381 }
1382 EXPORT_SYMBOL(__class_export_del_lock_ref);
1383 #endif
1384
1385 /* A connection defines an export context in which preallocation can
1386    be managed. This releases the export pointer reference, and returns
1387    the export handle, so the export refcount is 1 when this function
1388    returns. */
1389 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1390                   struct obd_uuid *cluuid)
1391 {
1392         struct obd_export *export;
1393         LASSERT(conn != NULL);
1394         LASSERT(obd != NULL);
1395         LASSERT(cluuid != NULL);
1396         ENTRY;
1397
1398         export = class_new_export(obd, cluuid);
1399         if (IS_ERR(export))
1400                 RETURN(PTR_ERR(export));
1401
1402         conn->cookie = export->exp_handle.h_cookie;
1403         class_export_put(export);
1404
1405         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1406                cluuid->uuid, conn->cookie);
1407         RETURN(0);
1408 }
1409 EXPORT_SYMBOL(class_connect);
1410
1411 /* if export is involved in recovery then clean up related things */
1412 static void class_export_recovery_cleanup(struct obd_export *exp)
1413 {
1414         struct obd_device *obd = exp->exp_obd;
1415
1416         spin_lock(&obd->obd_recovery_task_lock);
1417         if (obd->obd_recovering) {
1418                 if (exp->exp_in_recovery) {
1419                         spin_lock(&exp->exp_lock);
1420                         exp->exp_in_recovery = 0;
1421                         spin_unlock(&exp->exp_lock);
1422                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1423                         atomic_dec(&obd->obd_connected_clients);
1424                 }
1425
1426                 /* if called during recovery then should update
1427                  * obd_stale_clients counter,
1428                  * lightweight exports are not counted */
1429                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1430                         exp->exp_obd->obd_stale_clients++;
1431         }
1432         spin_unlock(&obd->obd_recovery_task_lock);
1433
1434         spin_lock(&exp->exp_lock);
1435         /** Cleanup req replay fields */
1436         if (exp->exp_req_replay_needed) {
1437                 exp->exp_req_replay_needed = 0;
1438
1439                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1440                 atomic_dec(&obd->obd_req_replay_clients);
1441         }
1442
1443         /** Cleanup lock replay data */
1444         if (exp->exp_lock_replay_needed) {
1445                 exp->exp_lock_replay_needed = 0;
1446
1447                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1448                 atomic_dec(&obd->obd_lock_replay_clients);
1449         }
1450         spin_unlock(&exp->exp_lock);
1451 }
1452
1453 /* This function removes 1-3 references from the export:
1454  * 1 - for export pointer passed
1455  * and if disconnect really need
1456  * 2 - removing from hash
1457  * 3 - in client_unlink_export
1458  * The export pointer passed to this function can destroyed */
1459 int class_disconnect(struct obd_export *export)
1460 {
1461         int already_disconnected;
1462         ENTRY;
1463
1464         if (export == NULL) {
1465                 CWARN("attempting to free NULL export %p\n", export);
1466                 RETURN(-EINVAL);
1467         }
1468
1469         spin_lock(&export->exp_lock);
1470         already_disconnected = export->exp_disconnected;
1471         export->exp_disconnected = 1;
1472         /*  We hold references of export for uuid hash
1473          *  and nid_hash and export link at least. So
1474          *  it is safe to call cfs_hash_del in there.  */
1475         if (!hlist_unhashed(&export->exp_nid_hash))
1476                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1477                              &export->exp_connection->c_peer.nid,
1478                              &export->exp_nid_hash);
1479         spin_unlock(&export->exp_lock);
1480
1481         /* class_cleanup(), abort_recovery(), and class_fail_export()
1482          * all end up in here, and if any of them race we shouldn't
1483          * call extra class_export_puts(). */
1484         if (already_disconnected) {
1485                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1486                 GOTO(no_disconn, already_disconnected);
1487         }
1488
1489         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1490                export->exp_handle.h_cookie);
1491
1492         class_export_recovery_cleanup(export);
1493         class_unlink_export(export);
1494 no_disconn:
1495         class_export_put(export);
1496         RETURN(0);
1497 }
1498 EXPORT_SYMBOL(class_disconnect);
1499
1500 /* Return non-zero for a fully connected export */
1501 int class_connected_export(struct obd_export *exp)
1502 {
1503         int connected = 0;
1504
1505         if (exp) {
1506                 spin_lock(&exp->exp_lock);
1507                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1508                 spin_unlock(&exp->exp_lock);
1509         }
1510         return connected;
1511 }
1512 EXPORT_SYMBOL(class_connected_export);
1513
1514 static void class_disconnect_export_list(struct list_head *list,
1515                                          enum obd_option flags)
1516 {
1517         int rc;
1518         struct obd_export *exp;
1519         ENTRY;
1520
1521         /* It's possible that an export may disconnect itself, but
1522          * nothing else will be added to this list. */
1523         while (!list_empty(list)) {
1524                 exp = list_entry(list->next, struct obd_export,
1525                                  exp_obd_chain);
1526                 /* need for safe call CDEBUG after obd_disconnect */
1527                 class_export_get(exp);
1528
1529                 spin_lock(&exp->exp_lock);
1530                 exp->exp_flags = flags;
1531                 spin_unlock(&exp->exp_lock);
1532
1533                 if (obd_uuid_equals(&exp->exp_client_uuid,
1534                                     &exp->exp_obd->obd_uuid)) {
1535                         CDEBUG(D_HA,
1536                                "exp %p export uuid == obd uuid, don't discon\n",
1537                                exp);
1538                         /* Need to delete this now so we don't end up pointing
1539                          * to work_list later when this export is cleaned up. */
1540                         list_del_init(&exp->exp_obd_chain);
1541                         class_export_put(exp);
1542                         continue;
1543                 }
1544
1545                 class_export_get(exp);
1546                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1547                        "last request at %lld\n",
1548                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1549                        exp, exp->exp_last_request_time);
1550                 /* release one export reference anyway */
1551                 rc = obd_disconnect(exp);
1552
1553                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1554                        obd_export_nid2str(exp), exp, rc);
1555                 class_export_put(exp);
1556         }
1557         EXIT;
1558 }
1559
1560 void class_disconnect_exports(struct obd_device *obd)
1561 {
1562         struct list_head work_list;
1563         ENTRY;
1564
1565         /* Move all of the exports from obd_exports to a work list, en masse. */
1566         INIT_LIST_HEAD(&work_list);
1567         spin_lock(&obd->obd_dev_lock);
1568         list_splice_init(&obd->obd_exports, &work_list);
1569         list_splice_init(&obd->obd_delayed_exports, &work_list);
1570         spin_unlock(&obd->obd_dev_lock);
1571
1572         if (!list_empty(&work_list)) {
1573                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1574                        "disconnecting them\n", obd->obd_minor, obd);
1575                 class_disconnect_export_list(&work_list,
1576                                              exp_flags_from_obd(obd));
1577         } else
1578                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1579                        obd->obd_minor, obd);
1580         EXIT;
1581 }
1582 EXPORT_SYMBOL(class_disconnect_exports);
1583
1584 /* Remove exports that have not completed recovery.
1585  */
1586 void class_disconnect_stale_exports(struct obd_device *obd,
1587                                     int (*test_export)(struct obd_export *))
1588 {
1589         struct list_head work_list;
1590         struct obd_export *exp, *n;
1591         int evicted = 0;
1592         ENTRY;
1593
1594         INIT_LIST_HEAD(&work_list);
1595         spin_lock(&obd->obd_dev_lock);
1596         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1597                                  exp_obd_chain) {
1598                 /* don't count self-export as client */
1599                 if (obd_uuid_equals(&exp->exp_client_uuid,
1600                                     &exp->exp_obd->obd_uuid))
1601                         continue;
1602
1603                 /* don't evict clients which have no slot in last_rcvd
1604                  * (e.g. lightweight connection) */
1605                 if (exp->exp_target_data.ted_lr_idx == -1)
1606                         continue;
1607
1608                 spin_lock(&exp->exp_lock);
1609                 if (exp->exp_failed || test_export(exp)) {
1610                         spin_unlock(&exp->exp_lock);
1611                         continue;
1612                 }
1613                 exp->exp_failed = 1;
1614                 spin_unlock(&exp->exp_lock);
1615
1616                 list_move(&exp->exp_obd_chain, &work_list);
1617                 evicted++;
1618                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1619                        obd->obd_name, exp->exp_client_uuid.uuid,
1620                        obd_export_nid2str(exp));
1621                 print_export_data(exp, "EVICTING", 0, D_HA);
1622         }
1623         spin_unlock(&obd->obd_dev_lock);
1624
1625         if (evicted)
1626                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1627                               obd->obd_name, evicted);
1628
1629         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1630                                                  OBD_OPT_ABORT_RECOV);
1631         EXIT;
1632 }
1633 EXPORT_SYMBOL(class_disconnect_stale_exports);
1634
1635 void class_fail_export(struct obd_export *exp)
1636 {
1637         int rc, already_failed;
1638
1639         spin_lock(&exp->exp_lock);
1640         already_failed = exp->exp_failed;
1641         exp->exp_failed = 1;
1642         spin_unlock(&exp->exp_lock);
1643
1644         if (already_failed) {
1645                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1646                        exp, exp->exp_client_uuid.uuid);
1647                 return;
1648         }
1649
1650         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1651                exp, exp->exp_client_uuid.uuid);
1652
1653         if (obd_dump_on_timeout)
1654                 libcfs_debug_dumplog();
1655
1656         /* need for safe call CDEBUG after obd_disconnect */
1657         class_export_get(exp);
1658
1659         /* Most callers into obd_disconnect are removing their own reference
1660          * (request, for example) in addition to the one from the hash table.
1661          * We don't have such a reference here, so make one. */
1662         class_export_get(exp);
1663         rc = obd_disconnect(exp);
1664         if (rc)
1665                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1666         else
1667                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1668                        exp, exp->exp_client_uuid.uuid);
1669         class_export_put(exp);
1670 }
1671 EXPORT_SYMBOL(class_fail_export);
1672
1673 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1674 {
1675         struct cfs_hash *nid_hash;
1676         struct obd_export *doomed_exp = NULL;
1677         int exports_evicted = 0;
1678
1679         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1680
1681         spin_lock(&obd->obd_dev_lock);
1682         /* umount has run already, so evict thread should leave
1683          * its task to umount thread now */
1684         if (obd->obd_stopping) {
1685                 spin_unlock(&obd->obd_dev_lock);
1686                 return exports_evicted;
1687         }
1688         nid_hash = obd->obd_nid_hash;
1689         cfs_hash_getref(nid_hash);
1690         spin_unlock(&obd->obd_dev_lock);
1691
1692         do {
1693                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1694                 if (doomed_exp == NULL)
1695                         break;
1696
1697                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1698                          "nid %s found, wanted nid %s, requested nid %s\n",
1699                          obd_export_nid2str(doomed_exp),
1700                          libcfs_nid2str(nid_key), nid);
1701                 LASSERTF(doomed_exp != obd->obd_self_export,
1702                          "self-export is hashed by NID?\n");
1703                 exports_evicted++;
1704                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1705                               "request\n", obd->obd_name,
1706                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1707                               obd_export_nid2str(doomed_exp));
1708                 class_fail_export(doomed_exp);
1709                 class_export_put(doomed_exp);
1710         } while (1);
1711
1712         cfs_hash_putref(nid_hash);
1713
1714         if (!exports_evicted)
1715                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1716                        obd->obd_name, nid);
1717         return exports_evicted;
1718 }
1719 EXPORT_SYMBOL(obd_export_evict_by_nid);
1720
1721 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1722 {
1723         struct cfs_hash *uuid_hash;
1724         struct obd_export *doomed_exp = NULL;
1725         struct obd_uuid doomed_uuid;
1726         int exports_evicted = 0;
1727
1728         spin_lock(&obd->obd_dev_lock);
1729         if (obd->obd_stopping) {
1730                 spin_unlock(&obd->obd_dev_lock);
1731                 return exports_evicted;
1732         }
1733         uuid_hash = obd->obd_uuid_hash;
1734         cfs_hash_getref(uuid_hash);
1735         spin_unlock(&obd->obd_dev_lock);
1736
1737         obd_str2uuid(&doomed_uuid, uuid);
1738         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1739                 CERROR("%s: can't evict myself\n", obd->obd_name);
1740                 cfs_hash_putref(uuid_hash);
1741                 return exports_evicted;
1742         }
1743
1744         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1745
1746         if (doomed_exp == NULL) {
1747                 CERROR("%s: can't disconnect %s: no exports found\n",
1748                        obd->obd_name, uuid);
1749         } else {
1750                 CWARN("%s: evicting %s at adminstrative request\n",
1751                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1752                 class_fail_export(doomed_exp);
1753                 class_export_put(doomed_exp);
1754                 exports_evicted++;
1755         }
1756         cfs_hash_putref(uuid_hash);
1757
1758         return exports_evicted;
1759 }
1760
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback, NULL by default.  print_export_data()
 * calls it when lock dumping is requested and the hook is non-NULL.
 * NOTE(review): exported, so presumably installed by another module. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1765
1766 static void print_export_data(struct obd_export *exp, const char *status,
1767                               int locks, int debug_level)
1768 {
1769         struct ptlrpc_reply_state *rs;
1770         struct ptlrpc_reply_state *first_reply = NULL;
1771         int nreplies = 0;
1772
1773         spin_lock(&exp->exp_lock);
1774         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1775                             rs_exp_list) {
1776                 if (nreplies == 0)
1777                         first_reply = rs;
1778                 nreplies++;
1779         }
1780         spin_unlock(&exp->exp_lock);
1781
1782         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1783                "%p %s %llu stale:%d\n",
1784                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1785                obd_export_nid2str(exp),
1786                refcount_read(&exp->exp_handle.h_ref),
1787                atomic_read(&exp->exp_rpc_count),
1788                atomic_read(&exp->exp_cb_count),
1789                atomic_read(&exp->exp_locks_count),
1790                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1791                nreplies, first_reply, nreplies > 3 ? "..." : "",
1792                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1793 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1794         if (locks && class_export_dump_hook != NULL)
1795                 class_export_dump_hook(exp);
1796 #endif
1797 }
1798
1799 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1800 {
1801         struct obd_export *exp;
1802
1803         spin_lock(&obd->obd_dev_lock);
1804         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1805                 print_export_data(exp, "ACTIVE", locks, debug_level);
1806         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1807                 print_export_data(exp, "UNLINKED", locks, debug_level);
1808         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1809                 print_export_data(exp, "DELAYED", locks, debug_level);
1810         spin_unlock(&obd->obd_dev_lock);
1811 }
1812
1813 void obd_exports_barrier(struct obd_device *obd)
1814 {
1815         int waited = 2;
1816         LASSERT(list_empty(&obd->obd_exports));
1817         spin_lock(&obd->obd_dev_lock);
1818         while (!list_empty(&obd->obd_unlinked_exports)) {
1819                 spin_unlock(&obd->obd_dev_lock);
1820                 set_current_state(TASK_UNINTERRUPTIBLE);
1821                 schedule_timeout(cfs_time_seconds(waited));
1822                 if (waited > 5 && is_power_of_2(waited)) {
1823                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1824                                       "more than %d seconds. "
1825                                       "The obd refcount = %d. Is it stuck?\n",
1826                                       obd->obd_name, waited,
1827                                       atomic_read(&obd->obd_refcount));
1828                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1829                 }
1830                 waited *= 2;
1831                 spin_lock(&obd->obd_dev_lock);
1832         }
1833         spin_unlock(&obd->obd_dev_lock);
1834 }
1835 EXPORT_SYMBOL(obd_exports_barrier);
1836
1837 /**
1838  * Add export to the obd_zombe thread and notify it.
1839  */
1840 static void obd_zombie_export_add(struct obd_export *exp) {
1841         atomic_dec(&obd_stale_export_num);
1842         spin_lock(&exp->exp_obd->obd_dev_lock);
1843         LASSERT(!list_empty(&exp->exp_obd_chain));
1844         list_del_init(&exp->exp_obd_chain);
1845         spin_unlock(&exp->exp_obd->obd_dev_lock);
1846
1847         queue_work(zombie_wq, &exp->exp_zombie_work);
1848 }
1849
1850 /**
1851  * Add import to the obd_zombe thread and notify it.
1852  */
1853 static void obd_zombie_import_add(struct obd_import *imp) {
1854         LASSERT(imp->imp_sec == NULL);
1855
1856         queue_work(zombie_wq, &imp->imp_zombie_work);
1857 }
1858
/**
 * Wait until the zombie import/export work queue is empty.
 *
 * Implemented by flushing zombie_wq, the workqueue on which
 * obd_zombie_export_add() and obd_zombie_import_add() queue their work.
 */
void obd_zombie_barrier(void)
{
        flush_workqueue(zombie_wq);
}
1866 EXPORT_SYMBOL(obd_zombie_barrier);
1867
1868
1869 struct obd_export *obd_stale_export_get(void)
1870 {
1871         struct obd_export *exp = NULL;
1872         ENTRY;
1873
1874         spin_lock(&obd_stale_export_lock);
1875         if (!list_empty(&obd_stale_exports)) {
1876                 exp = list_entry(obd_stale_exports.next,
1877                                  struct obd_export, exp_stale_list);
1878                 list_del_init(&exp->exp_stale_list);
1879         }
1880         spin_unlock(&obd_stale_export_lock);
1881
1882         if (exp) {
1883                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1884                        atomic_read(&obd_stale_export_num));
1885         }
1886         RETURN(exp);
1887 }
1888 EXPORT_SYMBOL(obd_stale_export_get);
1889
1890 void obd_stale_export_put(struct obd_export *exp)
1891 {
1892         ENTRY;
1893
1894         LASSERT(list_empty(&exp->exp_stale_list));
1895         if (exp->exp_lock_hash &&
1896             atomic_read(&exp->exp_lock_hash->hs_count)) {
1897                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1898                        atomic_read(&obd_stale_export_num));
1899
1900                 spin_lock_bh(&exp->exp_bl_list_lock);
1901                 spin_lock(&obd_stale_export_lock);
1902                 /* Add to the tail if there is no blocked locks,
1903                  * to the head otherwise. */
1904                 if (list_empty(&exp->exp_bl_list))
1905                         list_add_tail(&exp->exp_stale_list,
1906                                       &obd_stale_exports);
1907                 else
1908                         list_add(&exp->exp_stale_list,
1909                                  &obd_stale_exports);
1910
1911                 spin_unlock(&obd_stale_export_lock);
1912                 spin_unlock_bh(&exp->exp_bl_list_lock);
1913         } else {
1914                 class_export_put(exp);
1915         }
1916         EXIT;
1917 }
1918 EXPORT_SYMBOL(obd_stale_export_put);
1919
1920 /**
1921  * Adjust the position of the export in the stale list,
1922  * i.e. move to the head of the list if is needed.
1923  **/
1924 void obd_stale_export_adjust(struct obd_export *exp)
1925 {
1926         LASSERT(exp != NULL);
1927         spin_lock_bh(&exp->exp_bl_list_lock);
1928         spin_lock(&obd_stale_export_lock);
1929
1930         if (!list_empty(&exp->exp_stale_list) &&
1931             !list_empty(&exp->exp_bl_list))
1932                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1933
1934         spin_unlock(&obd_stale_export_lock);
1935         spin_unlock_bh(&exp->exp_bl_list_lock);
1936 }
1937 EXPORT_SYMBOL(obd_stale_export_adjust);
1938
1939 /**
1940  * start destroy zombie import/export thread
1941  */
1942 int obd_zombie_impexp_init(void)
1943 {
1944         zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1945         if (!zombie_wq)
1946                 return -ENOMEM;
1947
1948         return 0;
1949 }
1950
/**
 * Tear down the zombie import/export workqueue.
 *
 * After the workqueue is destroyed, the global stale export list is
 * asserted to be empty.
 */
void obd_zombie_impexp_stop(void)
{
        destroy_workqueue(zombie_wq);
        LASSERT(list_empty(&obd_stale_exports));
}
1959
1960 /***** Kernel-userspace comm helpers *******/
1961
/* Get length of entire message, including header
 * @param payload_len Size of the payload area in bytes
 * @returns sizeof(struct kuc_hdr) plus payload_len
 */
int kuc_len(int payload_len)
{
        return sizeof(struct kuc_hdr) + payload_len;
}
1967 EXPORT_SYMBOL(kuc_len);
1968
1969 /* Get a pointer to kuc header, given a ptr to the payload
1970  * @param p Pointer to payload area
1971  * @returns Pointer to kuc header
1972  */
1973 struct kuc_hdr * kuc_ptr(void *p)
1974 {
1975         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1976         LASSERT(lh->kuc_magic == KUC_MAGIC);
1977         return lh;
1978 }
1979 EXPORT_SYMBOL(kuc_ptr);
1980
1981 /* Alloc space for a message, and fill in header
1982  * @return Pointer to payload area
1983  */
1984 void *kuc_alloc(int payload_len, int transport, int type)
1985 {
1986         struct kuc_hdr *lh;
1987         int len = kuc_len(payload_len);
1988
1989         OBD_ALLOC(lh, len);
1990         if (lh == NULL)
1991                 return ERR_PTR(-ENOMEM);
1992
1993         lh->kuc_magic = KUC_MAGIC;
1994         lh->kuc_transport = transport;
1995         lh->kuc_msgtype = type;
1996         lh->kuc_msglen = len;
1997
1998         return (void *)(lh + 1);
1999 }
2000 EXPORT_SYMBOL(kuc_alloc);
2001
/* Free a message allocated with kuc_alloc()
 * @param p Pointer to payload area
 * @param payload_len Payload size that was passed to kuc_alloc()
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *hdr;

        hdr = kuc_ptr(p);
        OBD_FREE(hdr, kuc_len(payload_len));
}
2008 EXPORT_SYMBOL(kuc_free);
2009
/* One thread waiting for an RPC slot.  obd_get_request_slot() keeps an
 * instance on its stack and links it into client_obd::cl_flight_waiters. */
struct obd_request_slot_waiter {
        /* link in cl_flight_waiters; empty once a slot has been handed over */
        struct list_head        orsw_entry;
        /* woken by whoever hands a slot to this waiter */
        wait_queue_head_t       orsw_waitq;
        /* when true, obd_get_request_slot() returns -EINTR */
        bool                    orsw_signaled;
};
2015
2016 static bool obd_request_slot_avail(struct client_obd *cli,
2017                                    struct obd_request_slot_waiter *orsw)
2018 {
2019         bool avail;
2020
2021         spin_lock(&cli->cl_loi_list_lock);
2022         avail = !!list_empty(&orsw->orsw_entry);
2023         spin_unlock(&cli->cl_loi_list_lock);
2024
2025         return avail;
2026 };
2027
/*
 * For network flow control, the RPC sponsor needs to acquire a credit
 * before sending the RPC. The credits count for a connection is defined
 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
 * the subsequent RPC sponsors need to wait until others released their
 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
 *
 * Returns 0 when a slot was acquired, -EINTR when orsw_signaled is set,
 * otherwise the non-zero result of the interrupted wait.
 */
int obd_get_request_slot(struct client_obd *cli)
{
        struct obd_request_slot_waiter   orsw;
        struct l_wait_info               lwi;
        int                              rc;

        /* Fast path: a slot is free, take it without waiting. */
        spin_lock(&cli->cl_loi_list_lock);
        if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
                cli->cl_rpcs_in_flight++;
                spin_unlock(&cli->cl_loi_list_lock);
                return 0;
        }

        /* Slow path: queue an on-stack waiter at the tail of the list. */
        init_waitqueue_head(&orsw.orsw_waitq);
        list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
        orsw.orsw_signaled = false;
        spin_unlock(&cli->cl_loi_list_lock);

        /* Sleep until our entry has been unlinked (a slot was handed to us)
         * or orsw_signaled is set.
         * NOTE(review): nothing in this part of the file sets orsw_signaled;
         * confirm where, if anywhere, it is set. */
        lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
        rc = l_wait_event(orsw.orsw_waitq,
                          obd_request_slot_avail(cli, &orsw) ||
                          orsw.orsw_signaled,
                          &lwi);

        /* Here, we must take the lock to avoid the on-stack 'orsw' to be
         * freed but other (such as obd_put_request_slot) is using it. */
        spin_lock(&cli->cl_loi_list_lock);
        if (rc != 0) {
                if (!orsw.orsw_signaled) {
                        /* The wait failed.  If the entry is already unlinked,
                         * a slot was granted concurrently and is given back;
                         * otherwise just leave the queue. */
                        if (list_empty(&orsw.orsw_entry))
                                cli->cl_rpcs_in_flight--;
                        else
                                list_del(&orsw.orsw_entry);
                }
        }

        if (orsw.orsw_signaled) {
                LASSERT(list_empty(&orsw.orsw_entry));

                rc = -EINTR;
        }
        spin_unlock(&cli->cl_loi_list_lock);

        return rc;
}
2080 EXPORT_SYMBOL(obd_get_request_slot);
2081
2082 void obd_put_request_slot(struct client_obd *cli)
2083 {
2084         struct obd_request_slot_waiter *orsw;
2085
2086         spin_lock(&cli->cl_loi_list_lock);
2087         cli->cl_rpcs_in_flight--;
2088
2089         /* If there is free slot, wakeup the first waiter. */
2090         if (!list_empty(&cli->cl_flight_waiters) &&
2091             likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2092                 orsw = list_entry(cli->cl_flight_waiters.next,
2093                                   struct obd_request_slot_waiter, orsw_entry);
2094                 list_del_init(&orsw->orsw_entry);
2095                 cli->cl_rpcs_in_flight++;
2096                 wake_up(&orsw->orsw_waitq);
2097         }
2098         spin_unlock(&cli->cl_loi_list_lock);
2099 }
2100 EXPORT_SYMBOL(obd_put_request_slot);
2101
/* Return the current cl_max_rpcs_in_flight limit of @cli (read without
 * taking cl_loi_list_lock). */
__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_max_rpcs_in_flight;
}
2106 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2107
2108 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2109 {
2110         struct obd_request_slot_waiter *orsw;
2111         __u32                           old;
2112         int                             diff;
2113         int                             i;
2114         const char *type_name;
2115         int                             rc;
2116
2117         if (max > OBD_MAX_RIF_MAX || max < 1)
2118                 return -ERANGE;
2119
2120         type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2121         if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2122                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2123                  * strictly lower that max_rpcs_in_flight */
2124                 if (max < 2) {
2125                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2126                                "because it must be higher than "
2127                                "max_mod_rpcs_in_flight value",
2128                                cli->cl_import->imp_obd->obd_name);
2129                         return -ERANGE;
2130                 }
2131                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2132                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2133                         if (rc != 0)
2134                                 return rc;
2135                 }
2136         }
2137
2138         spin_lock(&cli->cl_loi_list_lock);
2139         old = cli->cl_max_rpcs_in_flight;
2140         cli->cl_max_rpcs_in_flight = max;
2141         client_adjust_max_dirty(cli);
2142
2143         diff = max - old;
2144
2145         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2146         for (i = 0; i < diff; i++) {
2147                 if (list_empty(&cli->cl_flight_waiters))
2148                         break;
2149
2150                 orsw = list_entry(cli->cl_flight_waiters.next,
2151                                   struct obd_request_slot_waiter, orsw_entry);
2152                 list_del_init(&orsw->orsw_entry);
2153                 cli->cl_rpcs_in_flight++;
2154                 wake_up(&orsw->orsw_waitq);
2155         }
2156         spin_unlock(&cli->cl_loi_list_lock);
2157
2158         return 0;
2159 }
2160 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2161
/* Return the current cl_max_mod_rpcs_in_flight limit of @cli (read without
 * taking cl_mod_rpcs_lock). */
__u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
{
        return cli->cl_max_mod_rpcs_in_flight;
}
2166 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2167
2168 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2169 {
2170         struct obd_connect_data *ocd;
2171         __u16 maxmodrpcs;
2172         __u16 prev;
2173
2174         if (max > OBD_MAX_RIF_MAX || max < 1)
2175                 return -ERANGE;
2176
2177         /* cannot exceed or equal max_rpcs_in_flight */
2178         if (max >= cli->cl_max_rpcs_in_flight) {
2179                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2180                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2181                        cli->cl_import->imp_obd->obd_name,
2182                        max, cli->cl_max_rpcs_in_flight);
2183                 return -ERANGE;
2184         }
2185
2186         /* cannot exceed max modify RPCs in flight supported by the server */
2187         ocd = &cli->cl_import->imp_connect_data;
2188         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2189                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2190         else
2191                 maxmodrpcs = 1;
2192         if (max > maxmodrpcs) {
2193                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2194                        "higher than max_mod_rpcs_per_client value (%hu) "
2195                        "returned by the server at connection\n",
2196                        cli->cl_import->imp_obd->obd_name,
2197                        max, maxmodrpcs);
2198                 return -ERANGE;
2199         }
2200
2201         spin_lock(&cli->cl_mod_rpcs_lock);
2202
2203         prev = cli->cl_max_mod_rpcs_in_flight;
2204         cli->cl_max_mod_rpcs_in_flight = max;
2205
2206         /* wakeup waiters if limit has been increased */
2207         if (cli->cl_max_mod_rpcs_in_flight > prev)
2208                 wake_up(&cli->cl_mod_rpcs_waitq);
2209
2210         spin_unlock(&cli->cl_mod_rpcs_lock);
2211
2212         return 0;
2213 }
2214 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2215
2216 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2217                                struct seq_file *seq)
2218 {
2219         unsigned long mod_tot = 0, mod_cum;
2220         struct timespec64 now;
2221         int i;
2222
2223         ktime_get_real_ts64(&now);
2224
2225         spin_lock(&cli->cl_mod_rpcs_lock);
2226
2227         seq_printf(seq, "snapshot_time:         %llu.%9lu (secs.nsecs)\n",
2228                    (s64)now.tv_sec, now.tv_nsec);
2229         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2230                    cli->cl_mod_rpcs_in_flight);
2231
2232         seq_printf(seq, "\n\t\t\tmodify\n");
2233         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2234
2235         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2236
2237         mod_cum = 0;
2238         for (i = 0; i < OBD_HIST_MAX; i++) {
2239                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2240                 mod_cum += mod;
2241                 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2242                            i, mod, pct(mod, mod_tot),
2243                            pct(mod_cum, mod_tot));
2244                 if (mod_cum == mod_tot)
2245                         break;
2246         }
2247
2248         spin_unlock(&cli->cl_mod_rpcs_lock);
2249
2250         return 0;
2251 }
2252 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2253
2254 /* The number of modify RPCs sent in parallel is limited
2255  * because the server has a finite number of slots per client to
2256  * store request result and ensure reply reconstruction when needed.
2257  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2258  * that takes into account server limit and cl_max_rpcs_in_flight
2259  * value.
2260  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2261  * one close request is allowed above the maximum.
2262  */
2263 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2264                                                  bool close_req)
2265 {
2266         bool avail;
2267
2268         /* A slot is available if
2269          * - number of modify RPCs in flight is less than the max
2270          * - it's a close RPC and no other close request is in flight
2271          */
2272         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2273                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2274
2275         return avail;
2276 }
2277
2278 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2279                                          bool close_req)
2280 {
2281         bool avail;
2282
2283         spin_lock(&cli->cl_mod_rpcs_lock);
2284         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2285         spin_unlock(&cli->cl_mod_rpcs_lock);
2286         return avail;
2287 }
2288
2289 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2290 {
2291         if (it != NULL &&
2292             (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2293              it->it_op == IT_READDIR ||
2294              (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2295                         return true;
2296         return false;
2297 }
2298
2299 /* Get a modify RPC slot from the obd client @cli according
2300  * to the kind of operation @opc that is going to be sent
2301  * and the intent @it of the operation if it applies.
2302  * If the maximum number of modify RPCs in flight is reached
2303  * the thread is put to sleep.
2304  * Returns the tag to be set in the request message. Tag 0
2305  * is reserved for non-modifying requests.
2306  */
2307 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2308                            struct lookup_intent *it)
2309 {
2310         bool                    close_req = false;
2311         __u16                   i, max;
2312
2313         /* read-only metadata RPCs don't consume a slot on MDT
2314          * for reply reconstruction
2315          */
2316         if (obd_skip_mod_rpc_slot(it))
2317                 return 0;
2318
2319         if (opc == MDS_CLOSE)
2320                 close_req = true;
2321
2322         do {
2323                 spin_lock(&cli->cl_mod_rpcs_lock);
2324                 max = cli->cl_max_mod_rpcs_in_flight;
2325                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2326                         /* there is a slot available */
2327                         cli->cl_mod_rpcs_in_flight++;
2328                         if (close_req)
2329                                 cli->cl_close_rpcs_in_flight++;
2330                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2331                                          cli->cl_mod_rpcs_in_flight);
2332                         /* find a free tag */
2333                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2334                                                 max + 1);
2335                         LASSERT(i < OBD_MAX_RIF_MAX);
2336                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2337                         spin_unlock(&cli->cl_mod_rpcs_lock);
2338                         /* tag 0 is reserved for non-modify RPCs */
2339
2340                         CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2341                                "opc %u, max %hu\n",
2342                                cli->cl_import->imp_obd->obd_name,
2343                                i + 1, opc, max);
2344
2345                         return i + 1;
2346                 }
2347                 spin_unlock(&cli->cl_mod_rpcs_lock);
2348
2349                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2350                        "opc %u, max %hu\n",
2351                        cli->cl_import->imp_obd->obd_name, opc, max);
2352
2353                 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2354                                           obd_mod_rpc_slot_avail(cli,
2355                                                                  close_req));
2356         } while (true);
2357 }
2358 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2359
2360 /* Put a modify RPC slot from the obd client @cli according
2361  * to the kind of operation @opc that has been sent and the
2362  * intent @it of the operation if it applies.
2363  */
2364 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2365                           struct lookup_intent *it, __u16 tag)
2366 {
2367         bool                    close_req = false;
2368
2369         if (obd_skip_mod_rpc_slot(it))
2370                 return;
2371
2372         if (opc == MDS_CLOSE)
2373                 close_req = true;
2374
2375         spin_lock(&cli->cl_mod_rpcs_lock);
2376         cli->cl_mod_rpcs_in_flight--;
2377         if (close_req)
2378                 cli->cl_close_rpcs_in_flight--;
2379         /* release the tag in the bitmap */
2380         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2381         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2382         spin_unlock(&cli->cl_mod_rpcs_lock);
2383         wake_up(&cli->cl_mod_rpcs_waitq);
2384 }
2385 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2386