4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/obdclass/genops.c
33 * These are the only exported functions, they provide some generic
34 * infrastructure for managing object devices
37 #define DEBUG_SUBSYSTEM S_CLASS
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
/* Global registry of all obd devices, indexed by obd_minor. */
48 DEFINE_XARRAY_ALLOC(obd_devs);
49 EXPORT_SYMBOL(obd_devs);
/* Number of devices currently present in obd_devs. */
51 static atomic_t obd_devs_count = ATOMIC_INIT(0);
/* Slab cache backing struct obd_device allocations. */
53 static struct kmem_cache *obd_device_cachep;
54 static struct kobj_type class_ktype;
/* Workqueue on which zombie exports/imports are destroyed asynchronously. */
55 static struct workqueue_struct *zombie_wq;
/* Forward declarations for the zombie/debug helpers defined below. */
57 static void obd_zombie_export_add(struct obd_export *exp);
58 static void obd_zombie_import_add(struct obd_import *imp);
59 static void print_export_data(struct obd_export *exp,
60 const char *status, int locks, int debug_level);
/* Unlinked ("stale") exports awaiting destruction, and their bookkeeping. */
62 static LIST_HEAD(obd_stale_exports);
63 static DEFINE_SPINLOCK(obd_stale_export_lock);
64 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Allocate a new obd_device from the slab cache (GFP_NOFS to avoid
 * recursing into the filesystem) and stamp it with OBD_DEVICE_MAGIC.
 */
66 static struct obd_device *obd_device_alloc(void)
68 struct obd_device *obd;
70 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
72 obd->obd_magic = OBD_DEVICE_MAGIC;
/* Free an obd_device allocated by obd_device_alloc(). The magic check
 * catches corruption/double free; a non-NULL obd_namespace here means
 * the device was not properly cleaned up (e.g. under obd_force).
 */
