4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/obdclass/genops.c
33 * These are the only exported functions, they provide some generic
34 * infrastructure for managing object devices
37 #define DEBUG_SUBSYSTEM S_CLASS
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
/*
 * File-scope state. obd_devs is the exported XArray of registered
 * devices, indexed by obd_minor; obd_devs_count is adjusted as devices
 * are registered and unregistered.
 */
48 DEFINE_XARRAY_ALLOC(obd_devs);
49 EXPORT_SYMBOL(obd_devs);
51 static atomic_t obd_devs_count = ATOMIC_INIT(0);
/* Slab cache used by obd_device_alloc() and obd_device_free(). */
53 static struct kmem_cache *obd_device_cachep;
54 static struct kobj_type class_ktype;
55 static struct workqueue_struct *zombie_wq;
/* Forward declarations for helpers used before their definitions. */
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60 const char *status, int locks, int debug_level);
/*
 * Stale-export bookkeeping. obd_zombie_exp_cull() decrements
 * obd_stale_export_num and wakes waiters when it reaches zero.
 */
62 static LIST_HEAD(obd_stale_exports);
63 static DEFINE_SPINLOCK(obd_stale_export_lock);
64 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/*
 * Allocate one obd_device from obd_device_cachep with GFP_NOFS and
 * stamp obd_magic with OBD_DEVICE_MAGIC, which later LASSERTs check.
 * NOTE(review): class_newdev() compares the result against NULL, so an
 * allocation failure presumably returns NULL - confirm.
 */
66 static struct obd_device *obd_device_alloc(void)
68 struct obd_device *obd;
70 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
72 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts the magic value, reports an error if obd_namespace is still
 * set (it should have been cleaned up earlier), and finalises the
 * obd_reference lu_ref before the slab free.
 */
77 static void obd_device_free(struct obd_device *obd)
80 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
81 "obd %p obd_magic %08x != %08x\n",
82 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
83 if (obd->obd_namespace != NULL) {
84 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
85 obd, obd->obd_namespace, obd->obd_force);
88 lu_ref_fini(&obd->obd_reference);
89 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up an obd_type by name in lustre_kset.
 *
 * Only a kobject whose ktype is class_ktype is converted to its
 * containing obd_type. The kobject found by kset_find_obj() carries a
 * counted reference; callers in this file release it with
 * kobject_put(&type->typ_kobj).
 */
92 struct obd_type *class_search_type(const char *name)
94 struct kobject *kobj = kset_find_obj(lustre_kset, name);
96 if (kobj && kobj->ktype == &class_ktype)
97 return container_of(kobj, struct obd_type, typ_kobj);
102 EXPORT_SYMBOL(class_search_type);
/*
 * Look up the obd_type called name and take a usable reference on it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the module is requested through
 * request_module() and the lookup is retried. Some names are mapped to
 * a different module name first: LUSTRE_LWP_NAME loads LUSTRE_OSP_NAME,
 * and names starting with LUSTRE_MDS_NAME load LUSTRE_MDT_NAME.
 * NOTE(review): presumably the module is only requested when the first
 * lookup fails - confirm.
 *
 * When try_module_get() on the owning module succeeds, typ_refcnt is
 * incremented. The kobject reference returned by class_search_type()
 * is dropped on both visible paths. Pair with class_put_type().
 */
