4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/obdclass/genops.c
33 * These are the only exported functions, they provide some generic
34 * infrastructure for managing object devices
37 #define DEBUG_SUBSYSTEM S_CLASS
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
/* Registry of all obd devices, indexed by obd_minor; ids are allocated
 * via xa_alloc() in class_register_device().
 */
48 DEFINE_XARRAY_ALLOC(obd_devs);
49 EXPORT_SYMBOL(obd_devs);
/* Count of currently registered obd devices (see class_obd_devs_count()). */
51 static atomic_t obd_devs_count = ATOMIC_INIT(0);
/* Slab cache backing struct obd_device allocations. */
53 static struct kmem_cache *obd_device_cachep;
54 static struct kobj_type class_ktype;
/* Workqueue presumably used to run the zombie export/import cull work
 * items defined below — TODO confirm where work is queued (not visible here).
 */
55 static struct workqueue_struct *zombie_wq;
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60 const char *status, int locks, int debug_level);
/* Exports unlinked from their device but not yet destroyed; counted by
 * obd_stale_export_num and protected by obd_stale_export_lock.
 */
62 static LIST_HEAD(obd_stale_exports);
63 static DEFINE_SPINLOCK(obd_stale_export_lock);
64 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
66 static struct obd_device *obd_device_alloc(void)
68 struct obd_device *obd;
70 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
72 obd->obd_magic = OBD_DEVICE_MAGIC;
76 static void obd_device_free(struct obd_device *obd)
79 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
80 "obd %px obd_magic %08x != %08x\n",
81 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
82 if (obd->obd_namespace != NULL) {
83 CERROR("obd %px: namespace %px was not properly cleaned up (obd_force=%d)!\n",
84 obd, obd->obd_namespace, obd->obd_force);
87 lu_ref_fini(&obd->obd_reference);
88 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
91 struct obd_type *class_search_type(const char *name)
93 struct kobject *kobj = kset_find_obj(lustre_kset, name);
95 if (kobj && kobj->ktype == &class_ktype)
96 return container_of(kobj, struct obd_type, typ_kobj);
101 EXPORT_SYMBOL(class_search_type);
/* Look up an obd type by name, attempting to load its kernel module on
 * demand; on success take a module + typ_refcnt reference.
 * NOTE(review): this listing is truncated — structural lines (braces,
 * NULL checks, returns) from the original file are missing here.
 */
103 struct obd_type *class_get_type(const char *name)
105 struct obd_type *type;
107 type = class_search_type(name);
108 #ifdef HAVE_MODULE_LOADING_SUPPORT
/* Map user-visible type names to the module that provides them. */
110 const char *modname = name;
112 #ifdef HAVE_SERVER_SUPPORT
113 if (strcmp(modname, "obdfilter") == 0)
116 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
117 modname = LUSTRE_OSP_NAME;
119 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
120 modname = LUSTRE_MDT_NAME;
121 #endif /* HAVE_SERVER_SUPPORT */
/* Try to load the module and re-run the lookup. */
123 if (!request_module("%s", modname)) {
124 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
125 type = class_search_type(name);
127 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* Pin the owning module while the type is in use. */
133 if (try_module_get(type->typ_dt_ops->o_owner)) {
134 atomic_inc(&type->typ_refcnt);
135 /* class_search_type() returned a counted ref, this
136 * count not needed as we could get it via typ_refcnt
138 kobject_put(&type->typ_kobj);
140 kobject_put(&type->typ_kobj);
146 EXPORT_SYMBOL(class_get_type);
148 void class_put_type(struct obd_type *type)
151 module_put(type->typ_dt_ops->o_owner);
152 atomic_dec(&type->typ_refcnt);
154 EXPORT_SYMBOL(class_put_type);
/* kobject release callback for an obd_type: tear down debugfs/procfs
 * entries and the lu_device_type, then free the obd_type itself.
 * NOTE(review): truncated listing — braces and some lines are missing.
 */
156 static void class_sysfs_release(struct kobject *kobj)
158 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
160 debugfs_remove_recursive(type->typ_debugfs_entry);
161 type->typ_debugfs_entry = NULL;
164 lu_device_type_fini(type->typ_lu);
166 #ifdef CONFIG_PROC_FS
/* typ_procroot only exists when proc support was enabled for the type */
167 if (type->typ_name && type->typ_procroot)
168 remove_proc_subtree(type->typ_name, proc_lustre_root);
170 OBD_FREE(type, sizeof(*type));
173 static struct kobj_type class_ktype = {
174 .sysfs_ops = &lustre_sysfs_ops,
175 .release = class_sysfs_release,
178 #ifdef HAVE_SERVER_SUPPORT
/* Create placeholder sysfs/debugfs/procfs entries for a type name before
 * the real type is registered (typ_sym_filter marks the placeholder).
 * Returns the new obd_type, ERR_PTR(-EEXIST) if already present, or
 * ERR_PTR(-ENOMEM). NOTE(review): truncated listing — some lines missing.
 */
179 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
181 struct dentry *symlink;
182 struct obd_type *type;
185 type = class_search_type(name);
/* already there: drop the search reference and report EEXIST */
187 kobject_put(&type->typ_kobj);
188 return ERR_PTR(-EEXIST);
191 OBD_ALLOC(type, sizeof(*type));
193 return ERR_PTR(-ENOMEM);
195 type->typ_kobj.kset = lustre_kset;
196 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
197 &lustre_kset->kobj, "%s", name);
201 symlink = debugfs_create_dir(name, debugfs_lustre_root);
202 type->typ_debugfs_entry = symlink;
203 type->typ_sym_filter = true;
/* compat proc entry is best-effort: warn and continue on failure */
206 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
208 if (IS_ERR(type->typ_procroot)) {
209 CERROR("%s: can't create compat proc entry: %d\n",
210 name, (int)PTR_ERR(type->typ_procroot));
211 type->typ_procroot = NULL;
217 EXPORT_SYMBOL(class_add_symlinks);
218 #endif /* HAVE_SERVER_SUPPORT */
/* Upper bound on an obd type name length (sanity check only). */
220 #define CLASS_MAX_NAME 1024
/* Register a new obd type: allocate the obd_type, hook up operations,
 * create proc/debugfs entries and initialize the lu_device_type.
 * NOTE(review): truncated listing — the full parameter list (presumably
 * including an enable_proc flag used below) and many structural lines
 * are missing; treat this block as read-only context.
 */
222 int class_register_type(const struct obd_ops *dt_ops,
223 const struct md_ops *md_ops,
225 const char *name, struct lu_device_type *ldt)
227 struct obd_type *type;
232 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
234 type = class_search_type(name);
236 #ifdef HAVE_SERVER_SUPPORT
/* a class_add_symlinks() placeholder may be upgraded in place */
237 if (type->typ_sym_filter)
239 #endif /* HAVE_SERVER_SUPPORT */
240 kobject_put(&type->typ_kobj);
241 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
245 OBD_ALLOC(type, sizeof(*type));
/* OBD_LU_TYPE_SETUP marks "lu type init still pending" for waiters */
249 type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
250 type->typ_kobj.kset = lustre_kset;
251 kobject_init(&type->typ_kobj, &class_ktype);
252 #ifdef HAVE_SERVER_SUPPORT
254 #endif /* HAVE_SERVER_SUPPORT */
256 type->typ_dt_ops = dt_ops;
257 type->typ_md_ops = md_ops;
259 #ifdef HAVE_SERVER_SUPPORT
260 if (type->typ_sym_filter) {
261 type->typ_sym_filter = false;
262 kobject_put(&type->typ_kobj);
266 #ifdef CONFIG_PROC_FS
267 if (enable_proc && !type->typ_procroot) {
268 type->typ_procroot = lprocfs_register(name,
271 if (IS_ERR(type->typ_procroot)) {
272 rc = PTR_ERR(type->typ_procroot);
273 type->typ_procroot = NULL;
278 type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
280 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
283 #ifdef HAVE_SERVER_SUPPORT
287 rc = lu_device_type_init(ldt);
/* publish the initialized ldt (or NULL on failure) and wake waiters */
288 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
289 wake_up_var(&type->typ_lu);
297 kobject_put(&type->typ_kobj);
301 EXPORT_SYMBOL(class_register_type);
/* Unregister an obd type by name; fails with -EBUSY if devices of this
 * type still hold references. NOTE(review): truncated listing.
 */
303 int class_unregister_type(const char *name)
305 struct obd_type *type = class_search_type(name);
311 CERROR("unknown obd type\n");
315 if (atomic_read(&type->typ_refcnt)) {
316 CERROR("type %s has refcount (%d)\n", name,
317 atomic_read(&type->typ_refcnt));
318 /* This is a bad situation, let's make the best of it */
319 /* Remove ops, but leave the name for debugging */
320 type->typ_dt_ops = NULL;
321 type->typ_md_ops = NULL;
322 GOTO(out_put, rc = -EBUSY);
325 /* Put the final ref */
326 kobject_put(&type->typ_kobj);
328 /* Put the ref returned by class_search_type() */
329 kobject_put(&type->typ_kobj);
332 } /* class_unregister_type */
333 EXPORT_SYMBOL(class_unregister_type);
336 * Create a new obd device.
338 * Allocate the new obd_device and initialize it.
340 * \param[in] type_name obd device type string.
341 * \param[in] name obd device name.
342 * \param[in] uuid obd device UUID
344 * \retval newdev pointer to created obd_device
345 * \retval ERR_PTR(errno) on error
/* NOTE(review): truncated listing — braces, the uuid parameter line and
 * the final return are missing from this view.
 */
347 struct obd_device *class_newdev(const char *type_name, const char *name,
350 struct obd_device *newdev;
351 struct obd_type *type = NULL;
355 if (strlen(name) >= MAX_OBD_NAME) {
356 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
357 RETURN(ERR_PTR(-EINVAL));
/* take a type reference for the lifetime of the device */
360 type = class_get_type(type_name);
362 CERROR("OBD: unknown type: %s\n", type_name);
363 RETURN(ERR_PTR(-ENODEV));
366 newdev = obd_device_alloc();
367 if (newdev == NULL) {
368 class_put_type(type);
369 RETURN(ERR_PTR(-ENOMEM));
371 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
372 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
373 newdev->obd_type = type;
/* -1 = not yet registered in the obd_devs xarray */
374 newdev->obd_minor = -1;
376 rwlock_init(&newdev->obd_pool_lock);
377 newdev->obd_pool_limit = 0;
378 newdev->obd_pool_slv = 0;
380 INIT_LIST_HEAD(&newdev->obd_exports);
381 newdev->obd_num_exports = 0;
382 newdev->obd_grant_check_threshold = 100;
383 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
384 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
385 INIT_LIST_HEAD(&newdev->obd_exports_timed);
386 INIT_LIST_HEAD(&newdev->obd_nid_stats);
387 spin_lock_init(&newdev->obd_nid_lock);
388 spin_lock_init(&newdev->obd_dev_lock);
389 mutex_init(&newdev->obd_dev_mutex);
390 spin_lock_init(&newdev->obd_osfs_lock);
391 /* newdev->obd_osfs_age must be set to a value in the distant
392 * past to guarantee a fresh statfs is fetched on mount.
394 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
396 /* XXX belongs in setup not attach */
397 init_rwsem(&newdev->obd_observer_link_sem);
399 spin_lock_init(&newdev->obd_recovery_task_lock);
400 init_waitqueue_head(&newdev->obd_next_transno_waitq);
401 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
402 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
403 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
404 INIT_LIST_HEAD(&newdev->obd_evict_list);
405 INIT_LIST_HEAD(&newdev->obd_lwp_list);
407 llog_group_init(&newdev->obd_olg);
408 /* Detach drops this */
409 kref_init(&newdev->obd_refcount);
410 lu_ref_init(&newdev->obd_reference);
411 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
413 newdev->obd_conn_inprogress = 0;
415 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
417 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
418 newdev->obd_name, newdev);
426 * \param[in] obd obd_device to be freed
/* Final teardown of an obd_device whose refcount reached zero: run
 * obd_cleanup() if the device was set up, free the device memory and
 * drop the obd_type reference taken in class_newdev().
 * NOTE(review): truncated listing — braces/locals missing from this view.
 */
430 void class_free_dev(struct obd_device *obd)
432 struct obd_type *obd_type = obd->obd_type;
434 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
435 "%px obd_magic %08x != %08x\n",
436 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
437 LASSERTF(obd->obd_minor == -1 || class_num2obd(obd->obd_minor) == obd,
438 "obd %px != obd_devs[%d] %px\n",
439 obd, obd->obd_minor, class_num2obd(obd->obd_minor));
440 LASSERTF(kref_read(&obd->obd_refcount) == 0,
441 "obd_refcount should be 0, not %d\n",
442 kref_read(&obd->obd_refcount));
443 LASSERT(obd_type != NULL);
445 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
446 obd->obd_name, obd->obd_type->typ_name);
448 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
449 obd->obd_name, obd->obd_uuid.uuid);
450 if (obd->obd_stopping) {
453 /* If we're not stopping, we were never set up */
454 err = obd_cleanup(obd);
456 CERROR("Cleanup %s returned %d\n",
460 obd_device_free(obd);
/* drop the type reference held since class_newdev() */
462 class_put_type(obd_type);
466 * Unregister obd device.
468 * Remove an obd from obd_dev
470 * \param[in] new_obd obd_device to be unregistered
/* Erase the device from the obd_devs xarray (if it was registered) and
 * drop the registration reference. NOTE(review): truncated listing.
 */
474 void class_unregister_device(struct obd_device *obd)
476 if (obd->obd_minor >= 0) {
477 xa_erase(&obd_devs, obd->obd_minor);
478 class_decref(obd, "obd_device_list", obd);
480 atomic_dec(&obd_devs_count);
485 * Register obd device.
487 * Add new_obd to obd_devs
489 * \param[in] new_obd obd_device to be registered
492 * \retval -EEXIST device with this name is registered
/* Insert new_obd into the obd_devs xarray, assigning obd_minor via
 * xa_alloc(). Waits for any same-named zombie device to finish dying
 * first. NOTE(review): truncated listing — error paths are missing.
 */
494 int class_register_device(struct obd_device *new_obd)
499 if (new_obd == NULL) {
504 /* obd_device waiting to be destroyed by "obd_zombie_impexp_thread" */
505 if (class_name2dev(new_obd->obd_name) != -1)
506 obd_zombie_barrier();
508 if (class_name2dev(new_obd->obd_name) == -1) {
/* registration reference, dropped in class_unregister_device() */
509 class_incref(new_obd, "obd_device_list", new_obd);
510 rc = xa_alloc(&obd_devs, &dev_no, new_obd,
511 xa_limit_31b, GFP_ATOMIC);
516 new_obd->obd_minor = dev_no;
517 atomic_inc(&obd_devs_count);
/* Return the obd_minor of the attached device with the given name, or
 * -1 if not found / not yet attached. NOTE(review): truncated listing.
 */
526 int class_name2dev(const char *name)
528 struct obd_device *obd = NULL;
529 unsigned long dev_no = 0;
536 obd_device_for_each(dev_no, obd) {
537 if (strcmp(name, obd->obd_name) == 0) {
539 * Make sure we finished attaching before we give
542 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
543 if (obd->obd_attached) {
544 ret = obd->obd_minor;
555 EXPORT_SYMBOL(class_name2dev);
/* Return the attached obd_device with the given name, or NULL.
 * NOTE(review): truncated listing — braces/return lines missing.
 */
557 struct obd_device *class_name2obd(const char *name)
559 struct obd_device *obd = NULL;
560 unsigned long dev_no = 0;
566 obd_device_for_each(dev_no, obd) {
567 if (strcmp(name, obd->obd_name) == 0) {
569 * Make sure we finished attaching before we give
572 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
573 if (obd->obd_attached)
580 * TODO: We give out a reference without class_incref(). This isn't
581 * ideal, but this behavior is identical in previous implementations
586 EXPORT_SYMBOL(class_name2obd);
/* Return the obd_minor of the device with the given UUID, or -1 if not
 * found. NOTE(review): truncated listing.
 */
588 int class_uuid2dev(struct obd_uuid *uuid)
590 struct obd_device *obd = NULL;
591 unsigned long dev_no = 0;
595 obd_device_for_each(dev_no, obd) {
596 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
597 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
598 ret = obd->obd_minor;
607 EXPORT_SYMBOL(class_uuid2dev);
/* Return the obd_device with the given UUID, or NULL if not found.
 * NOTE(review): truncated listing — braces/return lines missing.
 */
609 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
611 struct obd_device *obd = NULL;
612 unsigned long dev_no = 0;
615 obd_device_for_each(dev_no, obd) {
616 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
617 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
624 * TODO: We give out a reference without class_incref(). This isn't
625 * ideal, but this behavior is identical in previous implementations
630 EXPORT_SYMBOL(class_uuid2obd);
632 struct obd_device *class_num2obd(int dev_no)
634 return xa_load(&obd_devs, dev_no);
636 EXPORT_SYMBOL(class_num2obd);
639 * Find obd by name or uuid.
641 * Increment obd's refcount if found.
643 * \param[in] str obd name or uuid
645 * \retval NULL if not found
646 * \retval obd pointer to found obd_device
/* NOTE(review): truncated listing — braces/return lines missing. */
648 struct obd_device *class_str2obd(const char *str)
650 struct obd_device *obd = NULL;
651 struct obd_uuid uuid;
652 unsigned long dev_no = 0;
/* parse str into a uuid so we can match either representation below */
654 obd_str2uuid(&uuid, str);
657 obd_device_for_each(dev_no, obd) {
658 if (obd_uuid_equals(&uuid, &obd->obd_uuid) ||
659 (strcmp(str, obd->obd_name) == 0)) {
661 * Make sure we finished attaching before we give
664 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
665 if (obd->obd_attached) {
/* unlike class_name2obd(), this takes a counted reference */
666 class_incref(obd, "find", current);
677 EXPORT_SYMBOL(class_str2obd);
680 * Get obd devices count. Device in any
682 * \retval obd device count
684 int class_obd_devs_count(void)
686 return atomic_read(&obd_devs_count);
688 EXPORT_SYMBOL(class_obd_devs_count);
690 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
691 * specified, then only the client with that uuid is returned,
692 * otherwise any client connected to the tgt is returned.
/* NOTE(review): truncated listing — braces/return lines missing. */
694 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
695 const char *type_name,
696 struct obd_uuid *grp_uuid)
698 struct obd_device *obd = NULL;
699 unsigned long dev_no = 0;
702 obd_device_for_each(dev_no, obd) {
703 if ((strncmp(obd->obd_type->typ_name, type_name,
704 strlen(type_name)) == 0)) {
/* match target uuid, and group uuid when one was given */
705 if (obd_uuid_equals(tgt_uuid,
706 &obd->u.cli.cl_target_uuid) &&
707 ((grp_uuid) ? obd_uuid_equals(grp_uuid,
708 &obd->obd_uuid) : 1)) {
718 EXPORT_SYMBOL(class_find_client_obd);
721 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
722 * adjust sptlrpc settings accordingly.
/* NOTE(review): truncated listing — braces and rc handling missing. */
724 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
726 struct obd_device *obd = NULL;
727 unsigned long dev_no = 0;
731 LASSERT(namelen > 0);
734 obd_device_for_each(dev_no, obd) {
/* skip devices that are not fully set up or already stopping */
735 if (obd->obd_set_up == 0 || obd->obd_stopping)
738 /* only notify mdc, osc, osp, lwp, mdt, ost
739 * because only these have a -sptlrpc llog
741 type = obd->obd_type->typ_name;
742 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
743 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
744 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
745 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
746 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
747 strcmp(type, LUSTRE_OST_NAME) != 0)
/* only devices belonging to this filesystem */
750 if (strncmp(obd->obd_name, fsname, namelen))
753 class_incref(obd, __func__, obd);
755 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
756 sizeof(KEY_SPTLRPC_CONF),
757 KEY_SPTLRPC_CONF, 0, NULL, NULL);
760 class_decref(obd, __func__, obd);
766 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
768 void obd_cleanup_caches(void)
771 if (obd_device_cachep) {
772 kmem_cache_destroy(obd_device_cachep);
773 obd_device_cachep = NULL;
/* Create the slab cache used for struct obd_device allocations (with a
 * usercopy whitelist spanning the whole struct). Returns 0 or -ENOMEM.
 * NOTE(review): truncated listing — the return/label lines are missing.
 */
779 int obd_init_caches(void)
785 LASSERT(obd_device_cachep == NULL);
786 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
787 sizeof(struct obd_device),
788 0, 0, 0, sizeof(struct obd_device), NULL);
789 if (!obd_device_cachep)
790 GOTO(out, rc = -ENOMEM);
/* error path tears down anything partially created */
794 obd_cleanup_caches();
/* Owner tag used when hashing export handles (see class_handle_hash). */
798 static const char export_handle_owner[] = "export";
800 /* map connection to client */
/* Translate a lustre_handle cookie into a referenced obd_export, or
 * NULL for a null/"new connection" handle.
 * NOTE(review): truncated listing — braces/return lines missing.
 */
801 struct obd_export *class_conn2export(struct lustre_handle *conn)
803 struct obd_export *export;
808 CDEBUG(D_CACHE, "looking for null handle\n");
812 if (conn->cookie == -1) { /* this means assign a new connection */
813 CDEBUG(D_CACHE, "want a new connection\n");
817 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
818 export = class_handle2object(conn->cookie, export_handle_owner);
821 EXPORT_SYMBOL(class_conn2export);
823 struct obd_device *class_exp2obd(struct obd_export *exp)
829 EXPORT_SYMBOL(class_exp2obd);
831 struct obd_import *class_exp2cliimp(struct obd_export *exp)
833 struct obd_device *obd = exp->exp_obd;
837 return obd->u.cli.cl_import;
839 EXPORT_SYMBOL(class_exp2cliimp);
841 /* Export management functions */
/* Final destruction of an export whose handle refcount hit zero: drop
 * the connection, run obd-specific teardown, release the obd reference
 * (unless this is the self export) and free via RCU.
 * NOTE(review): truncated listing — braces missing from this view.
 */
842 static void class_export_destroy(struct obd_export *exp)
844 struct obd_device *obd = exp->exp_obd;
848 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
849 LASSERT(obd != NULL);
851 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
852 exp->exp_client_uuid.uuid, obd->obd_name);
854 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
855 ptlrpc_connection_put(exp->exp_connection);
857 LASSERT(list_empty(&exp->exp_outstanding_replies));
858 LASSERT(list_empty(&exp->exp_uncommitted_replies));
859 LASSERT(list_empty(&exp->exp_req_replay_queue));
860 LASSERT(list_empty(&exp->exp_hp_rpcs));
861 obd_destroy_export(exp);
862 /* self export doesn't hold a reference to an obd, although it
863 * exists until freeing of the obd
865 if (exp != obd->obd_self_export)
866 class_decref(obd, "export", exp);
/* free after an RCU grace period; handle lookups may still race */
868 OBD_FREE_PRE(exp, sizeof(*exp), "kfree_rcu");
869 kfree_rcu(exp, exp_handle.h_rcu);
873 struct obd_export *class_export_get(struct obd_export *exp)
875 refcount_inc(&exp->exp_handle.h_ref);
876 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
877 refcount_read(&exp->exp_handle.h_ref));
880 EXPORT_SYMBOL(class_export_get);
/* Drop a reference on an export; the final put destroys the self export
 * synchronously and queues all other exports for zombie destruction.
 * NOTE(review): truncated listing — braces missing from this view.
 */
882 void class_export_put(struct obd_export *exp)
884 LASSERT(exp != NULL);
885 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
886 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
887 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
888 refcount_read(&exp->exp_handle.h_ref) - 1);
890 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
891 struct obd_device *obd = exp->exp_obd;
893 CDEBUG(D_IOCTL, "final put %p/%s\n",
894 exp, exp->exp_client_uuid.uuid);
896 /* release nid stat refererence */
897 lprocfs_exp_cleanup(exp);
899 if (exp == obd->obd_self_export) {
900 /* self export should be destroyed without zombie
901 * thread as it doesn't hold a reference to obd and
902 * doesn't hold any resources
904 class_export_destroy(exp);
905 /* self export is destroyed, no class ref exist and it
906 * is safe to free obd
910 LASSERT(!list_empty(&exp->exp_obd_chain));
/* normal exports are destroyed asynchronously by the zombie worker */
911 obd_zombie_export_add(exp);
916 EXPORT_SYMBOL(class_export_put);
918 static void obd_zombie_exp_cull(struct work_struct *ws)
920 struct obd_export *export;
922 export = container_of(ws, struct obd_export, exp_zombie_work);
923 class_export_destroy(export);
924 LASSERT(atomic_read(&obd_stale_export_num) > 0);
925 if (atomic_dec_and_test(&obd_stale_export_num))
926 wake_up_var(&obd_stale_export_num);
929 /* Creates a new export, adds it to the hash table, and returns a
930 * pointer to it. The refcount is 2: one for the hash reference, and
931 * one for the pointer returned by this function.
/* NOTE(review): truncated listing — the third parameter (presumably the
 * is_self flag used by the wrappers below), braces and some error-path
 * lines are missing from this view.
 */
933 static struct obd_export *__class_new_export(struct obd_device *obd,
934 struct obd_uuid *cluuid,
937 struct obd_export *export;
942 OBD_ALLOC_PTR(export);
944 return ERR_PTR(-ENOMEM);
946 export->exp_conn_cnt = 0;
947 export->exp_lock_hash = NULL;
948 export->exp_flock_hash = NULL;
949 /* 2 = class_handle_hash + last */
950 refcount_set(&export->exp_handle.h_ref, 2);
951 atomic_set(&export->exp_rpc_count, 0);
952 atomic_set(&export->exp_cb_count, 0);
953 atomic_set(&export->exp_locks_count, 0);
954 #if LUSTRE_TRACKS_LOCK_EXP_REFS
955 INIT_LIST_HEAD(&export->exp_locks_list);
956 spin_lock_init(&export->exp_locks_list_guard);
958 atomic_set(&export->exp_replay_count, 0);
959 export->exp_obd = obd;
960 INIT_LIST_HEAD(&export->exp_outstanding_replies);
961 spin_lock_init(&export->exp_uncommitted_replies_lock);
962 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
963 INIT_LIST_HEAD(&export->exp_req_replay_queue);
964 INIT_HLIST_NODE(&export->exp_handle.h_link);
965 INIT_LIST_HEAD(&export->exp_hp_rpcs);
966 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* publish the handle so class_conn2export() can find this export */
967 class_handle_hash(&export->exp_handle, export_handle_owner);
968 export->exp_last_request_time = ktime_get_real_seconds();
969 spin_lock_init(&export->exp_lock);
970 spin_lock_init(&export->exp_rpc_lock);
971 INIT_HLIST_NODE(&export->exp_gen_hash);
972 spin_lock_init(&export->exp_bl_list_lock);
973 INIT_LIST_HEAD(&export->exp_bl_list);
974 INIT_LIST_HEAD(&export->exp_stale_list);
975 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
977 export->exp_sp_peer = LUSTRE_SP_ANY;
978 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
979 export->exp_client_uuid = *cluuid;
980 obd_init_export(export);
982 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
983 export->exp_root_fid.f_seq = 0;
984 export->exp_root_fid.f_oid = 0;
985 export->exp_root_fid.f_ver = 0;
987 spin_lock(&obd->obd_dev_lock);
/* non-self exports are registered in the uuid hash under the lock */
988 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
989 /* shouldn't happen, but might race */
990 if (obd->obd_stopping)
991 GOTO(exit_unlock, rc = -ENODEV);
993 rc = obd_uuid_add(obd, export);
995 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
996 obd->obd_name, cluuid->uuid, rc);
997 GOTO(exit_unlock, rc = -EALREADY);
1002 class_incref(obd, "export", export);
1003 list_add_tail(&export->exp_obd_chain_timed,
1004 &obd->obd_exports_timed);
1005 list_add(&export->exp_obd_chain, &obd->obd_exports);
1006 obd->obd_num_exports++;
1008 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1009 INIT_LIST_HEAD(&export->exp_obd_chain);
1011 spin_unlock(&obd->obd_dev_lock);
/* error path: unhash, tear down and free the partially-built export */
1015 spin_unlock(&obd->obd_dev_lock);
1016 class_handle_unhash(&export->exp_handle);
1017 obd_destroy_export(export);
1018 OBD_FREE_PTR(export);
1022 struct obd_export *class_new_export(struct obd_device *obd,
1023 struct obd_uuid *uuid)
1025 return __class_new_export(obd, uuid, false);
1027 EXPORT_SYMBOL(class_new_export);
1029 struct obd_export *class_new_export_self(struct obd_device *obd,
1030 struct obd_uuid *uuid)
1032 return __class_new_export(obd, uuid, true);
/* Remove an export from its handle hash and per-device lists, moving it
 * to obd_unlinked_exports; the reference is handed to the stale-export
 * machinery. NOTE(review): truncated listing — braces missing.
 */
1035 void class_unlink_export(struct obd_export *exp)
1037 class_handle_unhash(&exp->exp_handle);
1039 if (exp->exp_obd->obd_self_export == exp) {
/* self export: just drop the hash reference, nothing to unlink */
1040 class_export_put(exp);
1044 spin_lock(&exp->exp_obd->obd_dev_lock);
1045 /* delete an uuid-export hashitem from hashtables */
1046 if (exp != exp->exp_obd->obd_self_export)
1047 obd_uuid_del(exp->exp_obd, exp);
1049 #ifdef HAVE_SERVER_SUPPORT
1050 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1051 struct tg_export_data *ted = &exp->exp_target_data;
1052 struct cfs_hash *hash;
1054 /* Because obd_gen_hash will not be released until
1055 * class_cleanup(), so hash should never be NULL here
1057 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1058 LASSERT(hash != NULL);
1059 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1060 &exp->exp_gen_hash);
1061 cfs_hash_putref(hash);
1063 #endif /* HAVE_SERVER_SUPPORT */
1065 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1066 list_del_init(&exp->exp_obd_chain_timed);
1067 exp->exp_obd->obd_num_exports--;
1068 spin_unlock(&exp->exp_obd->obd_dev_lock);
1070 /* A reference is kept by obd_stale_exports list */
1071 obd_stale_export_put(exp);
1073 EXPORT_SYMBOL(class_unlink_export);
1075 /* Import management functions */
/* Final destruction of an import whose refcount reached zero: release
 * its connections and connection list, then drop the obd reference.
 * NOTE(review): truncated listing — braces missing from this view.
 */
1076 static void obd_zombie_import_free(struct obd_import *imp)
1078 struct obd_import_conn *imp_conn;
1081 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1082 imp->imp_obd->obd_name);
1084 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1086 ptlrpc_connection_put(imp->imp_connection);
/* drain the alternate-connection list */
1088 while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1089 struct obd_import_conn,
1090 oic_item)) != NULL) {
1091 list_del_init(&imp_conn->oic_item);
1092 ptlrpc_connection_put(imp_conn->oic_conn);
1093 OBD_FREE(imp_conn, sizeof(*imp_conn));
1096 LASSERT(imp->imp_sec == NULL);
1097 LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1098 imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1099 class_decref(imp->imp_obd, "import", imp);
1104 struct obd_import *class_import_get(struct obd_import *import)
1106 refcount_inc(&import->imp_refcount);
1107 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1108 refcount_read(&import->imp_refcount),
1109 import->imp_obd->obd_name);
1112 EXPORT_SYMBOL(class_import_get);
/* Drop a reference on an import; the final put hands the import to the
 * zombie machinery for asynchronous destruction.
 * NOTE(review): truncated listing — braces missing from this view.
 */
1114 void class_import_put(struct obd_import *imp)
1118 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1120 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1121 refcount_read(&imp->imp_refcount) - 1,
1122 imp->imp_obd->obd_name);
1124 if (refcount_dec_and_test(&imp->imp_refcount)) {
1125 CDEBUG(D_INFO, "final put import %p\n", imp);
1126 obd_zombie_import_add(imp);
1131 EXPORT_SYMBOL(class_import_put);
1133 static void init_imp_at(struct imp_at *at)
1137 at_init(&at->iat_net_latency, 0, 0);
1138 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1139 /* max service estimates are tracked server side, so dont't
1140 * use AT history here, just use the last reported val. (But
1141 * keep hist for proc histogram, worst_ever)
1143 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1148 static void obd_zombie_imp_cull(struct work_struct *ws)
1150 struct obd_import *import;
1152 import = container_of(ws, struct obd_import, imp_zombie_work);
1153 obd_zombie_import_free(import);
/* Allocate and initialize a new import for \a obd; returned with
 * refcount 2 (one for the obd's imp pointer, one for the caller —
 * TODO confirm against callers). NOTE(review): truncated listing —
 * braces, NULL check and return are missing from this view.
 */
1156 struct obd_import *class_new_import(struct obd_device *obd)
1158 struct obd_import *imp;
1159 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1161 OBD_ALLOC(imp, sizeof(*imp));
1165 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1166 INIT_LIST_HEAD(&imp->imp_replay_list);
1167 INIT_LIST_HEAD(&imp->imp_sending_list);
1168 INIT_LIST_HEAD(&imp->imp_delayed_list);
1169 INIT_LIST_HEAD(&imp->imp_committed_list);
1170 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1171 imp->imp_known_replied_xid = 0;
1172 imp->imp_replay_cursor = &imp->imp_committed_list;
1173 spin_lock_init(&imp->imp_lock);
1174 imp->imp_last_success_conn = 0;
1175 imp->imp_state = LUSTRE_IMP_NEW;
/* the import pins its obd for its whole lifetime */
1176 imp->imp_obd = class_incref(obd, "import", imp);
1177 rwlock_init(&imp->imp_sec_lock);
1178 init_waitqueue_head(&imp->imp_recovery_waitq);
1179 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
/* use the namespace's init process pid for sec context refs */
1181 if (curr_pid_ns && curr_pid_ns->child_reaper)
1182 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1184 imp->imp_sec_refpid = 1;
1186 refcount_set(&imp->imp_refcount, 2);
1187 atomic_set(&imp->imp_unregistering, 0);
1188 atomic_set(&imp->imp_reqs, 0);
1189 atomic_set(&imp->imp_inflight, 0);
1190 atomic_set(&imp->imp_replay_inflight, 0);
1191 init_waitqueue_head(&imp->imp_replay_waitq);
1192 atomic_set(&imp->imp_inval_count, 0);
1193 atomic_set(&imp->imp_waiting, 0);
1194 INIT_LIST_HEAD(&imp->imp_conn_list);
1195 init_imp_at(&imp->imp_at);
1197 /* the default magic is V2, will be used in connect RPC, and
1198 * then adjusted according to the flags in request/reply.
1200 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1204 EXPORT_SYMBOL(class_new_import);
1206 void class_destroy_import(struct obd_import *import)
1208 LASSERT(import != NULL);
1209 LASSERT(import != LP_POISON);
1211 spin_lock(&import->imp_lock);
1212 import->imp_generation++;
1213 spin_unlock(&import->imp_lock);
1214 class_import_put(import);
1216 EXPORT_SYMBOL(class_destroy_import);
1218 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Debug-only accounting: record that \a lock holds a reference on
 * \a exp, linking the lock onto the export's lock list on first ref.
 * NOTE(review): truncated listing — braces missing from this view.
 */
1220 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1222 spin_lock(&exp->exp_locks_list_guard);
1224 LASSERT(lock->l_exp_refs_nr >= 0);
1226 if (lock->l_exp_refs_target != NULL &&
1227 lock->l_exp_refs_target != exp) {
1228 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1229 exp, lock, lock->l_exp_refs_target);
1231 if ((lock->l_exp_refs_nr++) == 0) {
1232 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1233 lock->l_exp_refs_target = exp;
1235 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1236 lock, exp, lock->l_exp_refs_nr);
1237 spin_unlock(&exp->exp_locks_list_guard);
1239 EXPORT_SYMBOL(__class_export_add_lock_ref);
/* Debug-only accounting: drop \a lock's reference on \a exp, unlinking
 * the lock from the export's list when the last ref goes away.
 * NOTE(review): truncated listing — braces missing from this view.
 */
1241 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1243 spin_lock(&exp->exp_locks_list_guard);
1244 LASSERT(lock->l_exp_refs_nr > 0);
1245 if (lock->l_exp_refs_target != exp) {
1246 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1247 lock, lock->l_exp_refs_target, exp);
1249 if (-- lock->l_exp_refs_nr == 0) {
1250 list_del_init(&lock->l_exp_refs_link);
1251 lock->l_exp_refs_target = NULL;
1253 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1254 lock, exp, lock->l_exp_refs_nr);
1255 spin_unlock(&exp->exp_locks_list_guard);
1257 EXPORT_SYMBOL(__class_export_del_lock_ref);
1260 /* A connection defines an export context in which preallocation can be
1261 * managed. This releases the export pointer reference, and returns the export
1262 * handle, so the export refcount is 1 when this function returns.
1264 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1265 struct obd_uuid *cluuid)
1267 struct obd_export *export;
1269 LASSERT(conn != NULL);
1270 LASSERT(obd != NULL);
1271 LASSERT(cluuid != NULL);
1274 export = class_new_export(obd, cluuid);
1276 RETURN(PTR_ERR(export));
1278 conn->cookie = export->exp_handle.h_cookie;
1279 class_export_put(export);
1281 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1282 cluuid->uuid, conn->cookie);
1285 EXPORT_SYMBOL(class_connect);
1287 /* if export is involved in recovery then clean up related things */
/* NOTE(review): truncated listing — braces missing from this view. */
1288 static void class_export_recovery_cleanup(struct obd_export *exp)
1290 struct obd_device *obd = exp->exp_obd;
1292 spin_lock(&obd->obd_recovery_task_lock);
1293 if (obd->obd_recovering) {
1294 if (exp->exp_in_recovery) {
1295 spin_lock(&exp->exp_lock);
1296 exp->exp_in_recovery = 0;
1297 spin_unlock(&exp->exp_lock);
1298 LASSERT(atomic_read(&(obd)->obd_connected_clients) > 0);
1299 atomic_dec(&obd->obd_connected_clients);
1302 /* if called during recovery then should update
1303 * obd_stale_clients counter, lightweight exports is not counted
1305 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1306 exp->exp_obd->obd_stale_clients++;
1308 spin_unlock(&obd->obd_recovery_task_lock);
1310 spin_lock(&exp->exp_lock);
1311 /** Cleanup req replay fields */
1312 if (exp->exp_req_replay_needed) {
1313 exp->exp_req_replay_needed = 0;
1315 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1316 atomic_dec(&obd->obd_req_replay_clients);
1319 /** Cleanup lock replay data */
1320 if (exp->exp_lock_replay_needed) {
1321 exp->exp_lock_replay_needed = 0;
1323 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1324 atomic_dec(&obd->obd_lock_replay_clients);
1326 spin_unlock(&exp->exp_lock);
/* This function removes 1-3 references from the export:
 * 1 - for export pointer passed
 * and if disconnect really need
 * 2 - removing from hash
 * 3 - in client_unlink_export
 * The export pointer passed to this function can be destroyed
 */
int class_disconnect(struct obd_export *export)
	int already_disconnected;
	if (export == NULL) {
		CWARN("attempting to free NULL export %p\n", export);
	/* mark disconnected under exp_lock so racing callers observe it */
	spin_lock(&export->exp_lock);
	already_disconnected = export->exp_disconnected;
	export->exp_disconnected = 1;
#ifdef HAVE_SERVER_SUPPORT
	/* We hold references of export for uuid hash and nid_hash and export
	 * link at least. So it is safe to call rh*table_remove_fast in there.
	 */
	obd_nid_del(export->exp_obd, export);
#endif /* HAVE_SERVER_SUPPORT */
	spin_unlock(&export->exp_lock);
	/* class_cleanup(), abort_recovery(), and class_fail_export() all end up
	 * here, and any of them race we shouldn't call extra class_export_puts
	 */
	if (already_disconnected)
		GOTO(no_disconn, already_disconnected);
	CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
	       export->exp_handle.h_cookie);
	class_export_recovery_cleanup(export);
	class_unlink_export(export);
	/* drop the reference held for the caller's pointer */
	class_export_put(export);
EXPORT_SYMBOL(class_disconnect);
/* Return non-zero for a fully connected export */
int class_connected_export(struct obd_export *exp)
	spin_lock(&exp->exp_lock);
	/* connected == at least one connect completed and not yet evicted */
	connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
	spin_unlock(&exp->exp_lock);
EXPORT_SYMBOL(class_connected_export);
/* Disconnect every export on @list, tagging each with @flags first.
 * The self-export (client uuid == obd uuid) is unlinked but not
 * disconnected.
 */
static void class_disconnect_export_list(struct list_head *list,
					 enum obd_option flags)
	struct obd_export *exp;
	/* It's possible that an export may disconnect itself, but
	 * nothing else will be added to this list.
	 */
	while ((exp = list_first_entry_or_null(list, struct obd_export,
					       exp_obd_chain)) != NULL) {
		/* need for safe call CDEBUG after obd_disconnect */
		class_export_get(exp);
		spin_lock(&exp->exp_lock);
		exp->exp_flags = flags;
		spin_unlock(&exp->exp_lock);
		if (obd_uuid_equals(&exp->exp_client_uuid,
				    &exp->exp_obd->obd_uuid)) {
			"exp %p export uuid == obd uuid, don't discon\n",
			/* Need to delete this now so we don't end up pointing
			 * to work_list later when this export is cleaned up.
			 */
			list_del_init(&exp->exp_obd_chain);
			class_export_put(exp);
		/* extra reference consumed by obd_disconnect() below */
		class_export_get(exp);
		CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at %lld\n",
		       exp->exp_obd->obd_name, obd_export_nid2str(exp),
		       exp, exp->exp_last_request_time);
		/* release one export reference anyway */
		rc = obd_disconnect(exp);
		CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
		       obd_export_nid2str(exp), exp, rc);
		class_export_put(exp);
/* Disconnect all regular and delayed exports of @obd. */
void class_disconnect_exports(struct obd_device *obd)
	LIST_HEAD(work_list);
	/* Move all of the exports from obd_exports to a work list, en masse. */
	spin_lock(&obd->obd_dev_lock);
	list_splice_init(&obd->obd_exports, &work_list);
	list_splice_init(&obd->obd_delayed_exports, &work_list);
	spin_unlock(&obd->obd_dev_lock);
	if (!list_empty(&work_list)) {
		CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
		       obd->obd_minor, obd);
		class_disconnect_export_list(&work_list,
					     exp_flags_from_obd(obd));
		CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
		       obd->obd_minor, obd);
EXPORT_SYMBOL(class_disconnect_exports);
/* Remove exports that have not completed recovery.
 * @test_export is evaluated under exp_lock; a non-zero result means the
 * export is skipped (kept alive).
 */
void class_disconnect_stale_exports(struct obd_device *obd,
				    int (*test_export)(struct obd_export *))
	LIST_HEAD(work_list);
	struct obd_export *exp, *n;
	spin_lock(&obd->obd_dev_lock);
	list_for_each_entry_safe(exp, n, &obd->obd_exports,
		/* don't count self-export as client */
		if (obd_uuid_equals(&exp->exp_client_uuid,
				    &exp->exp_obd->obd_uuid))
		/* don't evict clients which have no slot in last_rcvd
		 * (e.g. lightweight connection)
		 */
		if (exp->exp_target_data.ted_lr_idx == -1)
		spin_lock(&exp->exp_lock);
		/* skip exports already failed, or ones @test_export keeps */
		if (exp->exp_failed || test_export(exp)) {
			spin_unlock(&exp->exp_lock);
		exp->exp_failed = 1;
		atomic_inc(&exp->exp_obd->obd_eviction_count);
		spin_unlock(&exp->exp_lock);
		/* queue for disconnection outside obd_dev_lock */
		list_move(&exp->exp_obd_chain, &work_list);
		CWARN("%s: disconnect stale client %s@%s\n",
		      obd->obd_name, exp->exp_client_uuid.uuid,
		      obd_export_nid2str(exp));
		print_export_data(exp, "EVICTING", 0, D_HA);
	spin_unlock(&obd->obd_dev_lock);
	LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
		      obd->obd_name, evicted);
	class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
				     OBD_OPT_ABORT_RECOV);
EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Mark @exp failed and disconnect it; a no-op if it already failed. */
void class_fail_export(struct obd_export *exp)
	int rc, already_failed;
	spin_lock(&exp->exp_lock);
	already_failed = exp->exp_failed;
	exp->exp_failed = 1;
	spin_unlock(&exp->exp_lock);
	if (already_failed) {
		CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
		       exp, exp->exp_client_uuid.uuid);
	atomic_inc(&exp->exp_obd->obd_eviction_count);
	CDEBUG(D_HA, "disconnecting export %p/%s\n",
	       exp, exp->exp_client_uuid.uuid);
	if (obd_dump_on_timeout)
		libcfs_debug_dumplog();
	/* need for safe call CDEBUG after obd_disconnect */
	class_export_get(exp);
	/* Callers into obd_disconnect are removing their own ref(eg request) in
	 * addition to one from hash table. We don't have such a ref so make one
	 */
	class_export_get(exp);
	rc = obd_disconnect(exp);
		CERROR("disconnecting export %p failed: %d\n", exp, rc);
		CDEBUG(D_HA, "disconnected export %p/%s\n",
		       exp, exp->exp_client_uuid.uuid);
	class_export_put(exp);
EXPORT_SYMBOL(class_fail_export);
1553 #ifdef HAVE_SERVER_SUPPORT
/* obd_nid_export_for_each() callback: grab a reference on the first
 * usable export (not failed, handle still referenced) and record it in
 * *@data.
 */
static int take_first(struct obd_export *exp, void *data)
	struct obd_export **expp = data;
		/* already have one */
	if (exp->exp_failed)
		/* Don't want this one */
	if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
		/* Cannot get a ref on this one */
/* Evict (fail + disconnect) every export of @obd hashed under @nid.
 * Returns the number of exports evicted.
 */
int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
	struct lnet_nid nid_key;
	struct obd_export *doomed_exp;
	int exports_evicted = 0;
	libcfs_strnid(&nid_key, nid);
	spin_lock(&obd->obd_dev_lock);
	/* umount already run. evict thread should stop leaving unmount thread
	 */
	if (obd->obd_stopping) {
		spin_unlock(&obd->obd_dev_lock);
		return exports_evicted;
	spin_unlock(&obd->obd_dev_lock);
	/* keep picking one export for this NID until none remain */
	while (obd_nid_export_for_each(obd, &nid_key,
				       take_first, &doomed_exp) > 0) {
		LASSERTF(doomed_exp != obd->obd_self_export,
			 "self-export is hashed by NID?\n");
		LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
			      obd_uuid2str(&doomed_exp->exp_client_uuid),
			      obd_export_nid2str(doomed_exp));
		class_fail_export(doomed_exp);
		/* drop the reference taken by take_first() */
		class_export_put(doomed_exp);
	if (!exports_evicted)
		"%s: can't disconnect NID '%s': no exports found\n",
		obd->obd_name, nid);
	return exports_evicted;
EXPORT_SYMBOL(obd_export_evict_by_nid);
1616 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1618 struct obd_export *doomed_exp = NULL;
1619 struct obd_uuid doomed_uuid;
1620 int exports_evicted = 0;
1622 spin_lock(&obd->obd_dev_lock);
1623 if (obd->obd_stopping) {
1624 spin_unlock(&obd->obd_dev_lock);
1625 return exports_evicted;
1627 spin_unlock(&obd->obd_dev_lock);
1629 obd_str2uuid(&doomed_uuid, uuid);
1630 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1631 CERROR("%s: can't evict myself\n", obd->obd_name);
1632 return exports_evicted;
1635 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1636 if (doomed_exp == NULL) {
1637 CERROR("%s: can't disconnect %s: no exports found\n",
1638 obd->obd_name, uuid);
1640 CWARN("%s: evicting %s at adminstrative request\n",
1641 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1642 class_fail_export(doomed_exp);
1643 class_export_put(doomed_exp);
1644 obd_uuid_del(obd, doomed_exp);
1648 return exports_evicted;
1650 #endif /* HAVE_SERVER_SUPPORT */
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional debug hook called from print_export_data(); stays NULL until
 * another module assigns it. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
/* Log one line of state for @exp at @debug_level, tagged with @status;
 * if @locks is set and a dump hook is registered, dump lock refs too.
 */
static void print_export_data(struct obd_export *exp, const char *status,
			      int locks, int debug_level)
	struct ptlrpc_reply_state *rs;
	struct ptlrpc_reply_state *first_reply = NULL;
	/* count outstanding replies; remember the first for the log line */
	spin_lock(&exp->exp_lock);
	list_for_each_entry(rs, &exp->exp_outstanding_replies,
	spin_unlock(&exp->exp_lock);
	CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu stale:%d\n",
	       exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
	       obd_export_nid2str(exp),
	       refcount_read(&exp->exp_handle.h_ref),
	       atomic_read(&exp->exp_rpc_count),
	       atomic_read(&exp->exp_cb_count),
	       atomic_read(&exp->exp_locks_count),
	       exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
	       nreplies, first_reply, nreplies > 3 ? "..." : "",
	       exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
#if LUSTRE_TRACKS_LOCK_EXP_REFS
	if (locks && class_export_dump_hook != NULL)
		class_export_dump_hook(exp);
/* Dump all exports of @obd: active, unlinked and delayed lists. */
void dump_exports(struct obd_device *obd, int locks, int debug_level)
	struct obd_export *exp;
	spin_lock(&obd->obd_dev_lock);
	list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
		print_export_data(exp, "ACTIVE", locks, debug_level);
	list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
		print_export_data(exp, "UNLINKED", locks, debug_level);
	list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
		print_export_data(exp, "DELAYED", locks, debug_level);
	spin_unlock(&obd->obd_dev_lock);
/* Sleep until obd_unlinked_exports drains, logging periodic warnings. */
void obd_exports_barrier(struct obd_device *obd)
	LASSERT(list_empty(&obd->obd_exports));
	spin_lock(&obd->obd_dev_lock);
	while (!list_empty(&obd->obd_unlinked_exports)) {
		/* drop the lock while sleeping so exports can be destroyed */
		spin_unlock(&obd->obd_dev_lock);
		schedule_timeout_uninterruptible(cfs_time_seconds(waited));
		/* warn only at power-of-two intervals to limit log noise */
		if (waited > 5 && is_power_of_2(waited)) {
			LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
				      obd->obd_name, waited,
				      kref_read(&obd->obd_refcount));
			dump_exports(obd, 1, D_CONSOLE | D_WARNING);
		spin_lock(&obd->obd_dev_lock);
	spin_unlock(&obd->obd_dev_lock);
EXPORT_SYMBOL(obd_exports_barrier);
/* Add export to the obd_zombie workqueue and notify it. */
static void obd_zombie_export_add(struct obd_export *exp)
	atomic_inc(&obd_stale_export_num);
	spin_lock(&exp->exp_obd->obd_dev_lock);
	LASSERT(!list_empty(&exp->exp_obd_chain));
	list_del_init(&exp->exp_obd_chain);
	spin_unlock(&exp->exp_obd->obd_dev_lock);
	/* actual destruction happens in the export's zombie work item */
	queue_work(zombie_wq, &exp->exp_zombie_work);
/* Add import to the obd_zombie workqueue and notify it. */
static void obd_zombie_import_add(struct obd_import *imp)
	/* the import's security context must already have been released */
	LASSERT(imp->imp_sec == NULL);
	queue_work(zombie_wq, &imp->imp_zombie_work);
/* wait when obd_zombie import/export queues become empty */
void obd_zombie_barrier(void)
	wait_var_event(&obd_stale_export_num,
		       atomic_read(&obd_stale_export_num) == 0);
	/* then wait for any queued zombie work items to finish */
	flush_workqueue(zombie_wq);
EXPORT_SYMBOL(obd_zombie_barrier);
/* Pop the first export from the global stale list, or NULL if empty. */
struct obd_export *obd_stale_export_get(void)
	struct obd_export *exp = NULL;
	spin_lock(&obd_stale_export_lock);
	if (!list_empty(&obd_stale_exports)) {
		exp = list_first_entry(&obd_stale_exports,
				       struct obd_export, exp_stale_list);
		list_del_init(&exp->exp_stale_list);
	spin_unlock(&obd_stale_export_lock);
	CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
	       atomic_read(&obd_stale_export_num));
EXPORT_SYMBOL(obd_stale_export_get);
/* Return @exp to the global stale list while its lock hash still has
 * entries; otherwise drop the export reference.
 */
void obd_stale_export_put(struct obd_export *exp)
	LASSERT(list_empty(&exp->exp_stale_list));
	if (exp->exp_lock_hash &&
	    atomic_read(&exp->exp_lock_hash->hs_count)) {
		CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
		       atomic_read(&obd_stale_export_num));
		/* lock order: exp_bl_list_lock before obd_stale_export_lock,
		 * as in obd_stale_export_adjust() */
		spin_lock_bh(&exp->exp_bl_list_lock);
		spin_lock(&obd_stale_export_lock);
		/* Add to the tail if there is no blocked locks,
		 * to the head otherwise.
		 */
		if (list_empty(&exp->exp_bl_list))
			list_add_tail(&exp->exp_stale_list,
				      &obd_stale_exports);
			list_add(&exp->exp_stale_list,
				 &obd_stale_exports);
		spin_unlock(&obd_stale_export_lock);
		spin_unlock_bh(&exp->exp_bl_list_lock);
		/* no locks remain: release the export reference instead */
		class_export_put(exp);
EXPORT_SYMBOL(obd_stale_export_put);
/*
 * Adjust the position of the export in the stale list,
 * i.e. move to the head of the list if is needed.
 */
void obd_stale_export_adjust(struct obd_export *exp)
	LASSERT(exp != NULL);
	spin_lock_bh(&exp->exp_bl_list_lock);
	spin_lock(&obd_stale_export_lock);
	/* exports with blocked locks go to the head, where
	 * obd_stale_export_get() picks from first */
	if (!list_empty(&exp->exp_stale_list) &&
	    !list_empty(&exp->exp_bl_list))
		list_move(&exp->exp_stale_list, &obd_stale_exports);
	spin_unlock(&obd_stale_export_lock);
	spin_unlock_bh(&exp->exp_bl_list_lock);
EXPORT_SYMBOL(obd_stale_export_adjust);
/* start destroy zombie import/export thread */
int obd_zombie_impexp_init(void)
	zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
					   cfs_cpt_number(cfs_cpt_tab));
	/* 0 on success, else the PTR_ERR from workqueue creation */
	return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
/* stop destroy zombie import/export thread */
void obd_zombie_impexp_stop(void)
	/* destroy_workqueue() drains all pending work first */
	destroy_workqueue(zombie_wq);
	LASSERT(list_empty(&obd_stale_exports));
1843 /***** Kernel-userspace comm helpers *******/
/* Get length of entire message, including header
 * @param payload_len Length of the payload in bytes
 * @returns payload_len plus sizeof(struct kuc_hdr)
 */
int kuc_len(int payload_len)
	return sizeof(struct kuc_hdr) + payload_len;
EXPORT_SYMBOL(kuc_len);
/* Get a pointer to kuc header, given a ptr to the payload
 * @param p Pointer to payload area
 * @returns Pointer to kuc header
 */
struct kuc_hdr *kuc_ptr(void *p)
	/* the header sits immediately before the payload in memory */
	struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
	LASSERT(lh->kuc_magic == KUC_MAGIC);
EXPORT_SYMBOL(kuc_ptr);
/* Alloc space for a message, and fill in header
 * @param payload_len Payload size in bytes
 * @param transport kuc transport id stored in the header
 * @param type kuc message type stored in the header
 * @return Pointer to payload area, or ERR_PTR(-ENOMEM)
 */
void *kuc_alloc(int payload_len, int transport, int type)
	int len = kuc_len(payload_len);
	return ERR_PTR(-ENOMEM);
	lh->kuc_magic = KUC_MAGIC;
	lh->kuc_transport = transport;
	lh->kuc_msgtype = type;
	lh->kuc_msglen = len;
	/* the caller gets the payload area just past the header */
	return (void *)(lh + 1);
EXPORT_SYMBOL(kuc_alloc);
/* Takes pointer to payload area */
void kuc_free(void *p, int payload_len)
	struct kuc_hdr *lh = kuc_ptr(p);
	/* free the whole message starting at the header */
	OBD_FREE(lh, kuc_len(payload_len));
EXPORT_SYMBOL(kuc_free);
/* A waiter for an RPC slot; queued on cl_flight_waiters while all
 * cl_max_rpcs_in_flight credits are occupied.
 */
struct obd_request_slot_waiter {
	struct list_head orsw_entry;	/* link into cl_flight_waiters */
	wait_queue_head_t orsw_waitq;	/* woken when a slot is granted */
/* Check, under cl_loi_list_lock, whether @orsw was granted a slot:
 * the waker removes a granted waiter from the list, so an empty
 * orsw_entry means "slot available".
 */
static bool obd_request_slot_avail(struct client_obd *cli,
				   struct obd_request_slot_waiter *orsw)
	spin_lock(&cli->cl_loi_list_lock);
	avail = !!list_empty(&orsw->orsw_entry);
	spin_unlock(&cli->cl_loi_list_lock);
/*
 * For network flow control, the RPC sponsor needs to acquire a credit
 * before sending the RPC. The credits count for a connection is defined
 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
 * the subsequent RPC sponsors need to wait until others released their
 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
 */
int obd_get_request_slot(struct client_obd *cli)
	struct obd_request_slot_waiter orsw;
	/* fast path: a credit is free, take it immediately */
	spin_lock(&cli->cl_loi_list_lock);
	if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
		cli->cl_rpcs_in_flight++;
		spin_unlock(&cli->cl_loi_list_lock);
	/* slow path: queue this on-stack waiter and sleep */
	init_waitqueue_head(&orsw.orsw_waitq);
	list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
	orsw.orsw_signaled = false;
	spin_unlock(&cli->cl_loi_list_lock);
	rc = l_wait_event_abortable(orsw.orsw_waitq,
				    obd_request_slot_avail(cli, &orsw) ||
				    orsw.orsw_signaled);
	/* Here, we must take the lock to avoid the on-stack 'orsw' to be
	 * freed but other (such as obd_put_request_slot) is using it.
	 */
	spin_lock(&cli->cl_loi_list_lock);
	if (!orsw.orsw_signaled) {
		/* empty entry means the waker already granted us a slot;
		 * otherwise we are still queued and must dequeue ourselves */
		if (list_empty(&orsw.orsw_entry))
			cli->cl_rpcs_in_flight--;
			list_del(&orsw.orsw_entry);
	if (orsw.orsw_signaled) {
		LASSERT(list_empty(&orsw.orsw_entry));
	spin_unlock(&cli->cl_loi_list_lock);
EXPORT_SYMBOL(obd_get_request_slot);
/* Release an RPC credit, handing it directly to the first waiter if
 * one is queued and the limit allows.
 */
void obd_put_request_slot(struct client_obd *cli)
	struct obd_request_slot_waiter *orsw;
	spin_lock(&cli->cl_loi_list_lock);
	cli->cl_rpcs_in_flight--;
	/* If there is free slot, wakeup the first waiter. */
	if (!list_empty(&cli->cl_flight_waiters) &&
	    likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
		orsw = list_first_entry(&cli->cl_flight_waiters,
					struct obd_request_slot_waiter,
		/* dequeue == grant: see obd_request_slot_avail() */
		list_del_init(&orsw->orsw_entry);
		cli->cl_rpcs_in_flight++;
		wake_up(&orsw->orsw_waitq);
	spin_unlock(&cli->cl_loi_list_lock);
EXPORT_SYMBOL(obd_put_request_slot);
/* Read accessor for cl_max_rpcs_in_flight. */
__u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
	return cli->cl_max_rpcs_in_flight;
EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1993 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1995 struct obd_request_slot_waiter *orsw;
2001 if (max > OBD_MAX_RIF_MAX || max < 1)
2004 CDEBUG(D_INFO, "%s: max = %u max_mod = %u rif = %u\n",
2005 cli->cl_import->imp_obd->obd_name, max,
2006 cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2008 if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2009 LUSTRE_MDC_NAME) == 0) {
2010 /* adjust max_mod_rpcs_in_flight to ensure it is always
2011 * strictly lower that max_rpcs_in_flight
2014 CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2015 cli->cl_import->imp_obd->obd_name);
2018 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2019 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2025 spin_lock(&cli->cl_loi_list_lock);
2026 old = cli->cl_max_rpcs_in_flight;
2027 cli->cl_max_rpcs_in_flight = max;
2028 client_adjust_max_dirty(cli);
2032 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2033 for (i = 0; i < diff; i++) {
2034 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2035 struct obd_request_slot_waiter,
2040 list_del_init(&orsw->orsw_entry);
2041 cli->cl_rpcs_in_flight++;
2042 wake_up(&orsw->orsw_waitq);
2044 spin_unlock(&cli->cl_loi_list_lock);
2048 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Read accessor for cl_max_mod_rpcs_in_flight. */
__u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
	return cli->cl_max_mod_rpcs_in_flight;
EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/* Set cl_max_mod_rpcs_in_flight to @max, keeping it strictly below
 * cl_max_rpcs_in_flight (raising that limit if necessary) and within
 * the maximum the server advertised at connect time.
 */
int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
	struct obd_connect_data *ocd;
	if (max > OBD_MAX_RIF_MAX || max < 1)
	ocd = &cli->cl_import->imp_connect_data;
	CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
	       cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
	       ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
	/* clamp so max + 1 below cannot exceed OBD_MAX_RIF_MAX */
	if (max == OBD_MAX_RIF_MAX)
		max = OBD_MAX_RIF_MAX - 1;
	/* Cannot exceed or equal max_rpcs_in_flight. If we are asked to
	 * increase this value, also bump up max_rpcs_in_flight to match.
	 */
	if (max >= cli->cl_max_rpcs_in_flight) {
		"%s: increasing max_rpcs_in_flight=%u to allow larger max_mod_rpcs_in_flight=%u\n",
		cli->cl_import->imp_obd->obd_name, max + 1, max);
		obd_set_max_rpcs_in_flight(cli, max + 1);
	/* cannot exceed max modify RPCs in flight supported by the server,
	 * but verify ocd_connect_flags is at least initialized first. If
	 * not, allow it and fix value later in ptlrpc_connect_set_flags().
	 */
	if (!ocd->ocd_connect_flags) {
		maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
	} else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
		maxmodrpcs = ocd->ocd_maxmodrpcs;
		if (maxmodrpcs == 0) { /* connection not finished yet */
			maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
			"%s: partial connect, assume maxmodrpcs=%hu\n",
			cli->cl_import->imp_obd->obd_name, maxmodrpcs);
	if (max > maxmodrpcs) {
		CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than mdt.*.max_mod_rpcs_in_flight=%hu returned by the MDT server at connection.\n",
		       cli->cl_import->imp_obd->obd_name,
	/* publish the new limit under the waitq lock and wake waiters
	 * that may now fit under it */
	spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
	prev = cli->cl_max_mod_rpcs_in_flight;
	cli->cl_max_mod_rpcs_in_flight = max;
	/* wakeup waiters if limit has been increased */
	if (cli->cl_max_mod_rpcs_in_flight > prev)
		wake_up_locked(&cli->cl_mod_rpcs_waitq);
	spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/* Emit the modify-RPCs-in-flight histogram of @cli to @seq (procfs). */
int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
			       struct seq_file *seq)
	unsigned long mod_tot = 0, mod_cum;
	/* the histogram and counters are protected by the waitq lock */
	spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
	lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
	seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
		   cli->cl_mod_rpcs_in_flight);
	seq_puts(seq, "\n\t\t\tmodify\n");
	seq_puts(seq, "rpcs in flight        rpcs   %% cum %%\n");
	mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
	for (i = 0; i < OBD_HIST_MAX; i++) {
		unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
		seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
			   i, mod, pct(mod, mod_tot),
			   pct(mod_cum, mod_tot));
		/* stop once the cumulative count covers everything */
		if (mod_cum == mod_tot)
	spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
/* The number of modify RPCs sent in parallel is limited
 * because the server has a finite number of slots per client to
 * store request result and ensure reply reconstruction when needed.
 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
 * that takes into account server limit and cl_max_rpcs_in_flight
 *
 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
 * one close request is allowed above the maximum.
 */
	struct client_obd *cli;	/* client whose slot is being claimed */
	wait_queue_entry_t wqe;	/* entry on cli->cl_mod_rpcs_waitq */
/* Wake-function installed on cl_mod_rpcs_waitq entries: atomically
 * claims a modify-RPC slot for the embedded mod_waiter when one is
 * available, then defers to woken_wake_function().
 */
static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
				  unsigned int mode, int flags, void *key)
	struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
	struct client_obd *cli = w->cli;
	bool close_req = w->close_req;
	/* As woken_wake_function() doesn't remove us from the wait_queue,
	 * we use own flag to ensure we're called just once.
	 */
	/* A slot is available if
	 * - number of modify RPCs in flight is less than the max
	 * - it's a close RPC and no other close request is in flight
	 */
	avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
		(close_req && cli->cl_close_rpcs_in_flight == 0);
		cli->cl_mod_rpcs_in_flight++;
			cli->cl_close_rpcs_in_flight++;
		ret = woken_wake_function(wq_entry, mode, flags, key);
	} else if (cli->cl_close_rpcs_in_flight)
		/* No other waiter could be woken */
	else if (key == NULL)
		/* This was not a wakeup from a close completion, so there is no
		 * point seeing if there are close waiters to be woken
		 */
		/* There might be a close we could wake, keep looking */
/* Get a modify RPC slot from the obd client @cli according
 * to the kind of operation @opc that is going to be sent
 * and the intent @it of the operation if it applies.
 * If the maximum number of modify RPCs in flight is reached
 * the thread is put to sleep.
 * Returns the tag to be set in the request message. Tag 0
 * is reserved for non-modifying requests.
 */
__u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
	struct mod_waiter wait = {
		.close_req = (opc == MDS_CLOSE),
	/* our custom wake function claims the slot for us */
	init_wait(&wait.wqe);
	wait.wqe.func = claim_mod_rpc_function;
	spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
	__add_wait_queue_entry_tail(&cli->cl_mod_rpcs_waitq, &wait.wqe);
	/* This wakeup will only succeed if the maximums haven't
	 * been reached. If that happens, wait.woken will be set
	 * and there will be no need to wait.
	 */
	wake_up_locked(&cli->cl_mod_rpcs_waitq);
	while (wait.woken == false) {
		/* sleep without the waitq lock until claim_mod_rpc_function()
		 * grants us a slot */
		spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
		wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
			   MAX_SCHEDULE_TIMEOUT);
		spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
	__remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
	max = cli->cl_max_mod_rpcs_in_flight;
	lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
			 cli->cl_mod_rpcs_in_flight);
	/* find a free tag */
	i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
	LASSERT(i < OBD_MAX_RIF_MAX);
	LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
	spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
	/* tag 0 is reserved for non-modify RPCs */
	"%s: modify RPC slot %u is allocated opc %u, max %hu\n",
	cli->cl_import->imp_obd->obd_name,
EXPORT_SYMBOL(obd_get_mod_rpc_slot);
/* Put a modify RPC slot from the obd client @cli according
 * to the kind of operation @opc that has been sent.
 * @tag is the slot tag returned by obd_get_mod_rpc_slot().
 */
void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
	bool close_req = false;
	if (opc == MDS_CLOSE)
	spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
	cli->cl_mod_rpcs_in_flight--;
		cli->cl_close_rpcs_in_flight--;
	/* release the tag in the bitmap */
	LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
	LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
	/* let claim_mod_rpc_function() hand the freed slot to a waiter */
	__wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
	spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
EXPORT_SYMBOL(obd_put_mod_rpc_slot);