76 static void obd_device_free(struct obd_device *obd)
79 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
80 "obd %px obd_magic %08x != %08x\n",
81 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
82 if (obd->obd_namespace != NULL) {
83 CERROR("obd %px: namespace %px was not properly cleaned up (obd_force=%d)!\n",
84 obd, obd->obd_namespace, obd->obd_force);
87 lu_ref_fini(&obd->obd_reference);
88 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/* Look up a registered obd_type by name in the lustre kset.
 * On a match returns the obd_type embedding the found kobject; the
 * kobject reference from kset_find_obj() is passed to the caller.
 */
91 struct obd_type *class_search_type(const char *name)
93 struct kobject *kobj = kset_find_obj(lustre_kset, name);
95 if (kobj && kobj->ktype == &class_ktype)
96 return container_of(kobj, struct obd_type, typ_kobj);
101 EXPORT_SYMBOL(class_search_type);
/* Find an obd_type by name, loading the implementing module on demand
 * when module loading is supported, and take a reference on both the
 * type (typ_refcnt) and the module (via try_module_get).
 */
103 struct obd_type *class_get_type(const char *name)
105 struct obd_type *type;
108 type = class_search_type(name);
109 #ifdef HAVE_MODULE_LOADING_SUPPORT
111 const char *modname = name;
113 #ifdef HAVE_SERVER_SUPPORT
/* Some type names map to a differently-named module on servers. */
114 if (strcmp(modname, "obdfilter") == 0)
117 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
118 modname = LUSTRE_OSP_NAME;
120 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
121 modname = LUSTRE_MDT_NAME;
122 #endif /* HAVE_SERVER_SUPPORT */
125 if (!request_module("%s", modname)) {
126 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
128 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* Retry the lookup now that the module may be loaded. */
132 type = class_search_type(name);
137 * Holding rcu_read_lock() matches the synchronize_rcu() call
138 * in free_module() and ensures that if type->typ_dt_ops is
139 * not yet NULL, then the module won't be freed until after
140 * we rcu_read_unlock().
142 const struct obd_ops *dt_ops = READ_ONCE(type->typ_dt_ops);
144 if (dt_ops && try_module_get(dt_ops->o_owner)) {
145 atomic_inc(&type->typ_refcnt);
146 /* class_search_type() returned a counted ref, this
147 * count not needed as we could get it via typ_refcnt
149 kobject_put(&type->typ_kobj);
151 kobject_put(&type->typ_kobj);
158 EXPORT_SYMBOL(class_get_type);
/* Drop the module and typ_refcnt references taken by class_get_type(). */
160 void class_put_type(struct obd_type *type)
163 module_put(type->typ_dt_ops->o_owner);
164 atomic_dec(&type->typ_refcnt);
166 EXPORT_SYMBOL(class_put_type);
/* kobject release callback for an obd_type: tears down its debugfs
 * tree, lu_device_type, proc subtree (if any), then frees the type.
 * Runs when the last kobject reference is dropped.
 */
168 static void class_sysfs_release(struct kobject *kobj)
170 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
172 debugfs_remove_recursive(type->typ_debugfs_entry);
173 type->typ_debugfs_entry = NULL;
176 lu_device_type_fini(type->typ_lu);
178 #ifdef CONFIG_PROC_FS
179 if (type->typ_name && type->typ_procroot)
180 remove_proc_subtree(type->typ_name, proc_lustre_root);
182 OBD_FREE(type, sizeof(*type));
/* ktype shared by all obd_type kobjects; identifies them in
 * class_search_type() and routes release to class_sysfs_release().
 */
185 static struct kobj_type class_ktype = {
186 .sysfs_ops = &lustre_sysfs_ops,
187 .release = class_sysfs_release,
190 #ifdef HAVE_SERVER_SUPPORT
/* Create a placeholder obd_type named @name with compat debugfs/proc
 * entries (server-side symlink support). Fails with -EEXIST when the
 * type already exists; the real registration later takes it over
 * (see typ_sym_filter handling in class_register_type()).
 */
191 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
193 struct dentry *symlink;
194 struct obd_type *type;
197 type = class_search_type(name);
199 kobject_put(&type->typ_kobj);
200 return ERR_PTR(-EEXIST);
203 OBD_ALLOC(type, sizeof(*type));
205 return ERR_PTR(-ENOMEM);
207 type->typ_kobj.kset = lustre_kset;
208 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
209 &lustre_kset->kobj, "%s", name);
213 symlink = debugfs_create_dir(name, debugfs_lustre_root);
214 type->typ_debugfs_entry = symlink;
215 type->typ_sym_filter = true;
/* proc registration failure is non-fatal: warn and continue. */
218 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
220 if (IS_ERR(type->typ_procroot)) {
221 CERROR("%s: can't create compat proc entry: %d\n",
222 name, (int)PTR_ERR(type->typ_procroot));
223 type->typ_procroot = NULL;
229 EXPORT_SYMBOL(class_add_symlinks);
230 #endif /* HAVE_SERVER_SUPPORT */
/* Upper bound on an obd type-name length (checked via strnlen below). */
232 #define CLASS_MAX_NAME 1024
/* Register a new obd_type with the given operation tables and optional
 * lu_device_type. Creates the type's kobject/debugfs/proc presence.
 * Re-registering an existing name is rejected (except taking over a
 * symlink placeholder made by class_add_symlinks()).
 */
234 int class_register_type(const struct obd_ops *dt_ops,
235 const struct md_ops *md_ops,
237 const char *name, struct lu_device_type *ldt)
239 struct obd_type *type;
244 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
246 type = class_search_type(name);
248 #ifdef HAVE_SERVER_SUPPORT
249 if (type->typ_sym_filter)
251 #endif /* HAVE_SERVER_SUPPORT */
252 kobject_put(&type->typ_kobj);
253 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
257 OBD_ALLOC(type, sizeof(*type));
/* typ_lu stays at the OBD_LU_TYPE_SETUP sentinel until lu init below. */
261 type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
262 type->typ_kobj.kset = lustre_kset;
263 kobject_init(&type->typ_kobj, &class_ktype);
264 #ifdef HAVE_SERVER_SUPPORT
266 #endif /* HAVE_SERVER_SUPPORT */
268 type->typ_dt_ops = dt_ops;
269 type->typ_md_ops = md_ops;
271 #ifdef HAVE_SERVER_SUPPORT
/* Take over a symlink placeholder: drop its flag and extra ref. */
272 if (type->typ_sym_filter) {
273 type->typ_sym_filter = false;
274 kobject_put(&type->typ_kobj);
278 #ifdef CONFIG_PROC_FS
279 if (enable_proc && !type->typ_procroot) {
280 type->typ_procroot = lprocfs_register(name,
283 if (IS_ERR(type->typ_procroot)) {
284 rc = PTR_ERR(type->typ_procroot);
285 type->typ_procroot = NULL;
290 type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
292 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
295 #ifdef HAVE_SERVER_SUPPORT
299 rc = lu_device_type_init(ldt);
/* Publish typ_lu (or NULL on failure) and wake waiters on it. */
300 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
301 wake_up_var(&type->typ_lu);
309 kobject_put(&type->typ_kobj);
313 EXPORT_SYMBOL(class_register_type);
/* Unregister the obd_type named @name. Clears typ_dt_ops first so
 * class_get_type() cannot grab the module while it is going away.
 * Fails with -EBUSY if the type still has users (typ_refcnt != 0).
 */
315 int class_unregister_type(const char *name)
317 struct obd_type *type = class_search_type(name);
322 CERROR("unknown obd type\n");
327 * Ensure that class_get_type doesn't try to get the module
328 * as it could be freed before the obd_type is released.
329 * synchronize_rcu() will be called before the module
332 type->typ_dt_ops = NULL;
334 if (atomic_read(&type->typ_refcnt)) {
335 CERROR("type %s has refcount (%d)\n", name,
336 atomic_read(&type->typ_refcnt));
337 /* This is a bad situation, let's make the best of it */
338 /* Remove ops, but leave the name for debugging */
339 type->typ_md_ops = NULL;
340 GOTO(out_put, rc = -EBUSY);
343 /* Put the final ref */
344 kobject_put(&type->typ_kobj);
346 /* Put the ref returned by class_search_type() */
347 kobject_put(&type->typ_kobj);
350 } /* class_unregister_type */
351 EXPORT_SYMBOL(class_unregister_type);
354 * Create a new obd device.
356 * Allocate the new obd_device and initialize it.
358 * \param[in] type_name obd device type string.
359 * \param[in] name obd device name.
360 * \param[in] uuid obd device UUID
362 * \retval newdev pointer to created obd_device
363 * \retval ERR_PTR(errno) on error
365 struct obd_device *class_newdev(const char *type_name, const char *name,
368 struct obd_device *newdev;
369 struct obd_type *type = NULL;
/* Reject over-long names before any allocation. */
373 if (strlen(name) >= MAX_OBD_NAME) {
374 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
375 RETURN(ERR_PTR(-EINVAL));
/* Pin the type (and its module) for the lifetime of the device;
 * released via class_put_type() in class_free_dev(). */
378 type = class_get_type(type_name);
380 CERROR("OBD: unknown type: %s\n", type_name);
381 RETURN(ERR_PTR(-ENODEV));
384 newdev = obd_device_alloc();
385 if (newdev == NULL) {
386 class_put_type(type);
387 RETURN(ERR_PTR(-ENOMEM));
389 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
390 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
391 newdev->obd_type = type;
/* -1 = not yet registered in obd_devs (see class_register_device()). */
392 newdev->obd_minor = -1;
394 rwlock_init(&newdev->obd_pool_lock);
395 newdev->obd_pool_limit = 0;
396 newdev->obd_pool_slv = 0;
/* Export bookkeeping: lists, counters and their locks. */
398 INIT_LIST_HEAD(&newdev->obd_exports);
399 newdev->obd_num_exports = 0;
400 newdev->obd_grant_check_threshold = 100;
401 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
402 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
403 INIT_LIST_HEAD(&newdev->obd_exports_timed);
404 INIT_LIST_HEAD(&newdev->obd_nid_stats);
405 spin_lock_init(&newdev->obd_nid_lock);
406 spin_lock_init(&newdev->obd_dev_lock);
407 mutex_init(&newdev->obd_dev_mutex);
408 spin_lock_init(&newdev->obd_osfs_lock);
409 /* newdev->obd_osfs_age must be set to a value in the distant
410 * past to guarantee a fresh statfs is fetched on mount.
412 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
414 /* XXX belongs in setup not attach */
415 init_rwsem(&newdev->obd_observer_link_sem);
/* Recovery state: queues and waitqueue used during client replay. */
417 spin_lock_init(&newdev->obd_recovery_task_lock);
418 init_waitqueue_head(&newdev->obd_next_transno_waitq);
419 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
420 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
421 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
422 INIT_LIST_HEAD(&newdev->obd_evict_list);
423 INIT_LIST_HEAD(&newdev->obd_lwp_list);
425 llog_group_init(&newdev->obd_olg);
426 /* Detach drops this */
427 kref_init(&newdev->obd_refcount);
428 lu_ref_init(&newdev->obd_reference);
429 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
431 atomic_set(&newdev->obd_conn_inprogress, 0);
433 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
435 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
436 newdev->obd_name, newdev);
/* Free an obd_device whose refcount has dropped to zero.
 * Runs obd_cleanup() if the device was set up (obd_stopping),
 * frees the structure and drops the type reference taken in
 * class_newdev().
444 * \param[in] obd obd_device to be freed
448 void class_free_dev(struct obd_device *obd)
450 struct obd_type *obd_type = obd->obd_type;
/* Sanity: valid magic, consistent obd_devs slot, zero refcount. */
452 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
453 "%px obd_magic %08x != %08x\n",
454 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
455 LASSERTF(obd->obd_minor == -1 || class_num2obd(obd->obd_minor) == obd,
456 "obd %px != obd_devs[%d] %px\n",
457 obd, obd->obd_minor, class_num2obd(obd->obd_minor));
458 LASSERTF(kref_read(&obd->obd_refcount) == 0,
459 "obd_refcount should be 0, not %d\n",
460 kref_read(&obd->obd_refcount));
461 LASSERT(obd_type != NULL);
463 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
464 obd->obd_name, obd->obd_type->typ_name);
466 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
467 obd->obd_name, obd->obd_uuid.uuid);
468 if (obd->obd_stopping) {
471 /* If we're not stopping, we were never set up */
472 err = obd_cleanup(obd);
474 CERROR("Cleanup %s returned %d\n",
478 obd_device_free(obd);
/* Balances the class_get_type() in class_newdev(). */
480 class_put_type(obd_type);
484 * Unregister obd device.
486 * Remove an obd from obd_dev
488 * \param[in] new_obd obd_device to be unregistered
492 void class_unregister_device(struct obd_device *obd)
494 if (obd->obd_minor >= 0) {
495 xa_erase(&obd_devs, obd->obd_minor);
/* Drop the reference taken in class_register_device(). */
496 class_decref(obd, "obd_device_list", obd);
498 atomic_dec(&obd_devs_count);
503 * Register obd device.
505 * Add new_obd to obd_devs
507 * \param[in] new_obd obd_device to be registered
510 * \retval -EEXIST device with this name is registered
512 int class_register_device(struct obd_device *new_obd)
517 if (new_obd == NULL) {
522 /* obd_device waiting to be destroyed by "obd_zombie_impexp_thread" */
523 if (class_name2dev(new_obd->obd_name) != -1)
524 obd_zombie_barrier();
526 if (class_name2dev(new_obd->obd_name) == -1) {
527 class_incref(new_obd, "obd_device_list", new_obd);
/* Allocate a minor number from the xarray; GFP_ATOMIC as this
 * may run under a spinlock. */
528 rc = xa_alloc(&obd_devs, &dev_no, new_obd,
529 xa_limit_31b, GFP_ATOMIC);
534 new_obd->obd_minor = dev_no;
535 atomic_inc(&obd_devs_count);
/* Return the minor number of the *attached* obd named @name, or -1. */
544 int class_name2dev(const char *name)
546 struct obd_device *obd = NULL;
547 unsigned long dev_no = 0;
554 obd_device_for_each(dev_no, obd) {
555 if (strcmp(name, obd->obd_name) == 0) {
557 * Make sure we finished attaching before we give
560 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
561 if (obd->obd_attached) {
562 ret = obd->obd_minor;
573 EXPORT_SYMBOL(class_name2dev);
/* Return the *attached* obd_device named @name, or NULL. */
575 struct obd_device *class_name2obd(const char *name)
577 struct obd_device *obd = NULL;
578 unsigned long dev_no = 0;
584 obd_device_for_each(dev_no, obd) {
585 if (strcmp(name, obd->obd_name) == 0) {
587 * Make sure we finished attaching before we give
590 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
591 if (obd->obd_attached)
598 * TODO: We give out a reference without class_incref(). This isn't
599 * ideal, but this behavior is identical in previous implementations
604 EXPORT_SYMBOL(class_name2obd);
/* Return the minor number of the obd with @uuid, or the not-found value. */
606 int class_uuid2dev(struct obd_uuid *uuid)
608 struct obd_device *obd = NULL;
609 unsigned long dev_no = 0;
613 obd_device_for_each(dev_no, obd) {
614 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
615 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
616 ret = obd->obd_minor;
625 EXPORT_SYMBOL(class_uuid2dev);
/* Return the obd_device with @uuid (uncounted pointer — see TODO). */
627 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
629 struct obd_device *obd = NULL;
630 unsigned long dev_no = 0;
633 obd_device_for_each(dev_no, obd) {
634 if (obd_uuid_equals(uuid, &obd->obd_uuid)) {
635 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
642 * TODO: We give out a reference without class_incref(). This isn't
643 * ideal, but this behavior is identical in previous implementations
648 EXPORT_SYMBOL(class_uuid2obd);
/* Direct obd_devs lookup by minor number; NULL if the slot is empty. */
650 struct obd_device *class_num2obd(int dev_no)
652 return xa_load(&obd_devs, dev_no);
654 EXPORT_SYMBOL(class_num2obd);
657 * Find obd by name or uuid.
659 * Increment obd's refcount if found.
661 * \param[in] str obd name or uuid
663 * \retval NULL if not found
664 * \retval obd pointer to found obd_device
666 struct obd_device *class_str2obd(const char *str)
668 struct obd_device *obd = NULL;
669 struct obd_uuid uuid;
670 unsigned long dev_no = 0;
/* Parse @str as a uuid too, so either spelling matches below. */
672 obd_str2uuid(&uuid, str);
675 obd_device_for_each(dev_no, obd) {
676 if (obd_uuid_equals(&uuid, &obd->obd_uuid) ||
677 (strcmp(str, obd->obd_name) == 0)) {
679 * Make sure we finished attaching before we give
682 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
683 if (obd->obd_attached) {
/* Unlike the other lookups, this one returns a counted ref. */
684 class_incref(obd, "find", current);
695 EXPORT_SYMBOL(class_str2obd);
698 * Get obd devices count. Device in any
700 * \retval obd device count
702 int class_obd_devs_count(void)
704 return atomic_read(&obd_devs_count);
706 EXPORT_SYMBOL(class_obd_devs_count);
708 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
709 * specified, then only the client with that uuid is returned,
710 * otherwise any client connected to the tgt is returned.
712 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
713 const char *type_name,
714 struct obd_uuid *grp_uuid)
716 struct obd_device *obd = NULL;
717 unsigned long dev_no = 0;
720 obd_device_for_each(dev_no, obd) {
721 if ((strncmp(obd->obd_type->typ_name, type_name,
722 strlen(type_name)) == 0)) {
723 if (obd_uuid_equals(tgt_uuid,
724 &obd->u.cli.cl_target_uuid) &&
725 ((grp_uuid) ? obd_uuid_equals(grp_uuid,
726 &obd->obd_uuid) : 1)) {
736 EXPORT_SYMBOL(class_find_client_obd);
739 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
740 * adjust sptlrpc settings accordingly.
742 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
744 struct obd_device *obd = NULL;
745 unsigned long dev_no = 0;
749 LASSERT(namelen > 0);
752 obd_device_for_each(dev_no, obd) {
/* Skip devices that are not set up or already stopping. */
753 if (obd->obd_set_up == 0 || obd->obd_stopping)
756 /* only notify mdc, osc, osp, lwp, mdt, ost
757 * because only these have a -sptlrpc llog
759 type = obd->obd_type->typ_name;
760 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
761 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
762 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
763 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
764 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
765 strcmp(type, LUSTRE_OST_NAME) != 0)
/* Only devices belonging to @fsname (name-prefix match). */
768 if (strncmp(obd->obd_name, fsname, namelen))
/* Pin the obd across the async set_info call. */
771 class_incref(obd, __func__, obd);
773 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
774 sizeof(KEY_SPTLRPC_CONF),
775 KEY_SPTLRPC_CONF, 0, NULL, NULL);
778 class_decref(obd, __func__, obd);
784 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/* Destroy the obd_device slab cache (idempotent via the NULL reset). */
786 void obd_cleanup_caches(void)
789 if (obd_device_cachep) {
790 kmem_cache_destroy(obd_device_cachep);
791 obd_device_cachep = NULL;
/* Create the obd_device slab cache. The usercopy window spans the
 * whole struct. Cleans up on failure.
 */
797 int obd_init_caches(void)
803 LASSERT(obd_device_cachep == NULL);
804 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
805 sizeof(struct obd_device),
806 0, 0, 0, sizeof(struct obd_device), NULL);
807 if (!obd_device_cachep)
808 GOTO(out, rc = -ENOMEM);
812 obd_cleanup_caches();
/* Owner tag for export handles registered in the handle hash. */
816 static const char export_handle_owner[] = "export";
818 /* map connection to client */
819 struct obd_export *class_conn2export(struct lustre_handle *conn)
821 struct obd_export *export;
/* NULL handle and cookie == -1 are special-cased (no lookup). */
826 CDEBUG(D_CACHE, "looking for null handle\n");
830 if (conn->cookie == -1) { /* this means assign a new connection */
831 CDEBUG(D_CACHE, "want a new connection\n");
835 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
836 export = class_handle2object(conn->cookie, export_handle_owner);
839 EXPORT_SYMBOL(class_conn2export);
/* Accessor: the obd_device behind an export. */
841 struct obd_device *class_exp2obd(struct obd_export *exp)
847 EXPORT_SYMBOL(class_exp2obd);
/* Accessor: the client import of the export's obd, if any. */
849 struct obd_import *class_exp2cliimp(struct obd_export *exp)
851 struct obd_device *obd = exp->exp_obd;
855 return obd->u.cli.cl_import;
857 EXPORT_SYMBOL(class_exp2cliimp);
859 /* Export management functions */
/* Final destruction of an export once its handle refcount hits zero:
 * releases the connection, asserts all request lists are drained,
 * drops the obd reference (non-self exports) and frees via RCU.
 */
860 static void class_export_destroy(struct obd_export *exp)
862 struct obd_device *obd = exp->exp_obd;
866 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
867 LASSERT(obd != NULL);
869 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
870 exp->exp_client_uuid.uuid, obd->obd_name);
872 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
873 ptlrpc_connection_put(exp->exp_connection);
875 LASSERT(list_empty(&exp->exp_outstanding_replies));
876 LASSERT(list_empty(&exp->exp_uncommitted_replies));
877 LASSERT(list_empty(&exp->exp_req_replay_queue));
878 LASSERT(list_empty(&exp->exp_hp_rpcs));
879 obd_destroy_export(exp);
880 /* self export doesn't hold a reference to an obd, although it
881 * exists until freeing of the obd
883 if (exp != obd->obd_self_export)
884 class_decref(obd, "export", exp);
886 OBD_FREE_PRE(exp, sizeof(*exp), "kfree_rcu");
887 kfree_rcu(exp, exp_handle.h_rcu);
/* Take an export reference (on the handle refcount). */
891 struct obd_export *class_export_get(struct obd_export *exp)
893 refcount_inc(&exp->exp_handle.h_ref);
894 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
895 refcount_read(&exp->exp_handle.h_ref));
898 EXPORT_SYMBOL(class_export_get);
/* Drop an export reference. On the final put, the self export is
 * destroyed synchronously; any other export is queued for the
 * zombie workqueue (obd_zombie_export_add).
 */
900 void class_export_put(struct obd_export *exp)
902 LASSERT(exp != NULL);
903 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
904 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
905 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
906 refcount_read(&exp->exp_handle.h_ref) - 1);
908 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
909 struct obd_device *obd = exp->exp_obd;
911 CDEBUG(D_IOCTL, "final put %p/%s\n",
912 exp, exp->exp_client_uuid.uuid);
914 /* release nid stat refererence */
915 lprocfs_exp_cleanup(exp);
917 if (exp == obd->obd_self_export) {
918 /* self export should be destroyed without zombie
919 * thread as it doesn't hold a reference to obd and
920 * doesn't hold any resources
922 class_export_destroy(exp);
923 /* self export is destroyed, no class ref exist and it
924 * is safe to free obd
928 LASSERT(!list_empty(&exp->exp_obd_chain));
929 obd_zombie_export_add(exp);
934 EXPORT_SYMBOL(class_export_put);
/* Zombie work item: destroy one export and wake any waiter once
 * the stale-export count drains to zero.
 */
936 static void obd_zombie_exp_cull(struct work_struct *ws)
938 struct obd_export *export;
940 export = container_of(ws, struct obd_export, exp_zombie_work);
941 class_export_destroy(export);
942 LASSERT(atomic_read(&obd_stale_export_num) > 0);
943 if (atomic_dec_and_test(&obd_stale_export_num))
944 wake_up_var(&obd_stale_export_num);
947 /* Creates a new export, adds it to the hash table, and returns a
948 * pointer to it. The refcount is 2: one for the hash reference, and
949 * one for the pointer returned by this function.
951 static struct obd_export *__class_new_export(struct obd_device *obd,
952 struct obd_uuid *cluuid,
955 struct obd_export *export;
960 OBD_ALLOC_PTR(export);
962 return ERR_PTR(-ENOMEM);
/* Initialize refcounts, counters, lists and locks of the export. */
964 export->exp_conn_cnt = 0;
965 export->exp_lock_hash = NULL;
966 export->exp_flock_hash = NULL;
967 /* 2 = class_handle_hash + last */
968 refcount_set(&export->exp_handle.h_ref, 2);
969 atomic_set(&export->exp_rpc_count, 0);
970 atomic_set(&export->exp_cb_count, 0);
971 atomic_set(&export->exp_locks_count, 0);
972 #if LUSTRE_TRACKS_LOCK_EXP_REFS
973 INIT_LIST_HEAD(&export->exp_locks_list);
974 spin_lock_init(&export->exp_locks_list_guard);
976 atomic_set(&export->exp_replay_count, 0);
977 export->exp_obd = obd;
978 INIT_LIST_HEAD(&export->exp_outstanding_replies);
979 spin_lock_init(&export->exp_uncommitted_replies_lock);
980 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
981 INIT_LIST_HEAD(&export->exp_req_replay_queue);
982 INIT_HLIST_NODE(&export->exp_handle.h_link);
983 INIT_LIST_HEAD(&export->exp_hp_rpcs);
984 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* Publish the export handle so class_conn2export() can find it. */
985 class_handle_hash(&export->exp_handle, export_handle_owner);
986 export->exp_last_request_time = ktime_get_real_seconds();
987 spin_lock_init(&export->exp_lock);
988 spin_lock_init(&export->exp_rpc_lock);
989 INIT_HLIST_NODE(&export->exp_gen_hash);
990 spin_lock_init(&export->exp_bl_list_lock);
991 INIT_LIST_HEAD(&export->exp_bl_list);
992 INIT_LIST_HEAD(&export->exp_stale_list);
993 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
995 export->exp_sp_peer = LUSTRE_SP_ANY;
996 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
997 export->exp_client_uuid = *cluuid;
998 obd_init_export(export);
1000 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1001 export->exp_root_fid.f_seq = 0;
1002 export->exp_root_fid.f_oid = 0;
1003 export->exp_root_fid.f_ver = 0;
/* Link the export into the obd under obd_dev_lock; a non-self
 * export must not duplicate an existing client uuid and must not
 * race with obd shutdown. */
1005 spin_lock(&obd->obd_dev_lock);
1006 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1007 /* shouldn't happen, but might race */
1008 if (obd->obd_stopping)
1009 GOTO(exit_unlock, rc = -ENODEV);
1011 rc = obd_uuid_add(obd, export);
1013 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1014 obd->obd_name, cluuid->uuid, rc);
1015 GOTO(exit_unlock, rc = -EALREADY);
1020 class_incref(obd, "export", export);
1021 list_add_tail(&export->exp_obd_chain_timed,
1022 &obd->obd_exports_timed);
1023 list_add(&export->exp_obd_chain, &obd->obd_exports);
1024 obd->obd_num_exports++;
1026 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1027 INIT_LIST_HEAD(&export->exp_obd_chain);
1029 spin_unlock(&obd->obd_dev_lock);
/* Error path: unhash and free the partially-built export. */
1033 spin_unlock(&obd->obd_dev_lock);
1034 class_handle_unhash(&export->exp_handle);
1035 obd_destroy_export(export);
1036 OBD_FREE_PTR(export);
/* Public wrapper: create a regular (non-self) export. */
1040 struct obd_export *class_new_export(struct obd_device *obd,
1041 struct obd_uuid *uuid)
1043 return __class_new_export(obd, uuid, false);
1045 EXPORT_SYMBOL(class_new_export);
/* Wrapper: create the obd's self export. */
1047 struct obd_export *class_new_export_self(struct obd_device *obd,
1048 struct obd_uuid *uuid)
1050 return __class_new_export(obd, uuid, true);
/* Detach an export from its obd: unhash its handle, remove it from
 * the uuid (and, on servers, generation) hashes, move it to the
 * unlinked list and hand its reference to the stale-export machinery.
 * The self export is simply put.
 */
1053 void class_unlink_export(struct obd_export *exp)
1055 class_handle_unhash(&exp->exp_handle);
1057 if (exp->exp_obd->obd_self_export == exp) {
1058 class_export_put(exp);
1062 spin_lock(&exp->exp_obd->obd_dev_lock);
1063 /* delete an uuid-export hashitem from hashtables */
1064 if (exp != exp->exp_obd->obd_self_export)
1065 obd_uuid_del(exp->exp_obd, exp);
1067 #ifdef HAVE_SERVER_SUPPORT
1068 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1069 struct tg_export_data *ted = &exp->exp_target_data;
1070 struct cfs_hash *hash;
1072 /* Because obd_gen_hash will not be released until
1073 * class_cleanup(), so hash should never be NULL here
1075 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1076 LASSERT(hash != NULL);
1077 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1078 &exp->exp_gen_hash);
1079 cfs_hash_putref(hash);
1081 #endif /* HAVE_SERVER_SUPPORT */
1083 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1084 list_del_init(&exp->exp_obd_chain_timed);
1085 exp->exp_obd->obd_num_exports--;
1086 spin_unlock(&exp->exp_obd->obd_dev_lock);
1088 /* A reference is kept by obd_stale_exports list */
1089 obd_stale_export_put(exp);
1091 EXPORT_SYMBOL(class_unlink_export);
1093 /* Import management functions */
/* Final destruction of an import (refcount == 0): release its
 * connections and connection list, then drop the obd reference.
 */
1094 static void obd_zombie_import_free(struct obd_import *imp)
1096 struct obd_import_conn *imp_conn;
1099 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1100 imp->imp_obd->obd_name);
1102 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1104 ptlrpc_connection_put(imp->imp_connection);
1106 while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1107 struct obd_import_conn,
1108 oic_item)) != NULL) {
1109 list_del_init(&imp_conn->oic_item);
1110 ptlrpc_connection_put(imp_conn->oic_conn);
1111 OBD_FREE(imp_conn, sizeof(*imp_conn));
1114 LASSERT(imp->imp_sec == NULL);
1115 LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1116 imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1117 class_decref(imp->imp_obd, "import", imp);
/* Take an import reference. */
1122 struct obd_import *class_import_get(struct obd_import *import)
1124 refcount_inc(&import->imp_refcount);
1125 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1126 refcount_read(&import->imp_refcount),
1127 import->imp_obd->obd_name);
1130 EXPORT_SYMBOL(class_import_get);
/* Drop an import reference; the final put queues the import for
 * asynchronous destruction (obd_zombie_import_add).
 */
1132 void class_import_put(struct obd_import *imp)
1136 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1138 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1139 refcount_read(&imp->imp_refcount) - 1,
1140 imp->imp_obd->obd_name);
1142 if (refcount_dec_and_test(&imp->imp_refcount)) {
1143 CDEBUG(D_INFO, "final put import %p\n", imp);
1144 obd_zombie_import_add(imp);
1149 EXPORT_SYMBOL(class_import_put);
/* Initialize the adaptive-timeout state of an import: net latency
 * plus a per-portal service estimate.
 */
1151 static void init_imp_at(struct imp_at *at)
1155 at_init(&at->iat_net_latency, 0, 0);
1156 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1157 /* max service estimates are tracked server side, so dont't
1158 * use AT history here, just use the last reported val. (But
1159 * keep hist for proc histogram, worst_ever)
1161 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Zombie work item: free one import. */
1166 static void obd_zombie_imp_cull(struct work_struct *ws)
1168 struct obd_import *import;
1170 import = container_of(ws, struct obd_import, imp_zombie_work);
1171 obd_zombie_import_free(import);
/* Allocate and initialize a new import for @obd. The import starts in
 * LUSTRE_IMP_NEW with refcount 2 and holds a reference on the obd.
 */
1174 struct obd_import *class_new_import(struct obd_device *obd)
1176 struct obd_import *imp;
1177 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1179 OBD_ALLOC(imp, sizeof(*imp));
/* Request/replay list heads and replay cursor. */
1183 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1184 INIT_LIST_HEAD(&imp->imp_replay_list);
1185 INIT_LIST_HEAD(&imp->imp_sending_list);
1186 INIT_LIST_HEAD(&imp->imp_delayed_list);
1187 INIT_LIST_HEAD(&imp->imp_committed_list);
1188 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1189 imp->imp_known_replied_xid = 0;
1190 imp->imp_replay_cursor = &imp->imp_committed_list;
1191 spin_lock_init(&imp->imp_lock);
1192 imp->imp_last_success_conn = 0;
1193 imp->imp_state = LUSTRE_IMP_NEW;
1194 imp->imp_obd = class_incref(obd, "import", imp);
1195 rwlock_init(&imp->imp_sec_lock);
1196 init_waitqueue_head(&imp->imp_recovery_waitq);
1197 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
/* Use the pid-namespace reaper pid for sec, falling back to 1. */
1199 if (curr_pid_ns && curr_pid_ns->child_reaper)
1200 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1202 imp->imp_sec_refpid = 1;
1204 refcount_set(&imp->imp_refcount, 2);
1205 atomic_set(&imp->imp_unregistering, 0);
1206 atomic_set(&imp->imp_reqs, 0);
1207 atomic_set(&imp->imp_inflight, 0);
1208 atomic_set(&imp->imp_replay_inflight, 0);
1209 init_waitqueue_head(&imp->imp_replay_waitq);
1210 atomic_set(&imp->imp_inval_count, 0);
1211 atomic_set(&imp->imp_waiting, 0);
1212 INIT_LIST_HEAD(&imp->imp_conn_list);
1213 init_imp_at(&imp->imp_at);
1215 /* the default magic is V2, will be used in connect RPC, and
1216 * then adjusted according to the flags in request/reply.
1218 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1222 EXPORT_SYMBOL(class_new_import);
/* Retire an import: bump imp_generation under imp_lock, then drop
 * the caller's reference.
 */
1224 void class_destroy_import(struct obd_import *import)
1226 LASSERT(import != NULL);
1227 LASSERT(import != LP_POISON);
1229 spin_lock(&import->imp_lock);
1230 import->imp_generation++;
1231 spin_unlock(&import->imp_lock);
1232 class_import_put(import);
1234 EXPORT_SYMBOL(class_destroy_import);
1236 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Debug-only (LUSTRE_TRACKS_LOCK_EXP_REFS) tracking of which export a
 * lock is referenced under; the first ref links the lock onto the
 * export's exp_locks_list.
 */
1238 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1240 spin_lock(&exp->exp_locks_list_guard);
1242 LASSERT(lock->l_exp_refs_nr >= 0);
1244 if (lock->l_exp_refs_target != NULL &&
1245 lock->l_exp_refs_target != exp) {
1246 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1247 exp, lock, lock->l_exp_refs_target);
1249 if ((lock->l_exp_refs_nr++) == 0) {
1250 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1251 lock->l_exp_refs_target = exp;
1253 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1254 lock, exp, lock->l_exp_refs_nr);
1255 spin_unlock(&exp->exp_locks_list_guard);
1257 EXPORT_SYMBOL(__class_export_add_lock_ref);
/* Counterpart of __class_export_add_lock_ref(): the last ref unlinks
 * the lock from the export's list and clears the target.
 */
1259 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1261 spin_lock(&exp->exp_locks_list_guard);
1262 LASSERT(lock->l_exp_refs_nr > 0);
1263 if (lock->l_exp_refs_target != exp) {
1264 LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
1265 lock, lock->l_exp_refs_target, exp);
1267 if (-- lock->l_exp_refs_nr == 0) {
1268 list_del_init(&lock->l_exp_refs_link);
1269 lock->l_exp_refs_target = NULL;
1271 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1272 lock, exp, lock->l_exp_refs_nr);
1273 spin_unlock(&exp->exp_locks_list_guard);
1275 EXPORT_SYMBOL(__class_export_del_lock_ref);
1278 /* A connection defines an export context in which preallocation can be
1279 * managed. This releases the export pointer reference, and returns the export
1280 * handle, so the export refcount is 1 when this function returns.
1282 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1283 struct obd_uuid *cluuid)
1285 struct obd_export *export;
1287 LASSERT(conn != NULL);
1288 LASSERT(obd != NULL);
1289 LASSERT(cluuid != NULL);
1292 export = class_new_export(obd, cluuid);
1294 RETURN(PTR_ERR(export));
/* Hand the export's handle cookie back to the caller and drop the
 * pointer reference from class_new_export(). */
1296 conn->cookie = export->exp_handle.h_cookie;
1297 class_export_put(export);
1299 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1300 cluuid->uuid, conn->cookie);
1303 EXPORT_SYMBOL(class_connect);
1305 /* if export is involved in recovery then clean up related things */
1306 static void class_export_recovery_cleanup(struct obd_export *exp)
1308 struct obd_device *obd = exp->exp_obd;
1310 spin_lock(&obd->obd_recovery_task_lock);
1311 if (obd->obd_recovering) {
1312 if (exp->exp_in_recovery) {
1313 spin_lock(&exp->exp_lock);
1314 exp->exp_in_recovery = 0;
1315 spin_unlock(&exp->exp_lock);
1316 LASSERT(atomic_read(&(obd)->obd_connected_clients) > 0);
1317 atomic_dec(&obd->obd_connected_clients);
1320 /* if called during recovery then should update
1321 * obd_stale_clients counter, lightweight exports is not counted
1323 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1324 exp->exp_obd->obd_stale_clients++;
1326 spin_unlock(&obd->obd_recovery_task_lock);
1328 spin_lock(&exp->exp_lock);
1329 /** Cleanup req replay fields */
1330 if (exp->exp_req_replay_needed) {
1331 exp->exp_req_replay_needed = 0;
1333 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1334 atomic_dec(&obd->obd_req_replay_clients);
1337 /** Cleanup lock replay data */
1338 if (exp->exp_lock_replay_needed) {
1339 exp->exp_lock_replay_needed = 0;
1341 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1342 atomic_dec(&obd->obd_lock_replay_clients);
1344 spin_unlock(&exp->exp_lock);
1347 /* This function removes 1-3 references from the export:
1348 * 1 - for export pointer passed
1349 * and if disconnect really need
1350 * 2 - removing from hash
1351 * 3 - in client_unlink_export
1352 * The export pointer passed to this function can destroyed
1354 int class_disconnect(struct obd_export *export)
1356 int already_disconnected;
1360 if (export == NULL) {
1361 CWARN("attempting to free NULL export %p\n", export)ugh;
1365 spin_lock(&export->exp_lock);
/* Record-and-set under exp_lock makes disconnect idempotent: only the
 * first caller proceeds past the already_disconnected check below.
 */
1366 already_disconnected = export->exp_disconnected;
1367 export->exp_disconnected = 1;
1368 #ifdef HAVE_SERVER_SUPPORT
1369 /* We hold references of export for uuid hash and nid_hash and export
1370 * link at least. So it is safe to call rh*table_remove_fast in there.
1372 obd_nid_del(export->exp_obd, export);
1373 #endif /* HAVE_SERVER_SUPPORT */
1374 spin_unlock(&export->exp_lock);
1376 /* class_cleanup(), abort_recovery(), and class_fail_export() all end up
1377 * here, and any of them race we shouldn't call extra class_export_puts
1379 if (already_disconnected)
1380 GOTO(no_disconn, already_disconnected);
1382 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1383 export->exp_handle.h_cookie);
1385 class_export_recovery_cleanup(export);
1386 class_unlink_export(export);
/* no_disconn: drop the reference for the passed-in export pointer */
1388 class_export_put(export);
1391 EXPORT_SYMBOL(class_disconnect);
1393 /* Return non-zero for a fully connected export */
1394 int class_connected_export(struct obd_export *exp)
/* exp_lock guards exp_conn_cnt and exp_failed so the pair is read atomically */
1399 spin_lock(&exp->exp_lock);
1400 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1401 spin_unlock(&exp->exp_lock);
1405 EXPORT_SYMBOL(class_connected_export);
/* Disconnect every export on @list, applying @flags to each before the
 * disconnect. The self-export (client uuid == obd uuid) is skipped and
 * merely unlinked. Consumes the list.
 */
1407 static void class_disconnect_export_list(struct list_head *list,
1408 enum obd_option flags)
1411 struct obd_export *exp;
1415 /* It's possible that an export may disconnect itself, but
1416 * nothing else will be added to this list.
1418 while ((exp = list_first_entry_or_null(list, struct obd_export,
1419 exp_obd_chain)) != NULL) {
1420 /* need for safe call CDEBUG after obd_disconnect */
1421 class_export_get(exp);
1423 spin_lock(&exp->exp_lock);
1424 exp->exp_flags = flags;
1425 spin_unlock(&exp->exp_lock);
1427 if (obd_uuid_equals(&exp->exp_client_uuid,
1428 &exp->exp_obd->obd_uuid)) {
1430 "exp %p export uuid == obd uuid, don't discon\n",
1432 /* Need to delete this now so we don't end up pointing
1433 * to work_list later when this export is cleaned up.
1435 list_del_init(&exp->exp_obd_chain);
1436 class_export_put(exp);
/* Extra get balances the reference obd_disconnect() releases below. */
1440 class_export_get(exp);
1441 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), last request at %lld\n",
1442 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1443 exp, exp->exp_last_request_time);
1444 /* release one export reference anyway */
1445 rc = obd_disconnect(exp);
1447 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1448 obd_export_nid2str(exp), exp, rc);
1449 class_export_put(exp);
/* Disconnect all (active and delayed) exports of @obd. */
1454 void class_disconnect_exports(struct obd_device *obd)
1456 LIST_HEAD(work_list)ugh;
1460 /* Move all of the exports from obd_exports to a work list, en masse. */
1461 spin_lock(&obd->obd_dev_lock);
1462 list_splice_init(&obd->obd_exports, &work_list);
1463 list_splice_init(&obd->obd_delayed_exports, &work_list);
1464 spin_unlock(&obd->obd_dev_lock);
1466 if (!list_empty(&work_list)) {
1467 CDEBUG(D_HA, "OBD device %d (%p) has exports, disconnecting them\n",
1468 obd->obd_minor, obd);
1469 class_disconnect_export_list(&work_list,
1470 exp_flags_from_obd(obd));
1472 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1473 obd->obd_minor, obd);
1476 EXPORT_SYMBOL(class_disconnect_exports);
1478 /* Remove exports that have not completed recovery.
1480 void class_disconnect_stale_exports(struct obd_device *obd,
1481 int (*test_export)(struct obd_export *))
1483 LIST_HEAD(work_list);
1484 struct obd_export *exp, *n;
1489 spin_lock(&obd->obd_dev_lock);
1490 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1492 /* don't count self-export as client */
1493 if (obd_uuid_equals(&exp->exp_client_uuid,
1494 &exp->exp_obd->obd_uuid))
1497 /* don't evict clients which have no slot in last_rcvd
1498 * (e.g. lightweight connection)
1500 if (exp->exp_target_data.ted_lr_idx == -1)
/* Skip exports already failed or vetoed by the caller's predicate. */
1503 spin_lock(&exp->exp_lock);
1504 if (exp->exp_failed || test_export(exp)) {
1505 spin_unlock(&exp->exp_lock);
/* Mark failed under exp_lock, then queue for disconnect outside it. */
1508 exp->exp_failed = 1;
1509 atomic_inc(&exp->exp_obd->obd_eviction_count);
1510 spin_unlock(&exp->exp_lock);
1512 list_move(&exp->exp_obd_chain, &work_list);
1514 CWARN("%s: disconnect stale client %s@%s\n",
1515 obd->obd_name, exp->exp_client_uuid.uuid,
1516 obd_export_nid2str(exp));
1517 print_export_data(exp, "EVICTING", 0, D_HA);
1519 spin_unlock(&obd->obd_dev_lock);
1522 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1523 obd->obd_name, evicted);
/* Abort recovery for the evicted set so the server does not wait on them. */
1525 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1526 OBD_OPT_ABORT_RECOV);
1529 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Forcibly fail (evict) a single export. Idempotent: a second call on an
 * already-failed export is a no-op apart from a debug message.
 */
1531 void class_fail_export(struct obd_export *exp)
1533 int rc, already_failed;
1535 spin_lock(&exp->exp_lock);
1536 already_failed = exp->exp_failed;
1537 exp->exp_failed = 1;
1538 spin_unlock(&exp->exp_lock);
1540 if (already_failed) {
1541 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1542 exp, exp->exp_client_uuid.uuid);
1546 atomic_inc(&exp->exp_obd->obd_eviction_count);
1548 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1549 exp, exp->exp_client_uuid.uuid);
1551 if (obd_dump_on_timeout)
1552 libcfs_debug_dumplog();
1554 /* need for safe call CDEBUG after obd_disconnect */
1555 class_export_get(exp);
1557 /* Callers into obd_disconnect are removing their own ref(eg request) in
1558 * addition to one from hash table. We don't have such a ref so make one
1560 class_export_get(exp);
1561 rc = obd_disconnect(exp);
1563 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1565 CDEBUG(D_HA, "disconnected export %p/%s\n",
1566 exp, exp->exp_client_uuid.uuid);
1567 class_export_put(exp);
1569 EXPORT_SYMBOL(class_fail_export);
1571 #ifdef HAVE_SERVER_SUPPORT
/* obd_nid_export_for_each() callback: grab a reference on the first
 * non-failed export found and store it in *data for the caller.
 */
1573 static int take_first(struct obd_export *exp, void *data)
1575 struct obd_export **expp = data;
1578 /* already have one */
1580 if (exp->exp_failed)
1581 /* Don't want this one */
1583 if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1584 /* Cannot get a ref on this one */
/* Evict every export of @obd connected from the given NID string.
 * Returns the number of exports evicted.
 */
1590 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1592 struct lnet_nid nid_key;
1593 struct obd_export *doomed_exp;
1594 int exports_evicted = 0;
1596 libcfs_strnid(&nid_key, nid);
1598 spin_lock(&obd->obd_dev_lock);
1599 /* umount already run. evict thread should stop leaving unmount thread
1602 if (obd->obd_stopping) {
1603 spin_unlock(&obd->obd_dev_lock);
1604 return exports_evicted;
1606 spin_unlock(&obd->obd_dev_lock);
/* Repeatedly take the first live export for this NID until none remain. */
1609 while (obd_nid_export_for_each(obd, &nid_key,
1610 take_first, &doomed_exp) > 0) {
1612 LASSERTF(doomed_exp != obd->obd_self_export,
1613 "self-export is hashed by NID?\n");
1615 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1617 obd_uuid2str(&doomed_exp->exp_client_uuid),
1618 obd_export_nid2str(doomed_exp));
1620 class_fail_export(doomed_exp);
/* Drop the reference take_first() acquired. */
1621 class_export_put(doomed_exp);
1626 if (!exports_evicted)
1628 "%s: can't disconnect NID '%s': no exports found\n",
1629 obd->obd_name, nid);
1630 return exports_evicted;
1632 EXPORT_SYMBOL(obd_export_evict_by_nid);
1634 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1636 struct obd_export *doomed_exp = NULL;
1637 struct obd_uuid doomed_uuid;
1638 int exports_evicted = 0;
1640 spin_lock(&obd->obd_dev_lock);
1641 if (obd->obd_stopping) {
1642 spin_unlock(&obd->obd_dev_lock);
1643 return exports_evicted;
1645 spin_unlock(&obd->obd_dev_lock);
1647 obd_str2uuid(&doomed_uuid, uuid);
1648 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1649 CERROR("%s: can't evict myself\n", obd->obd_name);
1650 return exports_evicted;
1653 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1654 if (doomed_exp == NULL) {
1655 CERROR("%s: can't disconnect %s: no exports found\n",
1656 obd->obd_name, uuid);
1658 CWARN("%s: evicting %s at adminstrative request\n",
1659 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1660 class_fail_export(doomed_exp);
1661 class_export_put(doomed_exp);
1662 obd_uuid_del(obd, doomed_exp);
1666 return exports_evicted;
1668 #endif /* HAVE_SERVER_SUPPORT */
1670 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional hook installed by the LDLM layer to dump per-export lock refs. */
1671 void (*class_export_dump_hook)(struct obd_export *) = NULL;
1672 EXPORT_SYMBOL(class_export_dump_hook);
/* Emit one debug line summarizing the state, refcounts and outstanding
 * replies of @exp, tagged with @status, at verbosity @debug_level.
 * If @locks is set and a dump hook is registered, also dump lock refs.
 */
1675 static void print_export_data(struct obd_export *exp, const char *status,
1676 int locks, int debug_level)
1678 struct ptlrpc_reply_state *rs;
1679 struct ptlrpc_reply_state *first_reply = NULL;
/* Count outstanding replies under exp_lock; remember the first for the log. */
1682 spin_lock(&exp->exp_lock);
1683 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1689 spin_unlock(&exp->exp_lock);
1691 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s %llu stale:%d\n",
1692 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1693 obd_export_nid2str(exp),
1694 refcount_read(&exp->exp_handle.h_ref),
1695 atomic_read(&exp->exp_rpc_count),
1696 atomic_read(&exp->exp_cb_count),
1697 atomic_read(&exp->exp_locks_count),
1698 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1699 nreplies, first_reply, nreplies > 3 ? "..." : "",
1700 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1701 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1702 if (locks && class_export_dump_hook != NULL)
1703 class_export_dump_hook(exp);
/* Dump all exports of @obd (active, unlinked and delayed) via
 * print_export_data() while holding obd_dev_lock.
 */
1707 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1709 struct obd_export *exp;
1711 spin_lock(&obd->obd_dev_lock);
1712 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1713 print_export_data(exp, "ACTIVE", locks, debug_level);
1714 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1715 print_export_data(exp, "UNLINKED", locks, debug_level);
1716 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1717 print_export_data(exp, "DELAYED", locks, debug_level);
1718 spin_unlock(&obd->obd_dev_lock);
/* Block until all unlinked exports of @obd have been destroyed, polling
 * with an increasing delay and warning (with a full export dump) if the
 * wait drags on.
 */
1721 void obd_exports_barrier(struct obd_device *obd)
1725 LASSERT(list_empty(&obd->obd_exports));
1726 spin_lock(&obd->obd_dev_lock);
1727 while (!list_empty(&obd->obd_unlinked_exports)) {
/* Drop the lock while sleeping so export destruction can make progress. */
1728 spin_unlock(&obd->obd_dev_lock);
1729 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
/* Power-of-two backoff on warnings keeps the console from flooding. */
1730 if (waited > 5 && is_power_of_2(waited)) {
1731 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports more than %d seconds. The obd refcount = %d. Is it stuck?\n",
1732 obd->obd_name, waited,
1733 kref_read(&obd->obd_refcount));
1734 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1737 spin_lock(&obd->obd_dev_lock);
1739 spin_unlock(&obd->obd_dev_lock);
1741 EXPORT_SYMBOL(obd_exports_barrier);
1743 /* Add export to the obd_zombe thread and notify it. */
1744 static void obd_zombie_export_add(struct obd_export *exp)
/* obd_stale_export_num is decremented once the zombie work finishes;
 * obd_zombie_barrier() waits for it to drain to zero.
 */
1746 atomic_inc(&obd_stale_export_num);
1747 spin_lock(&exp->exp_obd->obd_dev_lock);
1748 LASSERT(!list_empty(&exp->exp_obd_chain));
1749 list_del_init(&exp->exp_obd_chain);
1750 spin_unlock(&exp->exp_obd->obd_dev_lock);
1751 queue_work(zombie_wq, &exp->exp_zombie_work);
1754 /* Add import to the obd_zombe thread and notify it. */
1755 static void obd_zombie_import_add(struct obd_import *imp)
/* Security context must already be torn down before deferred destruction. */
1757 LASSERT(imp->imp_sec == NULL);
1759 queue_work(zombie_wq, &imp->imp_zombie_work);
1762 /* wait when obd_zombie import/export queues become empty */
1763 void obd_zombie_barrier(void)
1765 wait_var_event(&obd_stale_export_num,
1766 atomic_read(&obd_stale_export_num) == 0);
/* flush to ensure queued zombie work items have completed, not just started */
1767 flush_workqueue(zombie_wq);
1769 EXPORT_SYMBOL(obd_zombie_barrier);
/* Pop the first export from the global stale-export list, or NULL if the
 * list is empty. Used by the LDLM lock-reclaim machinery.
 */
1772 struct obd_export *obd_stale_export_get(void)
1774 struct obd_export *exp = NULL;
1778 spin_lock(&obd_stale_export_lock);
1779 if (!list_empty(&obd_stale_exports)) {
1780 exp = list_first_entry(&obd_stale_exports,
1781 struct obd_export, exp_stale_list);
1782 list_del_init(&exp->exp_stale_list);
1784 spin_unlock(&obd_stale_export_lock);
1787 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1788 atomic_read(&obd_stale_export_num));
1792 EXPORT_SYMBOL(obd_stale_export_get);
/* Return an export taken with obd_stale_export_get(). If it still has
 * locks in its lock hash it is re-queued on the stale list (exports with
 * blocked locks go to the head so they are reclaimed first); otherwise
 * the export reference is simply dropped.
 */
1794 void obd_stale_export_put(struct obd_export *exp)
1798 LASSERT(list_empty(&exp->exp_stale_list));
1799 if (exp->exp_lock_hash &&
1800 atomic_read(&exp->exp_lock_hash->hs_count)) {
1801 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1802 atomic_read(&obd_stale_export_num));
/* Lock order: exp_bl_list_lock (BH) outside obd_stale_export_lock. */
1804 spin_lock_bh(&exp->exp_bl_list_lock);
1805 spin_lock(&obd_stale_export_lock);
1806 /* Add to the tail if there is no blocked locks,
1807 * to the head otherwise.
1809 if (list_empty(&exp->exp_bl_list))
1810 list_add_tail(&exp->exp_stale_list,
1811 &obd_stale_exports);
1813 list_add(&exp->exp_stale_list,
1814 &obd_stale_exports);
1816 spin_unlock(&obd_stale_export_lock);
1817 spin_unlock_bh(&exp->exp_bl_list_lock);
1819 class_export_put(exp);
1823 EXPORT_SYMBOL(obd_stale_export_put);
/*
1826 * Adjust the position of the export in the stale list,
1827 * i.e. move to the head of the list if is needed.
 * Called when an export gains blocked locks so it is reclaimed sooner.
 */
1829 void obd_stale_export_adjust(struct obd_export *exp)
1831 LASSERT(exp != NULL);
/* Same lock order as obd_stale_export_put(): bl_list (BH) then stale list. */
1832 spin_lock_bh(&exp->exp_bl_list_lock);
1833 spin_lock(&obd_stale_export_lock);
/* Only move exports that are queued AND have blocked locks. */
1835 if (!list_empty(&exp->exp_stale_list) &&
1836 !list_empty(&exp->exp_bl_list))
1837 list_move(&exp->exp_stale_list, &obd_stale_exports);
1839 spin_unlock(&obd_stale_export_lock);
1840 spin_unlock_bh(&exp->exp_bl_list_lock);
1842 EXPORT_SYMBOL(obd_stale_export_adjust);
1844 /* start destroy zombie import/export thread */
1845 int obd_zombie_impexp_init(void)
/* CPT-bound workqueue: zombie destruction runs on the configured CPU
 * partitions. Returns 0 or the PTR_ERR from workqueue creation.
 */
1847 zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1849 cfs_cpt_number(cfs_cpt_tab));
1851 return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1854 /* stop destroy zombie import/export thread */
1855 void obd_zombie_impexp_stop(void)
/* destroy_workqueue() flushes pending work; afterwards no stale exports
 * may remain queued.
 */
1857 destroy_workqueue(zombie_wq);
1858 LASSERT(list_empty(&obd_stale_exports));
1861 /***** Kernel-userspace comm helpers *******/
1863 /* Get length of entire message, including header */
1864 int kuc_len(int payload_len)
1866 return sizeof(struct kuc_hdr) + payload_len;
1868 EXPORT_SYMBOL(kuc_len);
1870 /* Get a pointer to kuc header, given a ptr to the payload
1871 * @param p Pointer to payload area
1872 * @returns Pointer to kuc header
1874 struct kuc_hdr *kuc_ptr(void *p)
/* The header sits immediately before the payload (see kuc_alloc()). */
1876 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1878 LASSERT(lh->kuc_magic == KUC_MAGIC);
1881 EXPORT_SYMBOL(kuc_ptr);
1883 /* Alloc space for a message, and fill in header
1884 * @return Pointer to payload area
 * Returns ERR_PTR(-ENOMEM) on allocation failure.
1886 void *kuc_alloc(int payload_len, int transport, int type)
1889 int len = kuc_len(payload_len);
1893 return ERR_PTR(-ENOMEM);
1895 lh->kuc_magic = KUC_MAGIC;
1896 lh->kuc_transport = transport;
1897 lh->kuc_msgtype = type;
1898 lh->kuc_msglen = len;
/* Caller gets the payload area just past the header. */
1900 return (void *)(lh + 1);
1902 EXPORT_SYMBOL(kuc_alloc);
1904 /* Takes pointer to payload area */
1905 void kuc_free(void *p, int payload_len)
1907 struct kuc_hdr *lh = kuc_ptr(p);
1909 OBD_FREE(lh, kuc_len(payload_len));
1911 EXPORT_SYMBOL(kuc_free);
/* Per-waiter record queued on cl_flight_waiters while a thread waits for
 * an RPC slot in obd_get_request_slot().
 */
1913 struct obd_request_slot_waiter {
1914 struct list_head orsw_entry;
1915 wait_queue_head_t orsw_waitq;
/* Test whether @orsw was granted a slot: it has been removed from the
 * waiter list (checked under cl_loi_list_lock).
 */
1919 static bool obd_request_slot_avail(struct client_obd *cli,
1920 struct obd_request_slot_waiter *orsw)
1924 spin_lock(&cli->cl_loi_list_lock);
1925 avail = !!list_empty(&orsw->orsw_entry);
1926 spin_unlock(&cli->cl_loi_list_lock);
/*
1932 * For network flow control, the RPC sponsor needs to acquire a credit
1933 * before sending the RPC. The credits count for a connection is defined
1934 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1935 * the subsequent RPC sponsors need to wait until others released their
1936 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
 */
1938 int obd_get_request_slot(struct client_obd *cli)
1940 struct obd_request_slot_waiter orsw;
/* Fast path: a credit is free, take it and return. */
1943 spin_lock(&cli->cl_loi_list_lock);
1944 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1945 cli->cl_rpcs_in_flight++;
1946 spin_unlock(&cli->cl_loi_list_lock);
/* Slow path: enqueue an on-stack waiter and sleep until a credit is
 * granted (waiter delisted) or we are signaled/aborted.
 */
1950 init_waitqueue_head(&orsw.orsw_waitq);
1951 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
1952 orsw.orsw_signaled = false;
1953 spin_unlock(&cli->cl_loi_list_lock);
1955 rc = l_wait_event_abortable(orsw.orsw_waitq,
1956 obd_request_slot_avail(cli, &orsw) ||
1957 orsw.orsw_signaled);
1959 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1960 * freed but other (such as obd_put_request_slot) is using it.
1962 spin_lock(&cli->cl_loi_list_lock);
1964 if (!orsw.orsw_signaled) {
/* Aborted after being granted a slot: give the credit back;
 * otherwise just dequeue ourselves.
 */
1965 if (list_empty(&orsw.orsw_entry))
1966 cli->cl_rpcs_in_flight--;
1968 list_del(&orsw.orsw_entry);
1973 if (orsw.orsw_signaled) {
1974 LASSERT(list_empty(&orsw.orsw_entry));
1978 spin_unlock(&cli->cl_loi_list_lock);
1982 EXPORT_SYMBOL(obd_get_request_slot);
/* Release one RPC credit and, if the limit allows, hand it directly to the
 * oldest waiter (the waiter is delisted here, which is what
 * obd_request_slot_avail() tests for).
 */
1984 void obd_put_request_slot(struct client_obd *cli)
1986 struct obd_request_slot_waiter *orsw;
1988 spin_lock(&cli->cl_loi_list_lock);
1989 cli->cl_rpcs_in_flight--;
1991 /* If there is free slot, wakeup the first waiter. */
1992 if (!list_empty(&cli->cl_flight_waiters) &&
1993 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
1994 orsw = list_first_entry(&cli->cl_flight_waiters,
1995 struct obd_request_slot_waiter,
1997 list_del_init(&orsw->orsw_entry);
1998 cli->cl_rpcs_in_flight++;
1999 wake_up(&orsw->orsw_waitq);
2001 spin_unlock(&cli->cl_loi_list_lock);
2003 EXPORT_SYMBOL(obd_put_request_slot);
/* Accessor: current max_rpcs_in_flight limit for @cli. */
2005 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2007 return cli->cl_max_rpcs_in_flight;
2009 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/* Set max_rpcs_in_flight for @cli to @max (1..OBD_MAX_RIF_MAX). For MDC
 * devices, max_mod_rpcs_in_flight is lowered first if needed so it stays
 * strictly below the new limit. Waiters are woken for any credits gained.
 */
2011 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2013 struct obd_request_slot_waiter *orsw;
2019 if (max > OBD_MAX_RIF_MAX || max < 1)
2022 CDEBUG(D_INFO, "%s: max = %u max_mod = %u rif = %u\n",
2023 cli->cl_import->imp_obd->obd_name, max,
2024 cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2026 if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2027 LUSTRE_MDC_NAME) == 0) {
2028 /* adjust max_mod_rpcs_in_flight to ensure it is always
2029 * strictly lower that max_rpcs_in_flight
 */
/* max == 1 would leave no room for any modify RPC slot */
2032 CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2033 cli->cl_import->imp_obd->obd_name);
2036 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2037 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2043 spin_lock(&cli->cl_loi_list_lock);
2044 old = cli->cl_max_rpcs_in_flight;
2045 cli->cl_max_rpcs_in_flight = max;
/* dirty cache limit scales with the RPC concurrency */
2046 client_adjust_max_dirty(cli);
2050 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2051 for (i = 0; i < diff; i++) {
2052 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2053 struct obd_request_slot_waiter,
/* Granting: delist the waiter and charge the new credit to it. */
2058 list_del_init(&orsw->orsw_entry);
2059 cli->cl_rpcs_in_flight++;
2060 wake_up(&orsw->orsw_waitq);
2062 spin_unlock(&cli->cl_loi_list_lock);
2066 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Accessor: current max_mod_rpcs_in_flight limit for @cli. */
2068 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2070 return cli->cl_max_mod_rpcs_in_flight;
2072 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/* Set max_mod_rpcs_in_flight for @cli to @max, clamped below both
 * max_rpcs_in_flight (raising that limit if necessary) and the server's
 * advertised ocd_maxmodrpcs. Wakes waiters if the limit increased.
 */
2074 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2076 struct obd_connect_data *ocd;
2080 if (max > OBD_MAX_RIF_MAX || max < 1)
2083 ocd = &cli->cl_import->imp_connect_data;
2084 CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2085 cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2086 ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
/* keep strictly below the absolute RIF ceiling */
2088 if (max == OBD_MAX_RIF_MAX)
2089 max = OBD_MAX_RIF_MAX - 1;
2091 /* Cannot exceed or equal max_rpcs_in_flight. If we are asked to
2092 * increase this value, also bump up max_rpcs_in_flight to match.
2094 if (max >= cli->cl_max_rpcs_in_flight) {
2096 "%s: increasing max_rpcs_in_flight=%u to allow larger max_mod_rpcs_in_flight=%u\n",
2097 cli->cl_import->imp_obd->obd_name, max + 1, max);
2098 obd_set_max_rpcs_in_flight(cli, max + 1);
2101 /* cannot exceed max modify RPCs in flight supported by the server,
2102 * but verify ocd_connect_flags is at least initialized first. If
2103 * not, allow it and fix value later in ptlrpc_connect_set_flags().
2105 if (!ocd->ocd_connect_flags) {
2106 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2107 } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2108 maxmodrpcs = ocd->ocd_maxmodrpcs;
2109 if (maxmodrpcs == 0) { /* connection not finished yet */
2110 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2112 "%s: partial connect, assume maxmodrpcs=%hu\n",
2113 cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2118 if (max > maxmodrpcs) {
2119 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than mdt.*.max_mod_rpcs_in_flight=%hu returned by the MDT server at connection.\n",
2120 cli->cl_import->imp_obd->obd_name,
/* cl_mod_rpcs_waitq.lock also guards the mod-RPC counters and bitmap */
2125 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2127 prev = cli->cl_max_mod_rpcs_in_flight;
2128 cli->cl_max_mod_rpcs_in_flight = max;
2130 /* wakeup waiters if limit has been increased */
2131 if (cli->cl_max_mod_rpcs_in_flight > prev)
2132 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2134 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2138 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/* Emit the modify-RPCs-in-flight histogram of @cli to @seq (procfs/sysfs
 * stats file). Prints per-bucket counts with percent and cumulative
 * percent, stopping once the cumulative total is reached.
 */
2140 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2141 struct seq_file *seq)
2143 unsigned long mod_tot = 0, mod_cum;
/* Snapshot under the waitq lock so counters and histogram are consistent. */
2146 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2147 lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
2149 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2150 cli->cl_mod_rpcs_in_flight);
2152 seq_puts(seq, "\n\t\t\tmodify\n");
2153 seq_puts(seq, "rpcs in flight rpcs %% cum %%\n");
2155 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2158 for (i = 0; i < OBD_HIST_MAX; i++) {
2159 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2162 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2163 i, mod, pct(mod, mod_tot),
2164 pct(mod_cum, mod_tot));
/* remaining buckets are empty once the cumulative sum hits the total */
2165 if (mod_cum == mod_tot)
2169 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2173 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2175 /* The number of modify RPCs sent in parallel is limited
2176 * because the server has a finite number of slots per client to
2177 * store request result and ensure reply reconstruction when needed.
2178 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2179 * that takes into account server limit and cl_max_rpcs_in_flight
2181 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2182 * one close request is allowed above the maximum.
/* Wait-queue entry for a thread wanting a modify-RPC slot. */
2185 struct client_obd *cli;
2188 wait_queue_entry_t wqe;
/* Custom wake function for cl_mod_rpcs_waitq: claims a slot on behalf of
 * the waiter if one is available, marking the waiter woken. Runs with the
 * waitqueue lock held. Return value steers __wake_up_locked_key()'s scan
 * (whether to keep walking the queue looking for a wakeable close request).
 */
2190 static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
2191 unsigned int mode, int flags, void *key)
2193 struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
2194 struct client_obd *cli = w->cli;
2195 bool close_req = w->close_req;
2199 /* As woken_wake_function() doesn't remove us from the wait_queue,
2200 * we use own flag to ensure we're called just once.
2205 /* A slot is available if
2206 * - number of modify RPCs in flight is less than the max
2207 * - it's a close RPC and no other close request is in flight
2209 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2210 (close_req && cli->cl_close_rpcs_in_flight == 0);
/* Claim the slot here, under the waitqueue lock, before waking. */
2212 cli->cl_mod_rpcs_in_flight++;
2214 cli->cl_close_rpcs_in_flight++;
2215 ret = woken_wake_function(wq_entry, mode, flags, key);
2217 } else if (cli->cl_close_rpcs_in_flight)
2218 /* No other waiter could be woken */
2221 /* This was not a wakeup from a close completion or a new close
2222 * being queued, so there is no point seeing if there are close
2223 * waiters to be woken.
2227 /* There might be be a close we could wake, keep looking */
2232 /* Get a modify RPC slot from the obd client @cli according
2233 * to the kind of operation @opc that is going to be sent
2234 * and the intent @it of the operation if it applies.
2235 * If the maximum number of modify RPCs in flight is reached
2236 * the thread is put to sleep.
2237 * Returns the tag to be set in the request message. Tag 0
2238 * is reserved for non-modifying requests.
2240 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2242 struct mod_waiter wait = {
2244 .close_req = (opc == MDS_CLOSE),
/* claim_mod_rpc_function() both grants the slot and wakes us */
2249 init_wait(&wait.wqe);
2250 wait.wqe.func = claim_mod_rpc_function;
2252 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2253 __add_wait_queue_entry_tail(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2254 /* This wakeup will only succeed if the maximums haven't
2255 * been reached. If that happens, wait.woken will be set
2256 * and there will be no need to wait.
2257 * If a close_req was enqueue, ensure we search all the way to the
2258 * end of the waitqueue for a close request.
2260 __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2261 (void*)wait.close_req);
2263 while (wait.woken == false) {
/* drop the irq-disabled lock while actually sleeping */
2264 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2265 wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
2266 MAX_SCHEDULE_TIMEOUT);
2267 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2269 __remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2271 max = cli->cl_max_mod_rpcs_in_flight;
2272 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2273 cli->cl_mod_rpcs_in_flight);
2274 /* find a free tag */
2275 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2277 LASSERT(i < OBD_MAX_RIF_MAX);
2278 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2279 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2280 /* tag 0 is reserved for non-modify RPCs */
2283 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2284 cli->cl_import->imp_obd->obd_name,
2289 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2291 /* Put a modify RPC slot from the obd client @cli according
2292 * to the kind of operation @opc that has been sent.
 * @tag is the slot tag returned by obd_get_mod_rpc_slot() (1-based;
 * tag 0 is reserved for non-modify RPCs).
2294 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2296 bool close_req = false;
2301 if (opc == MDS_CLOSE)
2304 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2305 cli->cl_mod_rpcs_in_flight--;
2307 cli->cl_close_rpcs_in_flight--;
2308 /* release the tag in the bitmap */
2309 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2310 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
/* a slot was freed: let claim_mod_rpc_function() grant it to a waiter */
2311 __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2313 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2315 EXPORT_SYMBOL(obd_put_mod_rpc_slot);