104 struct obd_type *class_get_type(const char *name)
106 struct obd_type *type;
108 type = class_search_type(name);
109 #ifdef HAVE_MODULE_LOADING_SUPPORT
111 const char *modname = name;
113 #ifdef HAVE_SERVER_SUPPORT
114 if (strcmp(modname, "obdfilter") == 0)
117 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
118 modname = LUSTRE_OSP_NAME;
120 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
121 modname = LUSTRE_MDT_NAME;
122 #endif /* HAVE_SERVER_SUPPORT */
124 if (!request_module("%s", modname)) {
125 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126 type = class_search_type(name);
128 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
134 if (try_module_get(type->typ_dt_ops->o_owner)) {
135 atomic_inc(&type->typ_refcnt);
136 /* class_search_type() returned a counted reference,
137 * but we don't need that count any more as
138 * we have one through typ_refcnt.
140 kobject_put(&type->typ_kobj);
142 kobject_put(&type->typ_kobj);
148 EXPORT_SYMBOL(class_get_type);
/*
 * Release what class_get_type() took: the module reference on
 * typ_dt_ops->o_owner and one count of typ_refcnt.
 */
150 void class_put_type(struct obd_type *type)
153 module_put(type->typ_dt_ops->o_owner);
154 atomic_dec(&type->typ_refcnt);
156 EXPORT_SYMBOL(class_put_type);
/*
 * kobject release callback installed through class_ktype.
 *
 * Tears down what belongs to the obd_type embedding kobj: the debugfs
 * directory, the lu_device_type in typ_lu, and (with CONFIG_PROC_FS,
 * when both typ_name and typ_procroot are set) the proc subtree. The
 * obd_type itself is freed last.
 */
158 static void class_sysfs_release(struct kobject *kobj)
160 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
162 debugfs_remove_recursive(type->typ_debugfs_entry);
163 type->typ_debugfs_entry = NULL;
166 lu_device_type_fini(type->typ_lu);
168 #ifdef CONFIG_PROC_FS
169 if (type->typ_name && type->typ_procroot)
170 remove_proc_subtree(type->typ_name, proc_lustre_root);
172 OBD_FREE(type, sizeof(*type));
/*
 * kobj_type for obd_type kobjects. class_search_type() compares a
 * kobject's ktype against this object to recognise obd types, and
 * .release frees the obd_type once the last reference is dropped.
 */
175 static struct kobj_type class_ktype = {
176 .sysfs_ops = &lustre_sysfs_ops,
177 .release = class_sysfs_release,
/*
 * Create an obd_type entry for name ahead of the real registration
 * (server builds only).
 *
 * Returns ERR_PTR(-EEXIST) if a type of that name already exists and
 * ERR_PTR(-ENOMEM) if the allocation fails. Otherwise the kobject is
 * added to lustre_kset, a debugfs directory is created, and
 * typ_sym_filter is set to true. A failure to register the compat proc
 * entry is only logged and leaves typ_procroot NULL.
 * NOTE(review): presumably the proc entry is only attempted when
 * enable_proc is true - confirm.
 */
180 #ifdef HAVE_SERVER_SUPPORT
181 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
183 struct dentry *symlink;
184 struct obd_type *type;
187 type = class_search_type(name);
189 kobject_put(&type->typ_kobj);
190 return ERR_PTR(-EEXIST);
193 OBD_ALLOC(type, sizeof(*type));
195 return ERR_PTR(-ENOMEM);
197 type->typ_kobj.kset = lustre_kset;
198 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
199 &lustre_kset->kobj, "%s", name);
203 symlink = debugfs_create_dir(name, debugfs_lustre_root);
204 type->typ_debugfs_entry = symlink;
205 type->typ_sym_filter = true;
208 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
210 if (IS_ERR(type->typ_procroot)) {
211 CERROR("%s: can't create compat proc entry: %d\n",
212 name, (int)PTR_ERR(type->typ_procroot));
213 type->typ_procroot = NULL;
219 EXPORT_SYMBOL(class_add_symlinks);
220 #endif /* HAVE_SERVER_SUPPORT */
/* Upper bound (exclusive) on the length of a type name; asserted below. */
222 #define CLASS_MAX_NAME 1024
/*
 * Register the obd type name with its device (dt_ops) and metadata
 * (md_ops) operation tables and an optional lu_device_type ldt.
 *
 * name must be shorter than CLASS_MAX_NAME (asserted). If a type of
 * that name already exists, the lookup reference is dropped and
 * "already registered" is logged. With HAVE_SERVER_SUPPORT an entry
 * flagged typ_sym_filter is handled separately: the flag is cleared
 * and a kobject reference is dropped.
 * NOTE(review): presumably that entry is the one created by
 * class_add_symlinks() being taken over - confirm.
 *
 * For a new type, typ_lu is set to OBD_LU_TYPE_SETUP when ldt is given.
 * The result of lu_device_type_init() is then published with
 * smp_store_release() (NULL on failure) and waiters on typ_lu are
 * woken. With CONFIG_PROC_FS and enable_proc, a proc directory is
 * registered unless typ_procroot is already set.
 */
224 int class_register_type(const struct obd_ops *dt_ops,
225 const struct md_ops *md_ops,
227 const char *name, struct lu_device_type *ldt)
229 struct obd_type *type;
234 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
236 type = class_search_type(name);
238 #ifdef HAVE_SERVER_SUPPORT
239 if (type->typ_sym_filter)
241 #endif /* HAVE_SERVER_SUPPORT */
242 kobject_put(&type->typ_kobj);
243 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
247 OBD_ALLOC(type, sizeof(*type));
251 type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
252 type->typ_kobj.kset = lustre_kset;
253 kobject_init(&type->typ_kobj, &class_ktype);
254 #ifdef HAVE_SERVER_SUPPORT
256 #endif /* HAVE_SERVER_SUPPORT */
258 type->typ_dt_ops = dt_ops;
259 type->typ_md_ops = md_ops;
261 #ifdef HAVE_SERVER_SUPPORT
262 if (type->typ_sym_filter) {
263 type->typ_sym_filter = false;
264 kobject_put(&type->typ_kobj);
268 #ifdef CONFIG_PROC_FS
269 if (enable_proc && !type->typ_procroot) {
270 type->typ_procroot = lprocfs_register(name,
273 if (IS_ERR(type->typ_procroot)) {
274 rc = PTR_ERR(type->typ_procroot);
275 type->typ_procroot = NULL;
280 type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
282 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
285 #ifdef HAVE_SERVER_SUPPORT
289 rc = lu_device_type_init(ldt);
290 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
291 wake_up_var(&type->typ_lu);
299 kobject_put(&type->typ_kobj);
303 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called name.
 *
 * An unknown name is reported with CERROR. If the type still has users
 * (typ_refcnt != 0), the operation tables are cleared, the name is kept
 * for debugging, and the result is -EBUSY. Otherwise the final kobject
 * reference is dropped. The reference obtained from
 * class_search_type() is dropped separately.
 *
 * NOTE(review): on the -EBUSY path typ_dt_ops is set to NULL, while
 * class_put_type() dereferences type->typ_dt_ops->o_owner; a remaining
 * user dropping its reference afterwards would hit that NULL pointer -
 * confirm whether this can happen.
 */
305 int class_unregister_type(const char *name)
307 struct obd_type *type = class_search_type(name);
312 CERROR("unknown obd type\n");
316 if (atomic_read(&type->typ_refcnt)) {
317 CERROR("type %s has refcount (%d)\n", name,
318 atomic_read(&type->typ_refcnt));
319 /* This is a bad situation, let's make the best of it */
320 /* Remove ops, but leave the name for debugging */
321 type->typ_dt_ops = NULL;
322 type->typ_md_ops = NULL;
323 GOTO(out_put, rc = -EBUSY);
326 /* Put the final ref */
327 kobject_put(&type->typ_kobj);
329 /* Put the ref returned by class_search_type() */
330 kobject_put(&type->typ_kobj);
333 } /* class_unregister_type */
334 EXPORT_SYMBOL(class_unregister_type);
/*
 * Error results visible below: ERR_PTR(-EINVAL) when name is
 * MAX_OBD_NAME bytes or longer, ERR_PTR(-ENODEV) when the type cannot
 * be obtained, and ERR_PTR(-ENOMEM) when the device cannot be allocated
 * (the type reference is dropped again in that case).
 *
 * The new device starts with obd_minor == -1 and holds one reference
 * from kref_init(), tracked in the lu_ref as "newdev". It also keeps
 * the type reference taken by class_get_type().
 *
 * NOTE(review): only name is length-checked although the message says
 * "name/uuid". uuid is copied with strncpy(..., UUID_MAX), which leaves
 * the destination without a terminating NUL if uuid is UUID_MAX bytes
 * or longer - confirm that callers bound uuid.
 */
337 * Create a new obd device.
339 * Allocate the new obd_device and initialize it.
341 * \param[in] type_name obd device type string.
342 * \param[in] name obd device name.
343 * \param[in] uuid obd device UUID
345 * \retval newdev pointer to created obd_device
346 * \retval ERR_PTR(errno) on error
348 struct obd_device *class_newdev(const char *type_name, const char *name,
351 struct obd_device *newdev;
352 struct obd_type *type = NULL;
355 if (strlen(name) >= MAX_OBD_NAME) {
356 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
357 RETURN(ERR_PTR(-EINVAL));
360 type = class_get_type(type_name);
362 CERROR("OBD: unknown type: %s\n", type_name);
363 RETURN(ERR_PTR(-ENODEV));
366 newdev = obd_device_alloc();
367 if (newdev == NULL) {
368 class_put_type(type);
369 RETURN(ERR_PTR(-ENOMEM));
371 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
372 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
373 newdev->obd_type = type;
374 newdev->obd_minor = -1;
376 rwlock_init(&newdev->obd_pool_lock);
377 newdev->obd_pool_limit = 0;
378 newdev->obd_pool_slv = 0;
380 INIT_LIST_HEAD(&newdev->obd_exports);
381 newdev->obd_num_exports = 0;
382 newdev->obd_grant_check_threshold = 100;
383 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
384 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
385 INIT_LIST_HEAD(&newdev->obd_exports_timed);
386 INIT_LIST_HEAD(&newdev->obd_nid_stats);
387 spin_lock_init(&newdev->obd_nid_lock);
388 spin_lock_init(&newdev->obd_dev_lock);
389 mutex_init(&newdev->obd_dev_mutex);
390 spin_lock_init(&newdev->obd_osfs_lock);
391 /* newdev->obd_osfs_age must be set to a value in the distant
392 * past to guarantee a fresh statfs is fetched on mount. */
393 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
395 /* XXX belongs in setup not attach */
396 init_rwsem(&newdev->obd_observer_link_sem);
398 spin_lock_init(&newdev->obd_recovery_task_lock);
399 init_waitqueue_head(&newdev->obd_next_transno_waitq);
400 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
401 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
402 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
403 INIT_LIST_HEAD(&newdev->obd_evict_list);
404 INIT_LIST_HEAD(&newdev->obd_lwp_list);
406 llog_group_init(&newdev->obd_olg);
407 /* Detach drops this */
408 kref_init(&newdev->obd_refcount);
409 lu_ref_init(&newdev->obd_reference);
410 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
412 newdev->obd_conn_inprogress = 0;
414 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
416 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
417 newdev->obd_name, newdev);
/*
 * Final teardown of an obd_device whose reference count reached zero.
 *
 * Asserts the magic value, that the device is either unregistered
 * (obd_minor == -1) or still the entry stored in obd_devs for its
 * minor, and that obd_refcount is 0. A device marked obd_stopping is
 * run through obd_cleanup(), and a failure there is only logged.
 * obd_type is read before the device is freed because the type
 * reference is dropped afterwards.
 */
425 * \param[in] obd obd_device to be freed
429 void class_free_dev(struct obd_device *obd)
431 struct obd_type *obd_type = obd->obd_type;
433 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
434 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
435 LASSERTF(obd->obd_minor == -1 || class_num2obd(obd->obd_minor) == obd,
436 "obd %p != obd_devs[%d] %p\n",
437 obd, obd->obd_minor, class_num2obd(obd->obd_minor));
438 LASSERTF(kref_read(&obd->obd_refcount) == 0,
439 "obd_refcount should be 0, not %d\n",
440 kref_read(&obd->obd_refcount));
441 LASSERT(obd_type != NULL);
443 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
444 obd->obd_name, obd->obd_type->typ_name);
446 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
447 obd->obd_name, obd->obd_uuid.uuid);
448 if (obd->obd_stopping) {
451 /* If we're not stopping, we were never set up */
452 err = obd_cleanup(obd);
454 CERROR("Cleanup %s returned %d\n",
458 obd_device_free(obd);
460 class_put_type(obd_type);
464 * Unregister obd device.
466 * Remove the obd from obd_devs
468 * \param[in] obd obd_device to be unregistered
/*
 * A device that owns a slot (obd_minor >= 0) is erased from obd_devs
 * and the "obd_device_list" reference taken by class_register_device()
 * is dropped. obd_devs_count is decremented.
 */
472 void class_unregister_device(struct obd_device *obd)
474 if (obd->obd_minor >= 0) {
475 xa_erase(&obd_devs, obd->obd_minor);
476 class_decref(obd, "obd_device_list", obd);
478 atomic_dec(&obd_devs_count);
/*
 * If the name already resolves to a device, obd_zombie_barrier() is
 * called first and the name is looked up again. When the name is free,
 * an "obd_device_list" reference is taken, a slot is allocated in
 * obd_devs with xa_alloc() (GFP_ATOMIC, 31-bit index range), the index
 * is stored in obd_minor, and obd_devs_count is incremented.
 */
483 * Register obd device.
485 * Add new_obd to obd_devs
487 * \param[in] new_obd obd_device to be registered
490 * \retval -EEXIST device with this name is registered
492 int class_register_device(struct obd_device *new_obd)
497 if (new_obd == NULL) {
503 * The obd_device could be waiting to be
504 * destroyed by "obd_zombie_impexp_thread"
506 if (class_name2dev(new_obd->obd_name) != -1)
507 obd_zombie_barrier();
509 if (class_name2dev(new_obd->obd_name) == -1) {
510 class_incref(new_obd, "obd_device_list", new_obd);
511 rc = xa_alloc(&obd_devs, &dev_no, new_obd,
512 xa_limit_31b, GFP_ATOMIC);
517 new_obd->obd_minor = dev_no;
518 atomic_inc(&obd_devs_count);
/*
 * Find the device whose obd_name equals name by walking obd_devs, and
 * report its obd_minor. Only a device with obd_attached set is
 * reported.
 * NOTE(review): callers in this file compare the result against -1,
 * so -1 is presumably the "not found" value - confirm.
 */
527 int class_name2dev(const char *name)
529 struct obd_device *obd = NULL;
530 unsigned long dev_no = 0;
537 obd_device_for_each(dev_no, obd) {
538 if (strcmp(name, obd->obd_name) == 0) {
540 * Make sure we finished attaching before we give
543 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
544 if (obd->obd_attached) {
545 ret = obd->obd_minor;
556 EXPORT_SYMBOL(class_name2dev);
/*
 * Find the device whose obd_name equals name by walking obd_devs.
 * Only a device with obd_attached set qualifies. As the TODO below
 * states, the pointer is handed out without class_incref(), so the
 * caller holds no reference on it.
 */
558 struct obd_device *class_name2obd(const char *name)
560 struct obd_device *obd = NULL;
561 unsigned long dev_no = 0;
567 obd_device_for_each(dev_no, obd) {
568 if (strcmp(name, obd->obd_name) == 0) {
570 * Make sure we finished attaching before we give
573 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
574 if (obd->obd_attached)
581 * TODO: We give out a reference without class_incref(). This isn't
582 * ideal, but this behavior is identical in previous implementations
587 EXPORT_SYMBOL(class_name2obd);
/*
 * Find the device whose obd_uuid equals uuid by walking obd_devs, and
 * report its obd_minor. Unlike class_name2dev(), no obd_attached check
 * is visible here.
 */
589 int class_uuid2dev(struct obd_uuid *uuid)
591 struct obd_device *obd = NULL;
592 unsigned long dev_no = 0;
596 obd_device_for_each(dev_no, obd) {
597 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
598 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
599 ret = obd->obd_minor;
608 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Find the device whose obd_uuid equals uuid by walking obd_devs. As
 * the TODO below states, the pointer is handed out without
 * class_incref(), so the caller holds no reference on it.
 */
610 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
612 struct obd_device *obd = NULL;
613 unsigned long dev_no = 0;
616 obd_device_for_each(dev_no, obd) {
617 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
618 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
625 * TODO: We give out a reference without class_incref(). This isn't
626 * ideal, but this behavior is identical in previous implementations
631 EXPORT_SYMBOL(class_uuid2obd);
/*
 * Return the entry stored in obd_devs at index dev_no; xa_load()
 * yields NULL when nothing is stored there. No reference is taken.
 */
633 struct obd_device *class_num2obd(int dev_no)
635 return xa_load(&obd_devs, dev_no);
637 EXPORT_SYMBOL(class_num2obd);
/*
 * str is converted to an obd_uuid once and each device in obd_devs is
 * compared by uuid and by name. A match with obd_attached set receives
 * a reference through class_incref(obd, "find", current).
 */
640 * Find obd by name or uuid.
642 * Increment obd's refcount if found.
644 * \param[in] str obd name or uuid
646 * \retval NULL if not found
647 * \retval obd pointer to found obd_device
649 struct obd_device *class_str2obd(const char *str)
651 struct obd_device *obd = NULL;
652 struct obd_uuid uuid;
653 unsigned long dev_no = 0;
655 obd_str2uuid(&uuid, str);
658 obd_device_for_each(dev_no, obd) {
659 if (obd_uuid_equals(&uuid, &obd->obd_uuid) ||
660 (strcmp(str, obd->obd_name) == 0)) {
662 * Make sure we finished attaching before we give
665 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
666 if (obd->obd_attached) {
667 class_incref(obd, "find", current);
677 EXPORT_SYMBOL(class_str2obd);
680 * Get obd devices count. Devices in any state are counted.
682 * \retval obd device count
/* Snapshot of obd_devs_count, read atomically. */
684 int class_obd_devs_count(void)
686 return atomic_read(&obd_devs_count);
688 EXPORT_SYMBOL(class_obd_devs_count);
/*
 * Note on matching: the type is compared with strncmp() over
 * strlen(type_name) bytes, so type_name matches any typ_name that
 * begins with it. The target comparison reads
 * obd->u.cli.cl_target_uuid.
 */
690 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
691 * specified, then only the client with that uuid is returned,
692 * otherwise any client connected to the tgt is returned.
694 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
695 const char *type_name,
696 struct obd_uuid *grp_uuid)
698 struct obd_device *obd = NULL;
699 unsigned long dev_no = 0;
702 obd_device_for_each(dev_no, obd) {
703 if ((strncmp(obd->obd_type->typ_name, type_name,
704 strlen(type_name)) == 0)) {
705 if (obd_uuid_equals(tgt_uuid,
706 &obd->u.cli.cl_target_uuid) &&
707 ((grp_uuid) ? obd_uuid_equals(grp_uuid,
708 &obd->obd_uuid) : 1)) {
718 EXPORT_SYMBOL(class_find_client_obd);
721 * Notify that the sptlrpc log for \a fsname has changed, and let every
722 * relevant OBD adjust its sptlrpc settings accordingly.
/*
 * Walk obd_devs and send KEY_SPTLRPC_CONF through
 * obd_set_info_async() on the self export of every device that is set
 * up and not stopping, has one of the six listed type names, and whose
 * obd_name matches fsname in its first namelen bytes. A reference on
 * the device is held around the call.
 */
724 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
726 struct obd_device *obd = NULL;
727 unsigned long dev_no = 0;
731 LASSERT(namelen > 0);
734 obd_device_for_each(dev_no, obd) {
735 if (obd->obd_set_up == 0 || obd->obd_stopping)
738 /* only notify mdc, osc, osp, lwp, mdt, ost
739 * because only these have a -sptlrpc llog */
740 type = obd->obd_type->typ_name;
741 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
742 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
743 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
744 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
745 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
746 strcmp(type, LUSTRE_OST_NAME) != 0)
749 if (strncmp(obd->obd_name, fsname, namelen))
752 class_incref(obd, __func__, obd);
754 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
755 sizeof(KEY_SPTLRPC_CONF),
756 KEY_SPTLRPC_CONF, 0, NULL, NULL);
759 class_decref(obd, __func__, obd);
765 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the obd_device slab cache if it exists and reset the pointer
 * so that obd_init_caches() can run again.
 */
767 void obd_cleanup_caches(void)
770 if (obd_device_cachep) {
771 kmem_cache_destroy(obd_device_cachep);
772 obd_device_cachep = NULL;
/*
 * Create the "ll_obd_dev_cache" slab cache for struct obd_device, with
 * the whole object whitelisted for usercopy. The cache must not exist
 * yet (asserted). Failure to create it yields -ENOMEM, and
 * obd_cleanup_caches() appears in the exit path.
 */
778 int obd_init_caches(void)
783 LASSERT(obd_device_cachep == NULL);
784 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
785 sizeof(struct obd_device),
786 0, 0, 0, sizeof(struct obd_device), NULL);
787 if (!obd_device_cachep)
788 GOTO(out, rc = -ENOMEM);
792 obd_cleanup_caches();
/*
 * Owner tag passed to class_handle_hash() and class_handle2object()
 * for export handles.
 */
796 static const char export_handle_owner[] = "export";
/*
 * Resolve a connection handle to its export by looking up
 * conn->cookie with class_handle2object(). A null handle and a cookie
 * of -1 ("assign a new connection") are logged as special cases
 * before the lookup.
 * NOTE(review): the values returned for those special cases are not
 * visible here - confirm.
 */
798 /* map connection to client */
799 struct obd_export *class_conn2export(struct lustre_handle *conn)
801 struct obd_export *export;
805 CDEBUG(D_CACHE, "looking for null handle\n");
809 if (conn->cookie == -1) { /* this means assign a new connection */
810 CDEBUG(D_CACHE, "want a new connection\n");
814 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
815 export = class_handle2object(conn->cookie, export_handle_owner);
818 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd_device.
 * NOTE(review): presumably returns exp->exp_obd - confirm.
 */
820 struct obd_device *class_exp2obd(struct obd_export *exp)
826 EXPORT_SYMBOL(class_exp2obd);
/*
 * Return the client import (u.cli.cl_import) of the device that exp
 * belongs to. No reference is taken on the import here.
 */
828 struct obd_import *class_exp2cliimp(struct obd_export *exp)
830 struct obd_device *obd = exp->exp_obd;
833 return obd->u.cli.cl_import;
835 EXPORT_SYMBOL(class_exp2cliimp);
837 /* Export management functions */
/*
 * Free an export whose handle refcount has dropped to zero (asserted).
 *
 * Drops the connection reference, asserts that the reply, replay and
 * high-priority RPC lists are empty, calls obd_destroy_export(), and
 * drops the "export" reference on the device unless this is the
 * device's self export. The memory is released with kfree_rcu().
 */
838 static void class_export_destroy(struct obd_export *exp)
840 struct obd_device *obd = exp->exp_obd;
843 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
844 LASSERT(obd != NULL);
846 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
847 exp->exp_client_uuid.uuid, obd->obd_name);
849 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
850 ptlrpc_connection_put(exp->exp_connection);
852 LASSERT(list_empty(&exp->exp_outstanding_replies));
853 LASSERT(list_empty(&exp->exp_uncommitted_replies));
854 LASSERT(list_empty(&exp->exp_req_replay_queue));
855 LASSERT(list_empty(&exp->exp_hp_rpcs));
856 obd_destroy_export(exp);
857 /* self export doesn't hold a reference to an obd, although it
858 * exists until freeing of the obd */
859 if (exp != obd->obd_self_export)
860 class_decref(obd, "export", exp);
862 OBD_FREE_PRE(exp, sizeof(*exp), "kfree_rcu");
863 kfree_rcu(exp, exp_handle.h_rcu);
/*
 * Take one reference on exp (exp_handle.h_ref) and log the new count.
 * Pair with class_export_put().
 */
867 struct obd_export *class_export_get(struct obd_export *exp)
869 refcount_inc(&exp->exp_handle.h_ref);
870 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
871 refcount_read(&exp->exp_handle.h_ref));
874 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on exp.
 *
 * On the final put the nid statistics reference is released. A
 * device's self export is then destroyed directly. Any other export
 * must still be on its device's export chain (asserted) and is handed
 * to obd_zombie_export_add().
 */
876 void class_export_put(struct obd_export *exp)
878 LASSERT(exp != NULL);
879 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
880 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
881 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
882 refcount_read(&exp->exp_handle.h_ref) - 1);
884 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
885 struct obd_device *obd = exp->exp_obd;
887 CDEBUG(D_IOCTL, "final put %p/%s\n",
888 exp, exp->exp_client_uuid.uuid);
890 /* release nid stat reference */
891 lprocfs_exp_cleanup(exp);
893 if (exp == obd->obd_self_export) {
894 /* self export should be destroyed without
895 * zombie thread as it doesn't hold a
896 * reference to obd and doesn't hold any
898 class_export_destroy(exp);
899 /* self export is destroyed, no class
900 * references exist and it is safe to free
904 LASSERT(!list_empty(&exp->exp_obd_chain));
905 obd_zombie_export_add(exp);
910 EXPORT_SYMBOL(class_export_put);
/*
 * Work function installed on exp_zombie_work. Destroys the export that
 * embeds ws, then decrements obd_stale_export_num and wakes anyone
 * waiting on that variable once it reaches zero.
 */
912 static void obd_zombie_exp_cull(struct work_struct *ws)
914 struct obd_export *export;
916 export = container_of(ws, struct obd_export, exp_zombie_work);
917 class_export_destroy(export);
918 LASSERT(atomic_read(&obd_stale_export_num) > 0);
919 if (atomic_dec_and_test(&obd_stale_export_num))
920 wake_up_var(&obd_stale_export_num);
923 /* Creates a new export, adds it to the hash table, and returns a
924 * pointer to it. The refcount is 2: one for the hash reference, and
925 * one for the pointer returned by this function. */
/*
 * Details: allocation failure returns ERR_PTR(-ENOMEM). Under
 * obd_dev_lock, unless cluuid equals the device's own uuid, creation
 * is refused with -ENODEV while the device is stopping, and the export
 * is added to the uuid table; a failure there is reported as a
 * duplicate and becomes -EALREADY. The export is then either linked
 * into obd_exports and obd_exports_timed, with an "export" reference
 * on obd and obd_num_exports incremented, or only has its two chain
 * list heads initialised.
 * NOTE(review): presumably the third parameter (passed as true by
 * class_new_export_self()) selects the latter - confirm.
 *
 * On the failure path the handle is unhashed, obd_destroy_export() is
 * called, and the export is freed.
 */
926 static struct obd_export *__class_new_export(struct obd_device *obd,
927 struct obd_uuid *cluuid,
930 struct obd_export *export;
934 OBD_ALLOC_PTR(export);
936 return ERR_PTR(-ENOMEM);
938 export->exp_conn_cnt = 0;
939 export->exp_lock_hash = NULL;
940 export->exp_flock_hash = NULL;
941 /* 2 = class_handle_hash + last */
942 refcount_set(&export->exp_handle.h_ref, 2);
943 atomic_set(&export->exp_rpc_count, 0);
944 atomic_set(&export->exp_cb_count, 0);
945 atomic_set(&export->exp_locks_count, 0);
946 #if LUSTRE_TRACKS_LOCK_EXP_REFS
947 INIT_LIST_HEAD(&export->exp_locks_list);
948 spin_lock_init(&export->exp_locks_list_guard);
950 atomic_set(&export->exp_replay_count, 0);
951 export->exp_obd = obd;
952 INIT_LIST_HEAD(&export->exp_outstanding_replies);
953 spin_lock_init(&export->exp_uncommitted_replies_lock);
954 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
955 INIT_LIST_HEAD(&export->exp_req_replay_queue);
956 INIT_HLIST_NODE(&export->exp_handle.h_link);
957 INIT_LIST_HEAD(&export->exp_hp_rpcs);
958 INIT_LIST_HEAD(&export->exp_reg_rpcs);
959 class_handle_hash(&export->exp_handle, export_handle_owner);
960 export->exp_last_request_time = ktime_get_real_seconds();
961 spin_lock_init(&export->exp_lock);
962 spin_lock_init(&export->exp_rpc_lock);
963 INIT_HLIST_NODE(&export->exp_gen_hash);
964 spin_lock_init(&export->exp_bl_list_lock);
965 INIT_LIST_HEAD(&export->exp_bl_list);
966 INIT_LIST_HEAD(&export->exp_stale_list);
967 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
969 export->exp_sp_peer = LUSTRE_SP_ANY;
970 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
971 export->exp_client_uuid = *cluuid;
972 obd_init_export(export);
974 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
975 export->exp_root_fid.f_seq = 0;
976 export->exp_root_fid.f_oid = 0;
977 export->exp_root_fid.f_ver = 0;
979 spin_lock(&obd->obd_dev_lock);
980 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
981 /* shouldn't happen, but might race */
982 if (obd->obd_stopping)
983 GOTO(exit_unlock, rc = -ENODEV);
985 rc = obd_uuid_add(obd, export);
987 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
988 obd->obd_name, cluuid->uuid, rc);
989 GOTO(exit_unlock, rc = -EALREADY);
994 class_incref(obd, "export", export);
995 list_add_tail(&export->exp_obd_chain_timed,
996 &obd->obd_exports_timed);
997 list_add(&export->exp_obd_chain, &obd->obd_exports);
998 obd->obd_num_exports++;
1000 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1001 INIT_LIST_HEAD(&export->exp_obd_chain);
1003 spin_unlock(&obd->obd_dev_lock);
1007 spin_unlock(&obd->obd_dev_lock);
1008 class_handle_unhash(&export->exp_handle);
1009 obd_destroy_export(export);
1010 OBD_FREE_PTR(export);
/*
 * Create an export of obd for client uuid. Calls __class_new_export()
 * with its third argument false.
 */
1014 struct obd_export *class_new_export(struct obd_device *obd,
1015 struct obd_uuid *uuid)
1017 return __class_new_export(obd, uuid, false);
1019 EXPORT_SYMBOL(class_new_export);
/*
 * Create the self export of obd. Calls __class_new_export() with its
 * third argument true.
 */
1021 struct obd_export *class_new_export_self(struct obd_device *obd,
1022 struct obd_uuid *uuid)
1024 return __class_new_export(obd, uuid, true);
/*
 * Detach exp from its device.
 *
 * The handle is unhashed first. A self export only has a reference
 * dropped. Any other export is, under obd_dev_lock, removed from the
 * uuid table, removed from obd_gen_hash if hashed there (server builds
 * only), moved to obd_unlinked_exports, taken off the timed list, and
 * obd_num_exports is decremented. Finally obd_stale_export_put() is
 * called on it.
 */
1027 void class_unlink_export(struct obd_export *exp)
1029 class_handle_unhash(&exp->exp_handle);
1031 if (exp->exp_obd->obd_self_export == exp) {
1032 class_export_put(exp);
1036 spin_lock(&exp->exp_obd->obd_dev_lock);
1037 /* delete an uuid-export hashitem from hashtables */
1038 if (exp != exp->exp_obd->obd_self_export)
1039 obd_uuid_del(exp->exp_obd, exp);
1041 #ifdef HAVE_SERVER_SUPPORT
1042 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1043 struct tg_export_data *ted = &exp->exp_target_data;
1044 struct cfs_hash *hash;
1046 /* Because obd_gen_hash will not be released until
1047 * class_cleanup(), so hash should never be NULL here */
1048 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1049 LASSERT(hash != NULL);
1050 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1051 &exp->exp_gen_hash);
1052 cfs_hash_putref(hash);
1054 #endif /* HAVE_SERVER_SUPPORT */
1056 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1057 list_del_init(&exp->exp_obd_chain_timed);
1058 exp->exp_obd->obd_num_exports--;
1059 spin_unlock(&exp->exp_obd->obd_dev_lock);
1061 /* A reference is kept by obd_stale_exports list */
1062 obd_stale_export_put(exp);
1064 EXPORT_SYMBOL(class_unlink_export);
1066 /* Import management functions */
/*
 * Release the resources of an import whose refcount is zero
 * (asserted).
 *
 * Drops the main connection, then drains imp_conn_list, putting each
 * entry's connection and freeing the entry. Asserts that no security
 * context (imp_sec) and no requests (imp_reqs) remain, and drops the
 * "import" reference on the owning device.
 */
1067 static void obd_zombie_import_free(struct obd_import *imp)
1069 struct obd_import_conn *imp_conn;
1072 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1073 imp->imp_obd->obd_name);
1075 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1077 ptlrpc_connection_put(imp->imp_connection);
1079 while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1080 struct obd_import_conn,
1081 oic_item)) != NULL) {
1082 list_del_init(&imp_conn->oic_item);
1083 ptlrpc_connection_put(imp_conn->oic_conn);
1084 OBD_FREE(imp_conn, sizeof(*imp_conn));
1087 LASSERT(imp->imp_sec == NULL);
1088 LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1089 imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1090 class_decref(imp->imp_obd, "import", imp);
/*
 * Take one reference on import (imp_refcount) and log the new count.
 * Pair with class_import_put().
 */
1095 struct obd_import *class_import_get(struct obd_import *import)
1097 refcount_inc(&import->imp_refcount);
1098 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1099 refcount_read(&import->imp_refcount),
1100 import->imp_obd->obd_name);
1103 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on imp. The final put hands the import to
 * obd_zombie_import_add() rather than freeing it here.
 */
1105 void class_import_put(struct obd_import *imp)
1109 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1111 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1112 refcount_read(&imp->imp_refcount) - 1,
1113 imp->imp_obd->obd_name);
1115 if (refcount_dec_and_test(&imp->imp_refcount)) {
1116 CDEBUG(D_INFO, "final put import %p\n", imp);
1117 obd_zombie_import_add(imp);
1122 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network
 * latency estimate starts at 0, and each of the IMP_AT_MAX_PORTALS
 * service estimates starts at INITIAL_CONNECT_TIMEOUT.
 */
1124 static void init_imp_at(struct imp_at *at) {
1126 at_init(&at->iat_net_latency, 0, 0);
1127 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1128 /* max service estimates are tracked on the server side, so
1129 don't use the AT history here, just use the last reported
1130 val. (But keep hist for proc histogram, worst_ever) */
1131 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Work function installed on imp_zombie_work. Frees the import that
 * embeds ws through obd_zombie_import_free().
 */
1136 static void obd_zombie_imp_cull(struct work_struct *ws)
1138 struct obd_import *import;
1140 import = container_of(ws, struct obd_import, imp_zombie_work);
1141 obd_zombie_import_free(import);
/*
 * Allocate and initialise an import for obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with imp_refcount set to
 * 2, and holds an "import" reference on obd that
 * obd_zombie_import_free() drops. imp_sec_refpid is the pid of the
 * current pid namespace's child_reaper when one is available;
 * otherwise 1 is used. The message magic defaults to
 * LUSTRE_MSG_MAGIC_V2.
 */
1144 struct obd_import *class_new_import(struct obd_device *obd)
1146 struct obd_import *imp;
1147 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1149 OBD_ALLOC(imp, sizeof(*imp));
1153 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1154 INIT_LIST_HEAD(&imp->imp_replay_list);
1155 INIT_LIST_HEAD(&imp->imp_sending_list);
1156 INIT_LIST_HEAD(&imp->imp_delayed_list);
1157 INIT_LIST_HEAD(&imp->imp_committed_list);
1158 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1159 imp->imp_known_replied_xid = 0;
1160 imp->imp_replay_cursor = &imp->imp_committed_list;
1161 spin_lock_init(&imp->imp_lock);
1162 imp->imp_last_success_conn = 0;
1163 imp->imp_state = LUSTRE_IMP_NEW;
1164 imp->imp_obd = class_incref(obd, "import", imp);
1165 rwlock_init(&imp->imp_sec_lock);
1166 init_waitqueue_head(&imp->imp_recovery_waitq);
1167 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1169 if (curr_pid_ns && curr_pid_ns->child_reaper)
1170 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1172 imp->imp_sec_refpid = 1;
1174 refcount_set(&imp->imp_refcount, 2);
1175 atomic_set(&imp->imp_unregistering, 0);
1176 atomic_set(&imp->imp_reqs, 0);
1177 atomic_set(&imp->imp_inflight, 0);
1178 atomic_set(&imp->imp_replay_inflight, 0);
1179 init_waitqueue_head(&imp->imp_replay_waitq);
1180 atomic_set(&imp->imp_inval_count, 0);
1181 atomic_set(&imp->imp_waiting, 0);
1182 INIT_LIST_HEAD(&imp->imp_conn_list);
1183 init_imp_at(&imp->imp_at);
1185 /* the default magic is V2, will be used in connect RPC, and
1186 * then adjusted according to the flags in request/reply. */
1187 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1191 EXPORT_SYMBOL(class_new_import);
/*
 * Retire an import: imp_generation is incremented under imp_lock, then
 * one reference is dropped. import must be neither NULL nor LP_POISON
 * (asserted).
 */
1193 void class_destroy_import(struct obd_import *import)
1195 LASSERT(import != NULL);
1196 LASSERT(import != LP_POISON);
1198 spin_lock(&import->imp_lock);
1199 import->imp_generation++;
1200 spin_unlock(&import->imp_lock);
1201 class_import_put(import);
1203 EXPORT_SYMBOL(class_destroy_import);
1205 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking of which export a lock is accounted to (built only
 * when LUSTRE_TRACKS_LOCK_EXP_REFS is set).
 *
 * Under exp_locks_list_guard: warns if the lock is already tracked by
 * a different export, and on the first reference links the lock into
 * exp_locks_list and records exp as its target. l_exp_refs_nr is
 * incremented on every call.
 */
1207 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1209 spin_lock(&exp->exp_locks_list_guard);
1211 LASSERT(lock->l_exp_refs_nr >= 0);
1213 if (lock->l_exp_refs_target != NULL &&
1214 lock->l_exp_refs_target != exp) {
1215 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1216 exp, lock, lock->l_exp_refs_target);
1218 if ((lock->l_exp_refs_nr ++) == 0) {
1219 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1220 lock->l_exp_refs_target = exp;
1222 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1223 lock, exp, lock->l_exp_refs_nr);
1224 spin_unlock(&exp->exp_locks_list_guard);
1226 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Counterpart of __class_export_add_lock_ref().
 *
 * Under exp_locks_list_guard: warns if the lock is tracked by a
 * different export, decrements l_exp_refs_nr (which must be positive),
 * and on the last reference unlinks the lock from the export's list
 * and clears its target.
 */
1228 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1230 spin_lock(&exp->exp_locks_list_guard);
1231 LASSERT(lock->l_exp_refs_nr > 0);
1232 if (lock->l_exp_refs_target != exp) {
1233 LCONSOLE_WARN("lock %p, "
1234 "mismatching export pointers: %p, %p\n",
1235 lock, lock->l_exp_refs_target, exp);
1237 if (-- lock->l_exp_refs_nr == 0) {
1238 list_del_init(&lock->l_exp_refs_link);
1239 lock->l_exp_refs_target = NULL;
1241 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1242 lock, exp, lock->l_exp_refs_nr);
1243 spin_unlock(&exp->exp_locks_list_guard);
1245 EXPORT_SYMBOL(__class_export_del_lock_ref);
/*
 * Summary of the visible steps: a new export is created with
 * class_new_export(), and an error pointer is returned as its errno.
 * On success the export's handle cookie is stored in conn->cookie and
 * the pointer reference from class_new_export() is dropped. conn, obd
 * and cluuid must all be non-NULL (asserted).
 */
1248 /* A connection defines an export context in which preallocation can
1249 be managed. This releases the export pointer reference, and returns
1250 the export handle, so the export refcount is 1 when this function
1252 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1253 struct obd_uuid *cluuid)
1255 struct obd_export *export;
1256 LASSERT(conn != NULL);
1257 LASSERT(obd != NULL);
1258 LASSERT(cluuid != NULL);
1261 export = class_new_export(obd, cluuid);
1263 RETURN(PTR_ERR(export));
1265 conn->cookie = export->exp_handle.h_cookie;
1266 class_export_put(export);
1268 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1269 cluuid->uuid, conn->cookie);
1272 EXPORT_SYMBOL(class_connect);
1274 /* if export is involved in recovery then clean up related things */
/*
 * Under obd_recovery_task_lock, while the device is recovering: an
 * export flagged exp_in_recovery has the flag cleared (under exp_lock)
 * and obd_connected_clients is decremented. obd_stale_clients is
 * incremented for exports without OBD_CONNECT_LIGHTWEIGHT.
 * Then, under exp_lock, pending request-replay and lock-replay flags
 * are cleared and the matching per-device client counters are
 * decremented.
 */
1275 static void class_export_recovery_cleanup(struct obd_export *exp)
1277 struct obd_device *obd = exp->exp_obd;
1279 spin_lock(&obd->obd_recovery_task_lock);
1280 if (obd->obd_recovering) {
1281 if (exp->exp_in_recovery) {
1282 spin_lock(&exp->exp_lock);
1283 exp->exp_in_recovery = 0;
1284 spin_unlock(&exp->exp_lock);
1285 LASSERT(atomic_read(&(obd)->obd_connected_clients) > 0);
1286 atomic_dec(&obd->obd_connected_clients);
1289 /* if called during recovery then should update
1290 * obd_stale_clients counter,
1291 * lightweight exports are not counted */
1292 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1293 exp->exp_obd->obd_stale_clients++;
1295 spin_unlock(&obd->obd_recovery_task_lock);
1297 spin_lock(&exp->exp_lock);
1298 /** Cleanup req replay fields */
1299 if (exp->exp_req_replay_needed) {
1300 exp->exp_req_replay_needed = 0;
1302 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1303 atomic_dec(&obd->obd_req_replay_clients);
1306 /** Cleanup lock replay data */
1307 if (exp->exp_lock_replay_needed) {
1308 exp->exp_lock_replay_needed = 0;
1310 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1311 atomic_dec(&obd->obd_lock_replay_clients);
1313 spin_unlock(&exp->exp_lock);
1316 /* This function removes 1-3 references from the export:
1317 * 1 - for export pointer passed
1318 * and if disconnect really need
1319 * 2 - removing from hash
1320 * 3 - in client_unlink_export
1321 * The export pointer passed to this function can destroyed */
/*
 * Summary of the visible logic: exp_disconnected is read and set to 1 in
 * one exp_lock critical section, so of several racing callers only the
 * first sees already_disconnected == 0.  That caller runs
 * class_export_recovery_cleanup() and class_unlink_export(); the others
 * GOTO no_disconn.  With HAVE_SERVER_SUPPORT, obd_nid_del() is called for
 * the export while exp_lock is still held.
 * NOTE(review): the return taken for a NULL export, the no_disconn label
 * itself and the final return statement are not visible in this listing;
 * class_export_put() presumably sits after the label so that every caller
 * drops the reference it passed in - confirm against the full file.
 */
1322 int class_disconnect(struct obd_export *export)
1324 int already_disconnected;
1327 if (export == NULL) {
1328 CWARN("attempting to free NULL export %p\n", export);
1332 spin_lock(&export->exp_lock);
1333 already_disconnected = export->exp_disconnected;
1334 export->exp_disconnected = 1;
1335 #ifdef HAVE_SERVER_SUPPORT
1336 /* We hold references of export for uuid hash
1337 * and nid_hash and export link at least. So
1338 * it is safe to call rh*table_remove_fast in
1341 obd_nid_del(export->exp_obd, export);
1342 #endif /* HAVE_SERVER_SUPPORT */
1343 spin_unlock(&export->exp_lock);
1345 /* class_cleanup(), abort_recovery(), and class_fail_export()
1346 * all end up in here, and if any of them race we shouldn't
1347 * call extra class_export_puts(). */
1348 if (already_disconnected)
1349 GOTO(no_disconn, already_disconnected);
1351 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1352 export->exp_handle.h_cookie);
1354 class_export_recovery_cleanup(export);
1355 class_unlink_export(export);
1357 class_export_put(export);
1360 EXPORT_SYMBOL(class_disconnect);
1362 /* Return non-zero for a fully connected export */
/*
 * "Fully connected" is computed as exp_conn_cnt > 0 with exp_failed clear;
 * both fields are sampled together under exp_lock.
 * NOTE(review): the declaration of 'connected', any guard on @exp and the
 * return statement are not visible in this listing.
 */
1363 int class_connected_export(struct obd_export *exp)
1368 spin_lock(&exp->exp_lock);
1369 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1370 spin_unlock(&exp->exp_lock);
1374 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect the exports queued on @list one by one.
 *
 * For each export taken from the head of @list:
 *  - a reference is taken so the export can still be logged after
 *    obd_disconnect() has run;
 *  - @flags is stored in exp_flags under exp_lock;
 *  - if the export's client uuid equals its obd's uuid (the self-export),
 *    it is only unlinked from @list and released, not disconnected;
 *  - otherwise a second reference is taken for obd_disconnect() to
 *    consume, and the first one is dropped after the final CDEBUG.
 *
 * The loop keeps taking the first entry until @list is empty, so it relies
 * on every export leaving the list: explicitly via list_del_init() for the
 * self-export, and presumably as a side effect of obd_disconnect() for the
 * others (not visible here - confirm).
 */
1376 static void class_disconnect_export_list(struct list_head *list,
1377 enum obd_option flags)
1380 struct obd_export *exp;
1383 /* It's possible that an export may disconnect itself, but
1384 * nothing else will be added to this list.
1386 while ((exp = list_first_entry_or_null(list, struct obd_export,
1387 exp_obd_chain)) != NULL) {
1388 /* need for safe call CDEBUG after obd_disconnect */
1389 class_export_get(exp);
1391 spin_lock(&exp->exp_lock);
1392 exp->exp_flags = flags;
1393 spin_unlock(&exp->exp_lock);
1395 if (obd_uuid_equals(&exp->exp_client_uuid,
1396 &exp->exp_obd->obd_uuid)) {
1398 "exp %p export uuid == obd uuid, don't discon\n",
1400 /* Need to delete this now so we don't end up pointing
1401 * to work_list later when this export is cleaned up. */
1402 list_del_init(&exp->exp_obd_chain);
1403 class_export_put(exp);
1407 class_export_get(exp);
1408 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1409 "last request at %lld\n",
1410 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1411 exp, exp->exp_last_request_time);
1412 /* release one export reference anyway */
1413 rc = obd_disconnect(exp);
1415 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1416 obd_export_nid2str(exp), exp, rc);
1417 class_export_put(exp);
1422 void class_disconnect_exports(struct obd_device *obd)
1424 LIST_HEAD(work_list);
1427 /* Move all of the exports from obd_exports to a work list, en masse. */
1428 spin_lock(&obd->obd_dev_lock);
1429 list_splice_init(&obd->obd_exports, &work_list);
1430 list_splice_init(&obd->obd_delayed_exports, &work_list);
1431 spin_unlock(&obd->obd_dev_lock);
1433 if (!list_empty(&work_list)) {
1434 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1435 "disconnecting them\n", obd->obd_minor, obd);
1436 class_disconnect_export_list(&work_list,
1437 exp_flags_from_obd(obd));
1439 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1440 obd->obd_minor, obd);
1443 EXPORT_SYMBOL(class_disconnect_exports);
/*
 * Evict the exports of @obd that @test_export does not vouch for.
 *
 * obd_exports is walked under obd_dev_lock.  The self-export (client uuid
 * equal to the obd uuid) and exports with ted_lr_idx == -1 are left alone,
 * as are exports that are already failed or for which @test_export returns
 * non-zero.  @test_export is called with the export's exp_lock held.
 * Every remaining export is marked exp_failed, counted in
 * obd_eviction_count and moved to a private work list, which is finally
 * handed to class_disconnect_export_list() with OBD_OPT_ABORT_RECOV added
 * to the device's flags.
 * NOTE(review): the 'continue' statements, the 'evicted' counter's
 * declaration/increment and the condition guarding the LCONSOLE_WARN are
 * not visible in this listing.
 */
1445 /* Remove exports that have not completed recovery.
1447 void class_disconnect_stale_exports(struct obd_device *obd,
1448 int (*test_export)(struct obd_export *))
1450 LIST_HEAD(work_list);
1451 struct obd_export *exp, *n;
1455 spin_lock(&obd->obd_dev_lock);
1456 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1458 /* don't count self-export as client */
1459 if (obd_uuid_equals(&exp->exp_client_uuid,
1460 &exp->exp_obd->obd_uuid))
1463 /* don't evict clients which have no slot in last_rcvd
1464 * (e.g. lightweight connection) */
1465 if (exp->exp_target_data.ted_lr_idx == -1)
1468 spin_lock(&exp->exp_lock);
1469 if (exp->exp_failed || test_export(exp)) {
1470 spin_unlock(&exp->exp_lock);
1473 exp->exp_failed = 1;
1474 atomic_inc(&exp->exp_obd->obd_eviction_count);
1475 spin_unlock(&exp->exp_lock);
1477 list_move(&exp->exp_obd_chain, &work_list);
1479 CWARN("%s: disconnect stale client %s@%s\n",
1480 obd->obd_name, exp->exp_client_uuid.uuid,
1481 obd_export_nid2str(exp));
1482 print_export_data(exp, "EVICTING", 0, D_HA);
1484 spin_unlock(&obd->obd_dev_lock);
1487 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1488 obd->obd_name, evicted);
1490 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1491 OBD_OPT_ABORT_RECOV);
1494 EXPORT_SYMBOL(class_disconnect_stale_exports);
1496 void class_fail_export(struct obd_export *exp)
1498 int rc, already_failed;
1500 spin_lock(&exp->exp_lock);
1501 already_failed = exp->exp_failed;
1502 exp->exp_failed = 1;
1503 spin_unlock(&exp->exp_lock);
1505 if (already_failed) {
1506 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1507 exp, exp->exp_client_uuid.uuid);
1511 atomic_inc(&exp->exp_obd->obd_eviction_count);
1513 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1514 exp, exp->exp_client_uuid.uuid);
1516 if (obd_dump_on_timeout)
1517 libcfs_debug_dumplog();
1519 /* need for safe call CDEBUG after obd_disconnect */
1520 class_export_get(exp);
1522 /* Most callers into obd_disconnect are removing their own reference
1523 * (request, for example) in addition to the one from the hash table.
1524 * We don't have such a reference here, so make one. */
1525 class_export_get(exp);
1526 rc = obd_disconnect(exp);
1528 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1530 CDEBUG(D_HA, "disconnected export %p/%s\n",
1531 exp, exp->exp_client_uuid.uuid);
1532 class_export_put(exp);
1534 EXPORT_SYMBOL(class_fail_export);
1536 #ifdef HAVE_SERVER_SUPPORT
/*
 * Iterator callback handed to obd_nid_export_for_each() by
 * obd_export_evict_by_nid(); @data is a 'struct obd_export **' result
 * slot.  Judging by the inline comments, an export is rejected when the
 * slot is already filled, when the export is already failed, or when no
 * reference can be taken on it with refcount_inc_not_zero().
 * NOTE(review): the return statements and the store into *expp are not
 * visible in this listing, so the exact return values cannot be confirmed
 * here.
 */
1538 static int take_first(struct obd_export *exp, void *data)
1540 struct obd_export **expp = data;
1543 /* already have one */
1545 if (exp->exp_failed)
1546 /* Don't want this one */
1548 if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1549 /* Cannot get a ref on this one */
/*
 * Evict the exports of @obd that are connected from the NID given as the
 * string @nid.
 *
 * Returns immediately if the device is already stopping (checked under
 * obd_dev_lock).  Otherwise obd_nid_export_for_each() is called repeatedly
 * with take_first() to pick one matching export at a time; each one is
 * failed with class_fail_export() and then released with
 * class_export_put().
 *
 * Returns the value of exports_evicted.
 * NOTE(review): the increment of exports_evicted and the resetting of
 * doomed_exp between iterations are not visible in this listing.  The
 * result of libcfs_strnid() is not checked in the visible code.
 */
1555 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1557 struct lnet_nid nid_key;
1558 struct obd_export *doomed_exp;
1559 int exports_evicted = 0;
1561 libcfs_strnid(&nid_key, nid);
1563 spin_lock(&obd->obd_dev_lock);
1564 /* umount has run already, so evict thread should leave
1565 * its task to umount thread now */
1566 if (obd->obd_stopping) {
1567 spin_unlock(&obd->obd_dev_lock);
1568 return exports_evicted;
1570 spin_unlock(&obd->obd_dev_lock);
1573 while (obd_nid_export_for_each(obd, &nid_key,
1574 take_first, &doomed_exp) > 0) {
1576 LASSERTF(doomed_exp != obd->obd_self_export,
1577 "self-export is hashed by NID?\n");
1579 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1581 obd_uuid2str(&doomed_exp->exp_client_uuid),
1582 obd_export_nid2str(doomed_exp));
1584 class_fail_export(doomed_exp);
1585 class_export_put(doomed_exp);
1590 if (!exports_evicted)
1592 "%s: can't disconnect NID '%s': no exports found\n",
1593 obd->obd_name, nid);
1594 return exports_evicted;
1596 EXPORT_SYMBOL(obd_export_evict_by_nid);
1598 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1600 struct obd_export *doomed_exp = NULL;
1601 struct obd_uuid doomed_uuid;
1602 int exports_evicted = 0;
1604 spin_lock(&obd->obd_dev_lock);
1605 if (obd->obd_stopping) {
1606 spin_unlock(&obd->obd_dev_lock);
1607 return exports_evicted;
1609 spin_unlock(&obd->obd_dev_lock);
1611 obd_str2uuid(&doomed_uuid, uuid);
1612 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1613 CERROR("%s: can't evict myself\n", obd->obd_name);
1614 return exports_evicted;
1617 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1618 if (doomed_exp == NULL) {
1619 CERROR("%s: can't disconnect %s: no exports found\n",
1620 obd->obd_name, uuid);
1622 CWARN("%s: evicting %s at adminstrative request\n",
1623 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1624 class_fail_export(doomed_exp);
1625 class_export_put(doomed_exp);
1626 obd_uuid_del(obd, doomed_exp);
1630 return exports_evicted;
1632 #endif /* HAVE_SERVER_SUPPORT */
1634 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional per-export dump callback, NULL by default.  print_export_data()
 * invokes it for an export when lock dumping was requested and the hook is
 * set.
 */
1635 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1636 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Emit one debug line describing @exp at @debug_level, tagged with
 * @status.
 *
 * The line carries the device name, client uuid and NID, the handle
 * refcount, the rpc/callback/lock counters, the disconnected/delayed/
 * failed flags, the outstanding reply count with the first reply pointer
 * ("..." is appended when there are more than 3 replies), the last
 * committed value and whether the export is on a stale list.
 * exp_outstanding_replies is walked under exp_lock.
 *
 * With LUSTRE_TRACKS_LOCK_EXP_REFS, a non-zero @locks additionally calls
 * class_export_dump_hook (when set) for the export.
 * NOTE(review): the body of the reply-list loop and the declaration of
 * 'nreplies' are not visible in this listing.
 */
1639 static void print_export_data(struct obd_export *exp, const char *status,
1640 int locks, int debug_level)
1642 struct ptlrpc_reply_state *rs;
1643 struct ptlrpc_reply_state *first_reply = NULL;
1646 spin_lock(&exp->exp_lock);
1647 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1653 spin_unlock(&exp->exp_lock);
1655 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1656 "%p %s %llu stale:%d\n",
1657 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1658 obd_export_nid2str(exp),
1659 refcount_read(&exp->exp_handle.h_ref),
1660 atomic_read(&exp->exp_rpc_count),
1661 atomic_read(&exp->exp_cb_count),
1662 atomic_read(&exp->exp_locks_count),
1663 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1664 nreplies, first_reply, nreplies > 3 ? "..." : "",
1665 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1666 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1667 if (locks && class_export_dump_hook != NULL)
1668 class_export_dump_hook(exp);
1672 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1674 struct obd_export *exp;
1676 spin_lock(&obd->obd_dev_lock);
1677 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1678 print_export_data(exp, "ACTIVE", locks, debug_level);
1679 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1680 print_export_data(exp, "UNLINKED", locks, debug_level);
1681 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1682 print_export_data(exp, "DELAYED", locks, debug_level);
1683 spin_unlock(&obd->obd_dev_lock);
/*
 * Wait until @obd has no unlinked exports left.
 *
 * The caller must already have emptied obd_exports (asserted).  While
 * obd_unlinked_exports is non-empty the function drops obd_dev_lock and
 * sleeps uninterruptibly for 'waited' seconds, then rechecks.  Once
 * 'waited' exceeds 5 and is a power of two, a console warning is printed
 * and all exports are dumped.
 * NOTE(review): the initial value of 'waited' and how it grows between
 * iterations are not visible in this listing.
 */
1686 void obd_exports_barrier(struct obd_device *obd)
1689 LASSERT(list_empty(&obd->obd_exports));
1690 spin_lock(&obd->obd_dev_lock);
1691 while (!list_empty(&obd->obd_unlinked_exports)) {
1692 spin_unlock(&obd->obd_dev_lock);
1693 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1694 if (waited > 5 && is_power_of_2(waited)) {
1695 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1696 "more than %d seconds. "
1697 "The obd refcount = %d. Is it stuck?\n",
1698 obd->obd_name, waited,
1699 kref_read(&obd->obd_refcount));
1700 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1703 spin_lock(&obd->obd_dev_lock);
1705 spin_unlock(&obd->obd_dev_lock);
1707 EXPORT_SYMBOL(obd_exports_barrier);
1710 * Add export to the obd_zombe thread and notify it.
1712 static void obd_zombie_export_add(struct obd_export *exp) {
1713 atomic_inc(&obd_stale_export_num);
1714 spin_lock(&exp->exp_obd->obd_dev_lock);
1715 LASSERT(!list_empty(&exp->exp_obd_chain));
1716 list_del_init(&exp->exp_obd_chain);
1717 spin_unlock(&exp->exp_obd->obd_dev_lock);
1718 queue_work(zombie_wq, &exp->exp_zombie_work);
1722 * Add import to the obd_zombe thread and notify it.
1724 static void obd_zombie_import_add(struct obd_import *imp) {
1725 LASSERT(imp->imp_sec == NULL);
1727 queue_work(zombie_wq, &imp->imp_zombie_work);
1731 * wait when obd_zombie import/export queues become empty
1733 void obd_zombie_barrier(void)
1735 wait_var_event(&obd_stale_export_num,
1736 atomic_read(&obd_stale_export_num) == 0);
1737 flush_workqueue(zombie_wq);
1739 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Take the first export off the global obd_stale_exports list, under
 * obd_stale_export_lock, and hand it to the caller.  'exp' stays NULL when
 * the list is empty.
 * NOTE(review): the return statement and any condition guarding the CDEBUG
 * are not visible in this listing; the function presumably returns 'exp'.
 */
1742 struct obd_export *obd_stale_export_get(void)
1744 struct obd_export *exp = NULL;
1747 spin_lock(&obd_stale_export_lock);
1748 if (!list_empty(&obd_stale_exports)) {
1749 exp = list_first_entry(&obd_stale_exports,
1750 struct obd_export, exp_stale_list);
1751 list_del_init(&exp->exp_stale_list);
1753 spin_unlock(&obd_stale_export_lock);
1756 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1757 atomic_read(&obd_stale_export_num));
1761 EXPORT_SYMBOL(obd_stale_export_get);
/*
 * Give back an export obtained from obd_stale_export_get().  @exp must not
 * currently be on the stale list (asserted).
 *
 * If the export has a lock hash with a non-zero hs_count, it is put back
 * on obd_stale_exports: at the tail when exp_bl_list is empty, at the head
 * otherwise.  Lock order is exp_bl_list_lock (bh-safe) first, then
 * obd_stale_export_lock, the same order obd_stale_export_adjust() uses.
 * NOTE(review): the 'else' lines are not visible in this listing;
 * class_export_put() is presumably reached only when the export is NOT
 * re-queued - confirm against the full file.
 */
1763 void obd_stale_export_put(struct obd_export *exp)
1767 LASSERT(list_empty(&exp->exp_stale_list));
1768 if (exp->exp_lock_hash &&
1769 atomic_read(&exp->exp_lock_hash->hs_count)) {
1770 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1771 atomic_read(&obd_stale_export_num));
1773 spin_lock_bh(&exp->exp_bl_list_lock);
1774 spin_lock(&obd_stale_export_lock);
1775 /* Add to the tail if there is no blocked locks,
1776 * to the head otherwise. */
1777 if (list_empty(&exp->exp_bl_list))
1778 list_add_tail(&exp->exp_stale_list,
1779 &obd_stale_exports);
1781 list_add(&exp->exp_stale_list,
1782 &obd_stale_exports);
1784 spin_unlock(&obd_stale_export_lock);
1785 spin_unlock_bh(&exp->exp_bl_list_lock);
1787 class_export_put(exp);
1791 EXPORT_SYMBOL(obd_stale_export_put);
1794 * Adjust the position of the export in the stale list,
1795 * i.e. move to the head of the list if is needed.
1797 void obd_stale_export_adjust(struct obd_export *exp)
1799 LASSERT(exp != NULL);
1800 spin_lock_bh(&exp->exp_bl_list_lock);
1801 spin_lock(&obd_stale_export_lock);
1803 if (!list_empty(&exp->exp_stale_list) &&
1804 !list_empty(&exp->exp_bl_list))
1805 list_move(&exp->exp_stale_list, &obd_stale_exports);
1807 spin_unlock(&obd_stale_export_lock);
1808 spin_unlock_bh(&exp->exp_bl_list_lock);
1810 EXPORT_SYMBOL(obd_stale_export_adjust);
/*
 * Creates the "obd_zombid" workqueue through cfs_cpt_bind_workqueue() and
 * stores it in zombie_wq.  Returns 0 on success, or the PTR_ERR() code
 * when the creation call returned an error pointer.
 * NOTE(review): one argument line of the cfs_cpt_bind_workqueue() call is
 * not visible in this listing.
 */
1813 * start destroy zombie import/export thread
1815 int obd_zombie_impexp_init(void)
1817 zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1819 cfs_cpt_number(cfs_cpt_tab));
1821 return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1825 * stop destroy zombie import/export thread
1827 void obd_zombie_impexp_stop(void)
1829 destroy_workqueue(zombie_wq);
1830 LASSERT(list_empty(&obd_stale_exports));
1833 /***** Kernel-userspace comm helpers *******/
1835 /* Get length of entire message, including header */
1836 int kuc_len(int payload_len)
1838 return sizeof(struct kuc_hdr) + payload_len;
1840 EXPORT_SYMBOL(kuc_len);
1842 /* Get a pointer to kuc header, given a ptr to the payload
1843 * @param p Pointer to payload area
1844 * @returns Pointer to kuc header
1846 struct kuc_hdr * kuc_ptr(void *p)
1848 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1849 LASSERT(lh->kuc_magic == KUC_MAGIC);
1852 EXPORT_SYMBOL(kuc_ptr);
/*
 * The header fields written here are the magic, @transport, @type and the
 * total length from kuc_len(@payload_len).  The returned pointer is the
 * first byte after the header; ERR_PTR(-ENOMEM) is returned on the
 * failure path.
 * NOTE(review): the allocation call and its failure check are not visible
 * in this listing; presumably OBD_ALLOC(lh, len), matching the OBD_FREE()
 * in kuc_free() - confirm.
 */
1854 /* Alloc space for a message, and fill in header
1855 * @return Pointer to payload area
1857 void *kuc_alloc(int payload_len, int transport, int type)
1860 int len = kuc_len(payload_len);
1864 return ERR_PTR(-ENOMEM);
1866 lh->kuc_magic = KUC_MAGIC;
1867 lh->kuc_transport = transport;
1868 lh->kuc_msgtype = type;
1869 lh->kuc_msglen = len;
1871 return (void *)(lh + 1);
1873 EXPORT_SYMBOL(kuc_alloc);
/* Free a message.  Takes a pointer to the payload area and the same
 * @payload_len the message was sized with; the header in front of the
 * payload is located (and its magic checked) by kuc_ptr().
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);
	int msg_len = kuc_len(payload_len);

	OBD_FREE(lh, msg_len);
}
EXPORT_SYMBOL(kuc_free);
/*
 * On-stack record for one thread waiting in obd_get_request_slot().
 * orsw_entry links the waiter into cli->cl_flight_waiters; orsw_waitq is
 * what the granting side wakes once it has unlinked the entry.
 * NOTE(review): obd_get_request_slot() also uses an orsw_signaled member
 * whose declaration is not visible in this listing.
 */
1883 struct obd_request_slot_waiter {
1884 struct list_head orsw_entry;
1885 wait_queue_head_t orsw_waitq;
1889 static bool obd_request_slot_avail(struct client_obd *cli,
1890 struct obd_request_slot_waiter *orsw)
1894 spin_lock(&cli->cl_loi_list_lock);
1895 avail = !!list_empty(&orsw->orsw_entry);
1896 spin_unlock(&cli->cl_loi_list_lock);
/*
 * Implementation summary (visible logic only): if a credit is free,
 * cl_rpcs_in_flight is bumped under cl_loi_list_lock and the function is
 * done.  Otherwise an on-stack waiter is appended to cl_flight_waiters and
 * the thread sleeps in l_wait_event_abortable() until the waiter has been
 * unlinked by a granting thread or orsw_signaled is set.  After the wait
 * the list lock is retaken; for a waiter that was not signalled, the code
 * either gives back a slot that was granted (entry already unlinked) or
 * unlinks the still-queued entry.
 * NOTE(review): the condition under which that undo branch runs and the
 * return values are not visible in this listing.
 */
1902 * For network flow control, the RPC sponsor needs to acquire a credit
1903 * before sending the RPC. The credits count for a connection is defined
1904 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1905 * the subsequent RPC sponsors need to wait until others released their
1906 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1908 int obd_get_request_slot(struct client_obd *cli)
1910 struct obd_request_slot_waiter orsw;
1913 spin_lock(&cli->cl_loi_list_lock);
1914 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1915 cli->cl_rpcs_in_flight++;
1916 spin_unlock(&cli->cl_loi_list_lock);
1920 init_waitqueue_head(&orsw.orsw_waitq);
1921 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
1922 orsw.orsw_signaled = false;
1923 spin_unlock(&cli->cl_loi_list_lock);
1925 rc = l_wait_event_abortable(orsw.orsw_waitq,
1926 obd_request_slot_avail(cli, &orsw) ||
1927 orsw.orsw_signaled);
1929 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1930 * freed but other (such as obd_put_request_slot) is using it. */
1931 spin_lock(&cli->cl_loi_list_lock);
1933 if (!orsw.orsw_signaled) {
1934 if (list_empty(&orsw.orsw_entry))
1935 cli->cl_rpcs_in_flight--;
1937 list_del(&orsw.orsw_entry);
1942 if (orsw.orsw_signaled) {
1943 LASSERT(list_empty(&orsw.orsw_entry));
1947 spin_unlock(&cli->cl_loi_list_lock);
1951 EXPORT_SYMBOL(obd_get_request_slot);
1953 void obd_put_request_slot(struct client_obd *cli)
1955 struct obd_request_slot_waiter *orsw;
1957 spin_lock(&cli->cl_loi_list_lock);
1958 cli->cl_rpcs_in_flight--;
1960 /* If there is free slot, wakeup the first waiter. */
1961 if (!list_empty(&cli->cl_flight_waiters) &&
1962 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
1963 orsw = list_first_entry(&cli->cl_flight_waiters,
1964 struct obd_request_slot_waiter,
1966 list_del_init(&orsw->orsw_entry);
1967 cli->cl_rpcs_in_flight++;
1968 wake_up(&orsw->orsw_waitq);
1970 spin_unlock(&cli->cl_loi_list_lock);
1972 EXPORT_SYMBOL(obd_put_request_slot);
1974 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1976 return cli->cl_max_rpcs_in_flight;
1978 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1980 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1982 struct obd_request_slot_waiter *orsw;
1988 if (max > OBD_MAX_RIF_MAX || max < 1)
1991 CDEBUG(D_INFO, "%s: max = %u max_mod = %u rif = %u\n",
1992 cli->cl_import->imp_obd->obd_name, max,
1993 cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
1995 if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
1996 LUSTRE_MDC_NAME) == 0) {
1997 /* adjust max_mod_rpcs_in_flight to ensure it is always
1998 * strictly lower that max_rpcs_in_flight */
2000 CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2001 cli->cl_import->imp_obd->obd_name);
2004 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2005 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2011 spin_lock(&cli->cl_loi_list_lock);
2012 old = cli->cl_max_rpcs_in_flight;
2013 cli->cl_max_rpcs_in_flight = max;
2014 client_adjust_max_dirty(cli);
2018 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2019 for (i = 0; i < diff; i++) {
2020 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2021 struct obd_request_slot_waiter,
2026 list_del_init(&orsw->orsw_entry);
2027 cli->cl_rpcs_in_flight++;
2028 wake_up(&orsw->orsw_waitq);
2030 spin_unlock(&cli->cl_loi_list_lock);
2034 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2036 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2038 return cli->cl_max_mod_rpcs_in_flight;
2040 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/*
 * Change the limit on concurrent modify RPCs for @cli to @max.
 *
 * Visible logic:
 *  - @max outside [1, OBD_MAX_RIF_MAX] is rejected, and @max equal to
 *    OBD_MAX_RIF_MAX is clamped to OBD_MAX_RIF_MAX - 1;
 *  - if @max >= cl_max_rpcs_in_flight, the general limit is raised to
 *    @max + 1 through obd_set_max_rpcs_in_flight() (its return value is
 *    not checked in the visible code);
 *  - the server-side ceiling 'maxmodrpcs' is taken from ocd_maxmodrpcs
 *    when OBD_CONNECT_MULTIMODRPCS was negotiated, or assumed to be
 *    cl_max_rpcs_in_flight - 1 while the connect data is still unset or
 *    incomplete; a @max above it is reported with CERROR;
 *  - the new limit is stored under cl_mod_rpcs_waitq.lock and waiters are
 *    woken when the limit grew.
 * NOTE(review): the declarations, the return statements and the branch
 * taken when MULTIMODRPCS is not negotiated are not visible in this
 * listing.
 */
2042 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2044 struct obd_connect_data *ocd;
2048 if (max > OBD_MAX_RIF_MAX || max < 1)
2051 ocd = &cli->cl_import->imp_connect_data;
2052 CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2053 cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2054 ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2056 if (max == OBD_MAX_RIF_MAX)
2057 max = OBD_MAX_RIF_MAX - 1;
2059 /* Cannot exceed or equal max_rpcs_in_flight. If we are asked to
2060 * increase this value, also bump up max_rpcs_in_flight to match.
2062 if (max >= cli->cl_max_rpcs_in_flight) {
2064 "%s: increasing max_rpcs_in_flight=%u to allow larger max_mod_rpcs_in_flight=%u\n",
2065 cli->cl_import->imp_obd->obd_name, max + 1, max);
2066 obd_set_max_rpcs_in_flight(cli, max + 1);
2069 /* cannot exceed max modify RPCs in flight supported by the server,
2070 * but verify ocd_connect_flags is at least initialized first. If
2071 * not, allow it and fix value later in ptlrpc_connect_set_flags().
2073 if (!ocd->ocd_connect_flags) {
2074 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2075 } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2076 maxmodrpcs = ocd->ocd_maxmodrpcs;
2077 if (maxmodrpcs == 0) { /* connection not finished yet */
2078 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2080 "%s: partial connect, assume maxmodrpcs=%hu\n",
2081 cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2086 if (max > maxmodrpcs) {
2087 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than mdt.*.max_mod_rpcs_in_flight=%hu returned by the MDT server at connection.\n",
2088 cli->cl_import->imp_obd->obd_name,
2093 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2095 prev = cli->cl_max_mod_rpcs_in_flight;
2096 cli->cl_max_mod_rpcs_in_flight = max;
2098 /* wakeup waiters if limit has been increased */
2099 if (cli->cl_max_mod_rpcs_in_flight > prev)
2100 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2102 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2106 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/*
 * Write the modify-RPC statistics of @cli to @seq: a stats header, the
 * current number of modify RPCs in flight, then one row per histogram
 * bucket of cl_mod_rpcs_hist with the bucket count, its percentage of the
 * total and the cumulative percentage.  Everything is read and printed
 * inside one cl_mod_rpcs_waitq.lock critical section (irqs disabled).
 * NOTE(review): the accumulation of mod_cum, the early loop exit once the
 * cumulative count reaches the total, and the return value are not
 * visible in this listing.
 */
2108 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2109 struct seq_file *seq)
2111 unsigned long mod_tot = 0, mod_cum;
2114 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2115 lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
2117 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2118 cli->cl_mod_rpcs_in_flight);
2120 seq_printf(seq, "\n\t\t\tmodify\n");
2121 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2123 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2126 for (i = 0; i < OBD_HIST_MAX; i++) {
2127 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2130 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2131 i, mod, pct(mod, mod_tot),
2132 pct(mod_cum, mod_tot));
2133 if (mod_cum == mod_tot)
2137 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2141 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2143 /* The number of modify RPCs sent in parallel is limited
2144 * because the server has a finite number of slots per client to
2145 * store request result and ensure reply reconstruction when needed.
2146 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2147 * that takes into account server limit and cl_max_rpcs_in_flight
2149 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2150 * one close request is allowed above the maximum.
/*
 * Visible members of struct mod_waiter, the on-stack record that
 * obd_get_mod_rpc_slot() queues on cl_mod_rpcs_waitq: 'cli' is the client
 * whose slot is being claimed, 'wqe' is the wait-queue entry that
 * claim_mod_rpc_function() maps back to the record with container_of().
 * NOTE(review): the struct header and the close_req/woken members used by
 * those functions are not visible in this listing.
 */
2153 struct client_obd *cli;
2156 wait_queue_entry_t wqe;
/*
 * Wake callback installed on the wait-queue entry of a struct mod_waiter.
 * When the waiter can have a slot (see the availability rule below),
 * cl_mod_rpcs_in_flight is bumped on its behalf and the sleeper is woken
 * through woken_wake_function().  cl_close_rpcs_in_flight is bumped too;
 * NOTE(review): presumably only for close requests, the guarding line is
 * not visible.  The remaining branches decide whether the wake-up should
 * go on to later waiters: per the inline comments it stops when a close
 * is already in flight or when @key is NULL (the wake-up did not come
 * from a close completion), and otherwise keeps looking for a close
 * waiter.
 * NOTE(review): the 'woken' flag handling and every return value are not
 * visible in this listing.
 */
2158 static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
2159 unsigned int mode, int flags, void *key)
2161 struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
2162 struct client_obd *cli = w->cli;
2163 bool close_req = w->close_req;
2167 /* As woken_wake_function() doesn't remove us from the wait_queue,
2168 * we use own flag to ensure we're called just once.
2173 /* A slot is available if
2174 * - number of modify RPCs in flight is less than the max
2175 * - it's a close RPC and no other close request is in flight
2177 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2178 (close_req && cli->cl_close_rpcs_in_flight == 0);
2180 cli->cl_mod_rpcs_in_flight++;
2182 cli->cl_close_rpcs_in_flight++;
2183 ret = woken_wake_function(wq_entry, mode, flags, key);
2185 } else if (cli->cl_close_rpcs_in_flight)
2186 /* No other waiter could be woken */
2188 else if (key == NULL)
2189 /* This was not a wakeup from a close completion, so there is no
2190 * point seeing if there are close waiters to be woken
2194 /* There might be be a close we could wake, keep looking */
/*
 * Implementation summary (visible logic only): an on-stack struct
 * mod_waiter is appended to cl_mod_rpcs_waitq with
 * claim_mod_rpc_function() as its wake callback, and an immediate
 * wake_up_locked() lets it claim a slot right away when one is free.
 * Otherwise the thread loops in wait_woken(TASK_UNINTERRUPTIBLE),
 * dropping the waitq lock around each sleep, until wait.woken is set.
 * It then removes itself from the queue, tallies the in-flight count in
 * cl_mod_rpcs_hist and claims the first zero bit of cl_mod_tag_bitmap.
 * NOTE(review): the bit is set inside LASSERT(!test_and_set_bit(...)),
 * which relies on LASSERT always evaluating its argument - confirm.  The
 * final tag computation and return are not visible in this listing.
 */
2199 /* Get a modify RPC slot from the obd client @cli according
2200 * to the kind of operation @opc that is going to be sent
2201 * and the intent @it of the operation if it applies.
2202 * If the maximum number of modify RPCs in flight is reached
2203 * the thread is put to sleep.
2204 * Returns the tag to be set in the request message. Tag 0
2205 * is reserved for non-modifying requests.
2207 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2209 struct mod_waiter wait = {
2211 .close_req = (opc == MDS_CLOSE),
2216 init_wait(&wait.wqe);
2217 wait.wqe.func = claim_mod_rpc_function;
2219 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2220 __add_wait_queue_entry_tail(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2221 /* This wakeup will only succeed if the maximums haven't
2222 * been reached. If that happens, wait.woken will be set
2223 * and there will be no need to wait.
2225 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2226 while (wait.woken == false) {
2227 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2228 wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
2229 MAX_SCHEDULE_TIMEOUT);
2230 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2232 __remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2234 max = cli->cl_max_mod_rpcs_in_flight;
2235 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2236 cli->cl_mod_rpcs_in_flight);
2237 /* find a free tag */
2238 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2240 LASSERT(i < OBD_MAX_RIF_MAX);
2241 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2242 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2243 /* tag 0 is reserved for non-modify RPCs */
2246 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2247 cli->cl_import->imp_obd->obd_name,
2252 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
/*
 * Implementation summary (visible logic only): under
 * cl_mod_rpcs_waitq.lock the modify in-flight counter is decremented, bit
 * (@tag - 1) is cleared in cl_mod_tag_bitmap (asserted to have been set),
 * and the waiters on cl_mod_rpcs_waitq are woken through
 * __wake_up_locked_key().
 * NOTE(review): the handling of tag 0, the condition guarding the
 * cl_close_rpcs_in_flight decrement and the key passed to the wake-up are
 * not visible in this listing.  The bit is cleared inside a LASSERT(), so
 * this relies on LASSERT always evaluating its argument - confirm.
 */
2254 /* Put a modify RPC slot from the obd client @cli according
2255 * to the kind of operation @opc that has been sent.
2257 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2259 bool close_req = false;
2264 if (opc == MDS_CLOSE)
2267 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2268 cli->cl_mod_rpcs_in_flight--;
2270 cli->cl_close_rpcs_in_flight--;
2271 /* release the tag in the bitmap */
2272 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2273 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2274 __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2276 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2278 EXPORT_SYMBOL(obd_put_mod_rpc_slot);