4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* Lock guarding obd_devs[]: lookups in this file take it for read,
 * class_register_device()/class_unregister_device() take it for write. */
49 DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices; a device sits at index obd_minor. */
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache backing obd_device_alloc()/obd_device_free(). */
52 static struct kmem_cache *obd_device_cachep;
/* kobject type shared by every obd_type; initialised further down. */
53 static struct kobj_type class_ktype;
/* NOTE(review): presumably the workqueue that runs exp_zombie_work and
 * imp_zombie_work; its users are not in the visible part of the file. */
54 static struct workqueue_struct *zombie_wq;
/* Forward declarations; the definitions are not in the visible part of
 * the file. */
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping; in the visible code only the counter is touched
 * (incremented by class_unlink_export()). */
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Hook called to release a ptlrpc_connection (see class_export_destroy()
 * and obd_zombie_import_free()); it is not assigned in the visible code. */
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return \a obd to obd_device_cachep.
 *
 * Asserts the device magic first and logs an error when the device still
 * has an obd_namespace attached at this point.
 */
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90 obd, obd->obd_namespace, obd->obd_force);
/* Pairs with the lu_ref_init() done on obd_reference in class_newdev(). */
93 lu_ref_fini(&obd->obd_reference);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd_type by \a name in lustre_kset.
 *
 * Only kobjects whose ktype is class_ktype are treated as an obd_type.
 * Callers in this file (class_get_type(), class_unregister_type()) treat a
 * non-NULL result as carrying a kobject reference that they must drop with
 * kobject_put().
 * NOTE(review): the not-found path is not visible in this listing.
 */
97 struct obd_type *class_search_type(const char *name)
99 struct kobject *kobj = kset_find_obj(lustre_kset, name);
101 if (kobj && kobj->ktype == &class_ktype)
102 return container_of(kobj, struct obd_type, typ_kobj);
107 EXPORT_SYMBOL(class_search_type);
/*
 * Look up obd type \a name and take a usage reference on it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the module implementing the type is
 * requested and the lookup is repeated (NOTE(review): presumably only when
 * the first lookup found nothing -- the guarding condition is not visible).
 * On success the owning module is pinned with try_module_get() and
 * typ_refcnt is incremented; class_put_type() undoes both. The kobject
 * reference returned by class_search_type() is dropped on either outcome of
 * try_module_get().
 */
109 struct obd_type *class_get_type(const char *name)
111 struct obd_type *type;
113 type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
116 const char *modname = name;
/* Server builds redirect some type names to the module providing them. */
118 #ifdef HAVE_SERVER_SUPPORT
119 if (strcmp(modname, "obdfilter") == 0)
122 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 modname = LUSTRE_OSP_NAME;
/* Prefix match: any name starting with LUSTRE_MDS_NAME maps to the MDT. */
125 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
/* request_module() returning 0 is treated as success; search again under
 * the original name, not the module name. */
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 if (try_module_get(type->typ_dt_ops->o_owner)) {
140 atomic_inc(&type->typ_refcnt);
141 /* class_search_type() returned a counted reference,
142 * but we don't need that count any more as
143 * we have one through typ_refcnt.
145 kobject_put(&type->typ_kobj);
147 kobject_put(&type->typ_kobj);
/*
 * Release a type obtained from class_get_type(): drops the module reference
 * on the type's o_owner and decrements typ_refcnt.
 */
154 void class_put_type(struct obd_type *type)
157 module_put(type->typ_dt_ops->o_owner);
158 atomic_dec(&type->typ_refcnt);
/*
 * kobject release callback for an obd_type (installed via class_ktype).
 *
 * Removes the type's debugfs tree, finalises its lu_device_type, removes
 * the procfs subtree when one was registered, and frees the obd_type.
 * NOTE(review): a line is missing from this listing just before the
 * lu_device_type_fini() call; confirm whether typ_lu is NULL-checked there.
 */
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 debugfs_remove_recursive(type->typ_debugfs_entry);
166 type->typ_debugfs_entry = NULL;
169 lu_device_type_fini(type->typ_lu);
171 #ifdef CONFIG_PROC_FS
172 if (type->typ_name && type->typ_procroot)
173 remove_proc_subtree(type->typ_name, proc_lustre_root);
175 OBD_FREE(type, sizeof(*type));
/* kobject type for obd_type objects: class_search_type() uses it to
 * recognise them, and class_sysfs_release() runs when one is released. */
178 static struct kobj_type class_ktype = {
179 .sysfs_ops = &lustre_sysfs_ops,
180 .release = class_sysfs_release,
/*
 * Server-only: create a placeholder obd_type called \a name.
 *
 * The placeholder consists of a kobject added to lustre_kset and a debugfs
 * directory, and is flagged with typ_sym_filter; class_register_type()
 * checks that flag when it finds an already existing type.
 *
 * Visible error returns: ERR_PTR(-EEXIST) when the name is already present
 * (after dropping the lookup reference) and ERR_PTR(-ENOMEM) when the
 * allocation fails. A procfs registration failure is only logged and
 * typ_procroot is reset to NULL.
 */
183 #ifdef HAVE_SERVER_SUPPORT
184 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
186 struct dentry *symlink;
187 struct obd_type *type;
190 type = class_search_type(name);
192 kobject_put(&type->typ_kobj);
193 return ERR_PTR(-EEXIST);
196 OBD_ALLOC(type, sizeof(*type));
198 return ERR_PTR(-ENOMEM);
200 type->typ_kobj.kset = lustre_kset;
201 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
202 &lustre_kset->kobj, "%s", name);
206 symlink = debugfs_create_dir(name, debugfs_lustre_root);
207 if (IS_ERR_OR_NULL(symlink)) {
/* A NULL result is mapped to -ENOMEM; the kobject_put() then drops the
 * reference taken by kobject_init_and_add(). */
208 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
209 kobject_put(&type->typ_kobj);
212 type->typ_debugfs_entry = symlink;
213 type->typ_sym_filter = true;
216 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
218 if (IS_ERR(type->typ_procroot)) {
219 CERROR("%s: can't create compat proc entry: %d\n",
220 name, (int)PTR_ERR(type->typ_procroot));
221 type->typ_procroot = NULL;
227 EXPORT_SYMBOL(class_add_symlinks);
228 #endif /* HAVE_SERVER_SUPPORT */
/* Upper bound asserted on the length of a type name. */
230 #define CLASS_MAX_NAME 1024
/*
 * Register obd type \a name with its device (\a dt_ops) and metadata
 * (\a md_ops) method tables.
 *
 * Visible steps: assert the name is shorter than CLASS_MAX_NAME, look for
 * an existing type of that name, otherwise allocate and kobject_init() a
 * new one, store the ops, optionally register procfs (\a enable_proc) and
 * debugfs entries, kobject_add() the type under lustre_kset and initialise
 * \a ldt with lu_device_type_init().
 * NOTE(review): many control-flow lines (branches, labels, returns) are
 * missing from this listing; the exact error unwinding cannot be read off
 * here.
 */
232 int class_register_type(const struct obd_ops *dt_ops,
233 const struct md_ops *md_ops,
234 bool enable_proc, struct lprocfs_vars *vars,
235 const char *name, struct lu_device_type *ldt)
237 struct obd_type *type;
242 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
244 type = class_search_type(name);
/* On server builds an existing type flagged typ_sym_filter (created by
 * class_add_symlinks()) is handled separately from a true duplicate. */
246 #ifdef HAVE_SERVER_SUPPORT
247 if (type->typ_sym_filter)
249 #endif /* HAVE_SERVER_SUPPORT */
250 kobject_put(&type->typ_kobj);
251 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
255 OBD_ALLOC(type, sizeof(*type));
259 type->typ_kobj.kset = lustre_kset;
260 kobject_init(&type->typ_kobj, &class_ktype);
261 #ifdef HAVE_SERVER_SUPPORT
263 #endif /* HAVE_SERVER_SUPPORT */
265 type->typ_dt_ops = dt_ops;
266 type->typ_md_ops = md_ops;
268 #ifdef HAVE_SERVER_SUPPORT
/* Turn a placeholder into a real type: clear the flag and drop the
 * reference obtained from the lookup above. */
269 if (type->typ_sym_filter) {
270 type->typ_sym_filter = false;
271 kobject_put(&type->typ_kobj);
275 #ifdef CONFIG_PROC_FS
/* Skip procfs registration when a procroot already exists. */
276 if (enable_proc && !type->typ_procroot) {
277 type->typ_procroot = lprocfs_register(name,
280 if (IS_ERR(type->typ_procroot)) {
281 rc = PTR_ERR(type->typ_procroot);
282 type->typ_procroot = NULL;
287 type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
289 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
290 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
292 type->typ_debugfs_entry = NULL;
296 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
299 #ifdef HAVE_SERVER_SUPPORT
304 rc = lu_device_type_init(ldt);
/* Dropping the kobject reference releases the type through
 * class_sysfs_release() once the last reference is gone. */
312 kobject_put(&type->typ_kobj);
316 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister obd type \a name.
 *
 * A type that still has a non-zero typ_refcnt is not released: its ops
 * pointers are cleared and -EBUSY is reported through the out_put path.
 * Otherwise two kobject references are dropped: the registration ("final")
 * reference and the one returned by class_search_type().
 */
318 int class_unregister_type(const char *name)
320 struct obd_type *type = class_search_type(name);
325 CERROR("unknown obd type\n");
329 if (atomic_read(&type->typ_refcnt)) {
330 CERROR("type %s has refcount (%d)\n", name,
331 atomic_read(&type->typ_refcnt));
332 /* This is a bad situation, let's make the best of it */
333 /* Remove ops, but leave the name for debugging */
334 type->typ_dt_ops = NULL;
335 type->typ_md_ops = NULL;
336 GOTO(out_put, rc = -EBUSY);
339 /* Put the final ref */
340 kobject_put(&type->typ_kobj);
342 /* Put the ref returned by class_search_type() */
343 kobject_put(&type->typ_kobj);
346 } /* class_unregister_type */
347 EXPORT_SYMBOL(class_unregister_type);
350 * Create a new obd device.
352 * Allocate the new obd_device and initialize it.
354 * \param[in] type_name obd device type string.
355 * \param[in] name obd device name.
356 * \param[in] uuid obd device UUID
358 * \retval newdev pointer to created obd_device
359 * \retval ERR_PTR(errno) on error
361 struct obd_device *class_newdev(const char *type_name, const char *name,
364 struct obd_device *newdev;
365 struct obd_type *type = NULL;
368 if (strlen(name) >= MAX_OBD_NAME) {
369 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
370 RETURN(ERR_PTR(-EINVAL));
373 type = class_get_type(type_name);
375 CERROR("OBD: unknown type: %s\n", type_name);
376 RETURN(ERR_PTR(-ENODEV));
379 newdev = obd_device_alloc();
380 if (newdev == NULL) {
381 class_put_type(type);
382 RETURN(ERR_PTR(-ENOMEM));
384 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
385 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
386 newdev->obd_type = type;
387 newdev->obd_minor = -1;
389 rwlock_init(&newdev->obd_pool_lock);
390 newdev->obd_pool_limit = 0;
391 newdev->obd_pool_slv = 0;
393 INIT_LIST_HEAD(&newdev->obd_exports);
394 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
395 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
396 INIT_LIST_HEAD(&newdev->obd_exports_timed);
397 INIT_LIST_HEAD(&newdev->obd_nid_stats);
398 spin_lock_init(&newdev->obd_nid_lock);
399 spin_lock_init(&newdev->obd_dev_lock);
400 mutex_init(&newdev->obd_dev_mutex);
401 spin_lock_init(&newdev->obd_osfs_lock);
402 /* newdev->obd_osfs_age must be set to a value in the distant
403 * past to guarantee a fresh statfs is fetched on mount. */
404 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
406 /* XXX belongs in setup not attach */
407 init_rwsem(&newdev->obd_observer_link_sem);
409 spin_lock_init(&newdev->obd_recovery_task_lock);
410 init_waitqueue_head(&newdev->obd_next_transno_waitq);
411 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
412 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
413 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
414 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
415 INIT_LIST_HEAD(&newdev->obd_evict_list);
416 INIT_LIST_HEAD(&newdev->obd_lwp_list);
418 llog_group_init(&newdev->obd_olg);
419 /* Detach drops this */
420 atomic_set(&newdev->obd_refcount, 1);
421 lu_ref_init(&newdev->obd_reference);
422 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
424 newdev->obd_conn_inprogress = 0;
426 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
428 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
429 newdev->obd_name, newdev);
437 * \param[in] obd obd_device to be freed
/*
 * Final teardown of \a obd once nothing references it any more.
 *
 * Asserts the device magic, that the device is either unregistered
 * (obd_minor == -1) or still the owner of its obd_devs[] slot, and that
 * obd_refcount has reached 0. Then runs obd_cleanup() (see the in-line
 * comment about obd_stopping), frees the device and drops the type
 * reference taken by class_newdev().
 */
441 void class_free_dev(struct obd_device *obd)
/* Saved up front: obd itself is freed before the type is released. */
443 struct obd_type *obd_type = obd->obd_type;
445 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
446 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
448 "obd %p != obd_devs[%d] %p\n",
449 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
450 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
451 "obd_refcount should be 0, not %d\n",
452 atomic_read(&obd->obd_refcount));
453 LASSERT(obd_type != NULL);
455 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
456 obd->obd_name, obd->obd_type->typ_name);
458 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
459 obd->obd_name, obd->obd_uuid.uuid);
460 if (obd->obd_stopping) {
463 /* If we're not stopping, we were never set up */
464 err = obd_cleanup(obd);
466 CERROR("Cleanup %s returned %d\n",
470 obd_device_free(obd);
472 class_put_type(obd_type);
476 * Unregister obd device.
478 * Free slot in obd_devs[] used by \a obd.
480 * \param[in] obd obd_device to be unregistered
/*
 * Remove \a obd from the device table.
 *
 * Under obd_dev_lock held for write, a device with a valid minor
 * (obd_minor >= 0) has its obd_devs[] slot cleared after asserting that the
 * slot really points at \a obd. A device that was never registered is left
 * untouched.
 */
484 void class_unregister_device(struct obd_device *obd)
486 write_lock(&obd_dev_lock);
487 if (obd->obd_minor >= 0) {
488 LASSERT(obd_devs[obd->obd_minor] == obd);
489 obd_devs[obd->obd_minor] = NULL;
492 write_unlock(&obd_dev_lock);
496 * Register obd device.
498 * Find free slot in obd_devs[], fills it with \a new_obd.
500 * \param[in] new_obd obd_device to be registered
503 * \retval -EEXIST device with this name is registered
504 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Insert \a new_obd into obd_devs[].
 *
 * With obd_dev_lock held for write the table is scanned once: a device
 * with the same obd_name is a duplicate, and the first empty slot is
 * remembered as the candidate minor. On a name clash the lock is dropped
 * and obd_zombie_barrier() is called (NOTE(review): presumably followed by
 * a single retry, tracked by `retried` -- the jump is not visible in this
 * listing). Per the function's header comment the error results are
 * -EEXIST for a duplicate name and -EOVERFLOW when the table is full.
 */
506 int class_register_device(struct obd_device *new_obd)
510 int new_obd_minor = 0;
511 bool minor_assign = false;
512 bool retried = false;
515 write_lock(&obd_dev_lock);
516 for (i = 0; i < class_devno_max(); i++) {
517 struct obd_device *obd = class_num2obd(i);
520 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
523 write_unlock(&obd_dev_lock);
525 /* the obd_device could be waited to be
526 * destroyed by the "obd_zombie_impexp_thread".
528 obd_zombie_barrier();
533 CERROR("%s: already exists, won't add\n",
535 /* in case we found a free slot before duplicate */
536 minor_assign = false;
540 if (!minor_assign && obd == NULL) {
/* Publish the device in the table at the chosen minor. */
547 new_obd->obd_minor = new_obd_minor;
548 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
549 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
550 obd_devs[new_obd_minor] = new_obd;
554 CERROR("%s: all %u/%u devices used, increase "
555 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
556 i, class_devno_max(), ret);
559 write_unlock(&obd_dev_lock);
/*
 * Scan the device table for a device whose obd_name equals \a name.
 *
 * Takes no lock itself; the callers in this file (class_name2dev(),
 * class_dev_by_str()) hold obd_dev_lock for read around the call.
 * A match is additionally checked for obd_attached.
 * NOTE(review): the return statements are missing from this listing;
 * class_name2obd() treats a negative result as "not found".
 */
564 static int class_name2dev_nolock(const char *name)
571 for (i = 0; i < class_devno_max(); i++) {
572 struct obd_device *obd = class_num2obd(i);
574 if (obd && strcmp(name, obd->obd_name) == 0) {
575 /* Make sure we finished attaching before we give
576 out any references */
577 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
578 if (obd->obd_attached) {
/*
 * Locked wrapper around class_name2dev_nolock(): performs the lookup with
 * obd_dev_lock held for read.
 */
588 int class_name2dev(const char *name)
595 read_lock(&obd_dev_lock);
596 i = class_name2dev_nolock(name);
597 read_unlock(&obd_dev_lock);
601 EXPORT_SYMBOL(class_name2dev);
603 struct obd_device *class_name2obd(const char *name)
605 int dev = class_name2dev(name);
607 if (dev < 0 || dev > class_devno_max())
609 return class_num2obd(dev);
611 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan the device table for a device whose obd_uuid equals \a uuid
 * (compared with obd_uuid_equals()).
 *
 * Takes no lock itself; class_uuid2dev() and class_dev_by_str() call it
 * with obd_dev_lock held for read.
 * NOTE(review): the return statements are missing from this listing.
 */
613 int class_uuid2dev_nolock(struct obd_uuid *uuid)
617 for (i = 0; i < class_devno_max(); i++) {
618 struct obd_device *obd = class_num2obd(i);
620 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
621 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/*
 * Locked wrapper around class_uuid2dev_nolock(): performs the lookup with
 * obd_dev_lock held for read.
 */
629 int class_uuid2dev(struct obd_uuid *uuid)
633 read_lock(&obd_dev_lock);
634 i = class_uuid2dev_nolock(uuid);
635 read_unlock(&obd_dev_lock);
639 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Find a device by UUID: resolve \a uuid to a table index with
 * class_uuid2dev() and return the device stored there.
 * NOTE(review): lines between the lookup and the return are missing from
 * this listing; confirm how a failed lookup is handled.
 */
641 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
643 int dev = class_uuid2dev(uuid);
646 return class_num2obd(dev);
648 EXPORT_SYMBOL(class_uuid2obd);
651 * Get obd device from ::obd_devs[]
653 * \param num [in] array index
655 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
656 * otherwise return the obd device there.
658 struct obd_device *class_num2obd(int num)
660 struct obd_device *obd = NULL;
662 if (num < class_devno_max()) {
667 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
668 "%p obd_magic %08x != %08x\n",
669 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
670 LASSERTF(obd->obd_minor == num,
671 "%p obd_minor %0d != %0d\n",
672 obd, obd->obd_minor, num);
679 * Find obd in obd_devs[] by name or uuid.
681 * Increment obd's refcount if found.
683 * \param[in] str obd name or uuid
685 * \retval NULL if not found
686 * \retval target pointer to found obd_device
/*
 * Resolve \a str to a device and take a reference on it.
 *
 * \a str is first converted to an obd_uuid and looked up by UUID, then
 * looked up by name (NOTE(review): presumably only when the UUID lookup
 * failed -- the condition is not visible in this listing). Both lookups and
 * the class_incref() happen under obd_dev_lock held for read, so the device
 * cannot leave the table between lookup and reference.
 */
688 struct obd_device *class_dev_by_str(const char *str)
690 struct obd_device *target = NULL;
691 struct obd_uuid tgtuuid;
694 obd_str2uuid(&tgtuuid, str);
696 read_lock(&obd_dev_lock);
697 rc = class_uuid2dev_nolock(&tgtuuid);
699 rc = class_name2dev_nolock(str);
702 target = class_num2obd(rc);
705 class_incref(target, "find", current);
706 read_unlock(&obd_dev_lock);
710 EXPORT_SYMBOL(class_dev_by_str);
713 * Get obd devices count. Device in any
715 * \retval obd device count
717 int get_devices_count(void)
719 int index, max_index = class_devno_max(), dev_count = 0;
721 read_lock(&obd_dev_lock);
722 for (index = 0; index <= max_index; index++) {
723 struct obd_device *obd = class_num2obd(index);
727 read_unlock(&obd_dev_lock);
731 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in the table: index, status, type
 * name, device name, UUID and current obd_refcount.
 *
 * The status is chosen from the device flags in priority order
 * obd_stopping, obd_set_up, obd_attached (the status strings themselves
 * are not visible in this listing). Runs under obd_dev_lock held for read.
 */
733 void class_obd_list(void)
738 read_lock(&obd_dev_lock);
739 for (i = 0; i < class_devno_max(); i++) {
740 struct obd_device *obd = class_num2obd(i);
744 if (obd->obd_stopping)
746 else if (obd->obd_set_up)
748 else if (obd->obd_attached)
752 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
753 i, status, obd->obd_type->typ_name,
754 obd->obd_name, obd->obd_uuid.uuid,
755 atomic_read(&obd->obd_refcount));
757 read_unlock(&obd_dev_lock);
760 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
761 specified, then only the client with that uuid is returned,
762 otherwise any client connected to the tgt is returned. */
/*
 * Walk the device table under obd_dev_lock (read) looking for a device of
 * type \a type_name whose client target UUID equals \a tgt_uuid and, when
 * \a grp_uuid is non-NULL, whose own UUID equals \a grp_uuid.
 *
 * The lock is released before returning on a match; no reference is taken
 * on the device in the visible code.
 */
763 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
764 const char *type_name,
765 struct obd_uuid *grp_uuid)
769 read_lock(&obd_dev_lock);
770 for (i = 0; i < class_devno_max(); i++) {
771 struct obd_device *obd = class_num2obd(i);
/* Prefix comparison: only strlen(type_name) bytes are compared, so a
 * typ_name that merely starts with type_name matches as well. */
775 if ((strncmp(obd->obd_type->typ_name, type_name,
776 strlen(type_name)) == 0)) {
777 if (obd_uuid_equals(tgt_uuid,
778 &obd->u.cli.cl_target_uuid) &&
779 ((grp_uuid)? obd_uuid_equals(grp_uuid,
780 &obd->obd_uuid) : 1)) {
781 read_unlock(&obd_dev_lock);
786 read_unlock(&obd_dev_lock);
790 EXPORT_SYMBOL(class_find_client_obd);
792 /* Iterate the obd_device list looking devices have grp_uuid. Start
793 searching at *next, and if a device is found, the next index to look
794 at is saved in *next. If next is NULL, then the first matching device
795 will always be returned. */
/*
 * Resumable search for devices whose UUID equals \a grp_uuid.
 *
 * A caller-supplied *next is only used as the starting index when it lies
 * in [0, class_devno_max()). The scan runs under obd_dev_lock held for
 * read, and the lock is dropped before returning on a match.
 * NOTE(review): the handling of next == NULL, of an out-of-range *next and
 * the update of *next are missing from this listing.
 */
796 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
802 else if (*next >= 0 && *next < class_devno_max())
807 read_lock(&obd_dev_lock);
808 for (; i < class_devno_max(); i++) {
809 struct obd_device *obd = class_num2obd(i);
813 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
816 read_unlock(&obd_dev_lock);
820 read_unlock(&obd_dev_lock);
824 EXPORT_SYMBOL(class_devices_in_group);
827 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
828 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF via obd_set_info_async() to every set-up,
 * non-stopping device of a relevant type whose name starts with the first
 * \a namelen bytes of \a fsname.
 *
 * The table is walked under obd_dev_lock (read). Around each notification
 * the device is pinned with class_incref() and the lock is dropped, then
 * re-taken afterwards to continue the walk.
 */
830 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
832 struct obd_device *obd;
836 LASSERT(namelen > 0);
838 read_lock(&obd_dev_lock);
839 for (i = 0; i < class_devno_max(); i++) {
840 obd = class_num2obd(i);
842 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
845 /* only notify mdc, osc, osp, lwp, mdt, ost
846 * because only these have a -sptlrpc llog */
847 type = obd->obd_type->typ_name;
848 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
849 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
850 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
851 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
852 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
853 strcmp(type, LUSTRE_OST_NAME) != 0)
/* Non-zero strncmp() means the device name does not start with fsname. */
856 if (strncmp(obd->obd_name, fsname, namelen))
/* Pin the device so it stays valid while the table lock is released. */
859 class_incref(obd, __FUNCTION__, obd);
860 read_unlock(&obd_dev_lock);
861 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
862 sizeof(KEY_SPTLRPC_CONF),
863 KEY_SPTLRPC_CONF, 0, NULL, NULL);
865 class_decref(obd, __FUNCTION__, obd);
866 read_lock(&obd_dev_lock);
868 read_unlock(&obd_dev_lock);
871 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the obd_device slab cache if it exists and reset the pointer, so
 * a second call (or a call after a failed init) is harmless.
 */
873 void obd_cleanup_caches(void)
876 if (obd_device_cachep) {
877 kmem_cache_destroy(obd_device_cachep);
878 obd_device_cachep = NULL;
/*
 * Create the slab cache used for struct obd_device.
 *
 * Asserts that the cache does not exist yet. The whole object is declared
 * as the usercopy region (offset 0, size sizeof(struct obd_device)).
 * A failed creation yields -ENOMEM via the `out` path, where
 * obd_cleanup_caches() is called.
 */
884 int obd_init_caches(void)
889 LASSERT(obd_device_cachep == NULL);
890 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
891 sizeof(struct obd_device),
892 0, 0, 0, sizeof(struct obd_device), NULL);
893 if (!obd_device_cachep)
894 GOTO(out, rc = -ENOMEM);
898 obd_cleanup_caches();
/* Owner tag passed to class_handle_hash()/class_handle2object() for export
 * handles. */
902 static const char export_handle_owner[] = "export";
904 /* map connection to client */
/*
 * Translate the cookie in \a conn into the export registered under it.
 * A cookie of -1 is treated as a request for a new connection rather than
 * looked up.
 * NOTE(review): the return statements are missing from this listing.
 */
905 struct obd_export *class_conn2export(struct lustre_handle *conn)
907 struct obd_export *export;
911 CDEBUG(D_CACHE, "looking for null handle\n");
915 if (conn->cookie == -1) { /* this means assign a new connection */
916 CDEBUG(D_CACHE, "want a new connection\n");
920 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
921 export = class_handle2object(conn->cookie, export_handle_owner);
924 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): the function body is missing from this listing; presumably
 * it returns exp->exp_obd -- confirm against the real file.
 */
926 struct obd_device *class_exp2obd(struct obd_export *exp)
932 EXPORT_SYMBOL(class_exp2obd);
/*
 * Return the client import (u.cli.cl_import) of the device that \a exp
 * belongs to.
 * NOTE(review): lines between the declaration and the return are missing
 * from this listing; confirm whether a NULL exp_obd is handled there.
 */
934 struct obd_import *class_exp2cliimp(struct obd_export *exp)
936 struct obd_device *obd = exp->exp_obd;
939 return obd->u.cli.cl_import;
941 EXPORT_SYMBOL(class_exp2cliimp);
943 /* Export management functions */
/*
 * Free an export whose handle refcount has reached zero.
 *
 * Releases the export's ptlrpc connection (if any), asserts that its
 * reply/replay/RPC lists are empty, calls obd_destroy_export(), drops the
 * device reference held by a non-self export, and frees the export through
 * kfree_rcu() rather than immediately.
 */
944 static void class_export_destroy(struct obd_export *exp)
946 struct obd_device *obd = exp->exp_obd;
949 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
950 LASSERT(obd != NULL);
952 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
953 exp->exp_client_uuid.uuid, obd->obd_name);
955 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
956 if (exp->exp_connection)
957 ptlrpc_put_connection_superhack(exp->exp_connection);
959 LASSERT(list_empty(&exp->exp_outstanding_replies));
960 LASSERT(list_empty(&exp->exp_uncommitted_replies));
961 LASSERT(list_empty(&exp->exp_req_replay_queue));
962 LASSERT(list_empty(&exp->exp_hp_rpcs));
963 obd_destroy_export(exp);
964 /* self export doesn't hold a reference to an obd, although it
965 * exists until freeing of the obd */
966 if (exp != obd->obd_self_export)
967 class_decref(obd, "export", exp);
/* OBD_FREE_PRE is invoked with the tag "rcu" before the deferred free. */
969 OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
970 kfree_rcu(exp, exp_handle.h_rcu);
974 struct obd_export *class_export_get(struct obd_export *exp)
976 refcount_inc(&exp->exp_handle.h_ref);
977 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
978 refcount_read(&exp->exp_handle.h_ref));
981 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.
 *
 * When the last reference goes away the nid-stat state is released with
 * lprocfs_exp_cleanup(). The device's self export is then destroyed right
 * here and the device itself is freed with class_free_dev(); for any other
 * export the visible code asserts it is still on a device list
 * (exp_obd_chain) and hands it to obd_zombie_export_add().
 */
983 void class_export_put(struct obd_export *exp)
985 LASSERT(exp != NULL);
986 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
987 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
988 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
989 refcount_read(&exp->exp_handle.h_ref) - 1);
991 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
992 struct obd_device *obd = exp->exp_obd;
994 CDEBUG(D_IOCTL, "final put %p/%s\n",
995 exp, exp->exp_client_uuid.uuid);
997 /* release nid stat reference */
998 lprocfs_exp_cleanup(exp);
1000 if (exp == obd->obd_self_export) {
1001 /* self export should be destroyed without
1002 * zombie thread as it doesn't hold a
1003 * reference to obd and doesn't hold any
1005 class_export_destroy(exp);
1006 /* self export is destroyed, no class
1007 * references exist and it is safe to free
1009 class_free_dev(obd);
1011 LASSERT(!list_empty(&exp->exp_obd_chain));
1012 obd_zombie_export_add(exp);
1017 EXPORT_SYMBOL(class_export_put);
1019 static void obd_zombie_exp_cull(struct work_struct *ws)
1021 struct obd_export *export;
1023 export = container_of(ws, struct obd_export, exp_zombie_work);
1024 class_export_destroy(export);
1027 /* Creates a new export, adds it to the hash table, and returns a
1028 * pointer to it. The refcount is 2: one for the hash reference, and
1029 * one for the pointer returned by this function. */
/*
 * Allocate and initialise an export of \a obd for client \a cluuid.
 *
 * The handle refcount starts at 2 and the export is hashed with
 * class_handle_hash() before the device is consulted. Under
 * obd->obd_dev_lock an export whose UUID differs from the device's own is
 * refused with -ENODEV while the device is stopping, and added to the UUID
 * table with obd_uuid_add() (a failure is reported as -EALREADY).
 * NOTE(review): the branches on \a is_self are missing from this listing;
 * presumably a non-self export takes a device reference and is linked on
 * the device lists, while a self export only gets its list heads
 * initialised.
 * The exit_unlock path unhashes the handle, calls obd_destroy_export() and
 * frees the export.
 */
1030 struct obd_export *__class_new_export(struct obd_device *obd,
1031 struct obd_uuid *cluuid, bool is_self)
1033 struct obd_export *export;
1037 OBD_ALLOC_PTR(export);
1039 return ERR_PTR(-ENOMEM);
1041 export->exp_conn_cnt = 0;
1042 export->exp_lock_hash = NULL;
1043 export->exp_flock_hash = NULL;
1044 /* 2 = class_handle_hash + last */
1045 refcount_set(&export->exp_handle.h_ref, 2);
1046 atomic_set(&export->exp_rpc_count, 0);
1047 atomic_set(&export->exp_cb_count, 0);
1048 atomic_set(&export->exp_locks_count, 0);
1049 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1050 INIT_LIST_HEAD(&export->exp_locks_list);
1051 spin_lock_init(&export->exp_locks_list_guard);
1053 atomic_set(&export->exp_replay_count, 0);
1054 export->exp_obd = obd;
1055 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1056 spin_lock_init(&export->exp_uncommitted_replies_lock);
1057 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1058 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1059 INIT_HLIST_NODE(&export->exp_handle.h_link);
1060 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1061 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* From here on the export can be found through its handle cookie. */
1062 class_handle_hash(&export->exp_handle, export_handle_owner);
1063 export->exp_last_request_time = ktime_get_real_seconds();
1064 spin_lock_init(&export->exp_lock);
1065 spin_lock_init(&export->exp_rpc_lock);
1066 INIT_HLIST_NODE(&export->exp_nid_hash);
1067 INIT_HLIST_NODE(&export->exp_gen_hash);
1068 spin_lock_init(&export->exp_bl_list_lock);
1069 INIT_LIST_HEAD(&export->exp_bl_list);
1070 INIT_LIST_HEAD(&export->exp_stale_list);
/* Deferred destruction runs obd_zombie_exp_cull() on this work item. */
1071 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1073 export->exp_sp_peer = LUSTRE_SP_ANY;
1074 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1075 export->exp_client_uuid = *cluuid;
1076 obd_init_export(export);
1078 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1080 spin_lock(&obd->obd_dev_lock);
/* An export carrying the device's own UUID skips the UUID table. */
1081 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1082 /* shouldn't happen, but might race */
1083 if (obd->obd_stopping)
1084 GOTO(exit_unlock, rc = -ENODEV);
1086 rc = obd_uuid_add(obd, export);
1088 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1089 obd->obd_name, cluuid->uuid, rc);
1090 GOTO(exit_unlock, rc = -EALREADY);
1095 class_incref(obd, "export", export);
1096 list_add_tail(&export->exp_obd_chain_timed,
1097 &obd->obd_exports_timed);
1098 list_add(&export->exp_obd_chain, &obd->obd_exports);
1099 obd->obd_num_exports++;
1101 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1102 INIT_LIST_HEAD(&export->exp_obd_chain);
1104 spin_unlock(&obd->obd_dev_lock);
1108 spin_unlock(&obd->obd_dev_lock);
1109 class_handle_unhash(&export->exp_handle);
1110 obd_destroy_export(export);
1111 OBD_FREE_PTR(export);
1115 struct obd_export *class_new_export(struct obd_device *obd,
1116 struct obd_uuid *uuid)
1118 return __class_new_export(obd, uuid, false);
1120 EXPORT_SYMBOL(class_new_export);
1122 struct obd_export *class_new_export_self(struct obd_device *obd,
1123 struct obd_uuid *uuid)
1125 return __class_new_export(obd, uuid, true);
/*
 * Detach \a exp from its device.
 *
 * The handle is unhashed first so the export can no longer be found by
 * cookie. A self export only has its reference put. Any other export is,
 * under obd_dev_lock, removed from the UUID table (and on server builds
 * from obd_gen_hash), moved to obd_unlinked_exports, taken off the timed
 * list and uncounted from obd_num_exports; afterwards the stale-export
 * counter is incremented and obd_stale_export_put() is called.
 */
1128 void class_unlink_export(struct obd_export *exp)
1130 class_handle_unhash(&exp->exp_handle);
1132 if (exp->exp_obd->obd_self_export == exp) {
1133 class_export_put(exp);
1137 spin_lock(&exp->exp_obd->obd_dev_lock);
1138 /* delete an uuid-export hashitem from hashtables */
1139 if (exp != exp->exp_obd->obd_self_export)
1140 obd_uuid_del(exp->exp_obd, exp);
1142 #ifdef HAVE_SERVER_SUPPORT
1143 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1144 struct tg_export_data *ted = &exp->exp_target_data;
1145 struct cfs_hash *hash;
1147 /* Because obd_gen_hash will not be released until
1148 * class_cleanup(), so hash should never be NULL here */
1149 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1150 LASSERT(hash != NULL);
1151 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1152 &exp->exp_gen_hash);
1153 cfs_hash_putref(hash);
1155 #endif /* HAVE_SERVER_SUPPORT */
1157 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1158 list_del_init(&exp->exp_obd_chain_timed);
1159 exp->exp_obd->obd_num_exports--;
1160 spin_unlock(&exp->exp_obd->obd_dev_lock);
1161 atomic_inc(&obd_stale_export_num);
1163 /* A reference is kept by obd_stale_exports list */
1164 obd_stale_export_put(exp);
1166 EXPORT_SYMBOL(class_unlink_export);
1168 /* Import management functions */
/*
 * Free an import whose refcount has dropped to zero.
 *
 * Releases the import's current connection and every entry on
 * imp_conn_list (connection put, then the entry itself is freed), asserts
 * that no security context (imp_sec) is left, and drops the device
 * reference taken in class_new_import().
 * NOTE(review): the final free of \a imp itself is not visible in this
 * listing.
 */
1169 static void obd_zombie_import_free(struct obd_import *imp)
1173 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1174 imp->imp_obd->obd_name);
1176 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1178 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Pop entries from the head until the list is empty. */
1180 while (!list_empty(&imp->imp_conn_list)) {
1181 struct obd_import_conn *imp_conn;
1183 imp_conn = list_entry(imp->imp_conn_list.next,
1184 struct obd_import_conn, oic_item);
1185 list_del_init(&imp_conn->oic_item);
1186 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1187 OBD_FREE(imp_conn, sizeof(*imp_conn));
1190 LASSERT(imp->imp_sec == NULL);
1191 class_decref(imp->imp_obd, "import", imp);
1196 struct obd_import *class_import_get(struct obd_import *import)
1198 atomic_inc(&import->imp_refcount);
1199 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1200 atomic_read(&import->imp_refcount),
1201 import->imp_obd->obd_name);
1204 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp.
 *
 * The refcount is asserted to lie strictly between 0 and LI_POISON before
 * the decrement. The final put does not free the import directly but hands
 * it to obd_zombie_import_add().
 */
1206 void class_import_put(struct obd_import *imp)
1210 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* The message prints the value the counter will have after this put. */
1212 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1213 atomic_read(&imp->imp_refcount) - 1,
1214 imp->imp_obd->obd_name);
1216 if (atomic_dec_and_test(&imp->imp_refcount)) {
1217 CDEBUG(D_INFO, "final put import %p\n", imp);
1218 obd_zombie_import_add(imp);
1223 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last at_init() argument line is missing from this
 * listing.
 */
1225 static void init_imp_at(struct imp_at *at) {
1227 at_init(&at->iat_net_latency, 0, 0);
1228 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1229 /* max service estimates are tracked on the server side, so
1230 don't use the AT history here, just use the last reported
1231 val. (But keep hist for proc histogram, worst_ever) */
1232 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1237 static void obd_zombie_imp_cull(struct work_struct *ws)
1239 struct obd_import *import;
1241 import = container_of(ws, struct obd_import, imp_zombie_work);
1242 obd_zombie_import_free(import);
/*
 * Allocate and initialise an import for \a obd.
 *
 * The import takes a reference on the device (class_incref() tagged
 * "import"), starts in state LUSTRE_IMP_NEW with a refcount of 2, and uses
 * LUSTRE_MSG_MAGIC_V2 as its initial message magic.
 * NOTE(review): the allocation-failure check and the return statement are
 * missing from this listing.
 */
1245 struct obd_import *class_new_import(struct obd_device *obd)
1247 struct obd_import *imp;
1248 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1250 OBD_ALLOC(imp, sizeof(*imp));
1254 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1255 INIT_LIST_HEAD(&imp->imp_replay_list);
1256 INIT_LIST_HEAD(&imp->imp_sending_list);
1257 INIT_LIST_HEAD(&imp->imp_delayed_list);
1258 INIT_LIST_HEAD(&imp->imp_committed_list);
1259 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1260 imp->imp_known_replied_xid = 0;
/* The replay cursor starts at the head of the (empty) committed list. */
1261 imp->imp_replay_cursor = &imp->imp_committed_list;
1262 spin_lock_init(&imp->imp_lock);
1263 imp->imp_last_success_conn = 0;
1264 imp->imp_state = LUSTRE_IMP_NEW;
1265 imp->imp_obd = class_incref(obd, "import", imp);
1266 rwlock_init(&imp->imp_sec_lock);
1267 init_waitqueue_head(&imp->imp_recovery_waitq);
/* Deferred destruction runs obd_zombie_imp_cull() on this work item. */
1268 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
/* Use the pid of the namespace's child reaper when available, else 1. */
1270 if (curr_pid_ns && curr_pid_ns->child_reaper)
1271 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1273 imp->imp_sec_refpid = 1;
1275 atomic_set(&imp->imp_refcount, 2);
1276 atomic_set(&imp->imp_unregistering, 0);
1277 atomic_set(&imp->imp_inflight, 0);
1278 atomic_set(&imp->imp_replay_inflight, 0);
1279 atomic_set(&imp->imp_inval_count, 0);
1280 INIT_LIST_HEAD(&imp->imp_conn_list);
1281 init_imp_at(&imp->imp_at);
1283 /* the default magic is V2, will be used in connect RPC, and
1284 * then adjusted according to the flags in request/reply. */
1285 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1289 EXPORT_SYMBOL(class_new_import);
1291 void class_destroy_import(struct obd_import *import)
1293 LASSERT(import != NULL);
1294 LASSERT(import != LP_POISON);
1296 spin_lock(&import->imp_lock);
1297 import->imp_generation++;
1298 spin_unlock(&import->imp_lock);
1299 class_import_put(import);
1301 EXPORT_SYMBOL(class_destroy_import);
#if LUSTRE_TRACKS_LOCK_EXP_REFS

/**
 * Account one more export reference held by \a lock.
 *
 * On the first reference the lock is linked onto exp->exp_locks_list and
 * \a exp is remembered as the lock's reference target. A console warning
 * is printed if the lock already points at a different export.
 * Everything happens under exp->exp_locks_list_guard.
 */
void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr >= 0);

	if (lock->l_exp_refs_target != NULL &&
	    lock->l_exp_refs_target != exp)
		LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
			      exp, lock, lock->l_exp_refs_target);

	/* first reference: start tracking the lock on this export */
	if (lock->l_exp_refs_nr++ == 0) {
		list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
		lock->l_exp_refs_target = exp;
	}

	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);

	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_add_lock_ref);

/**
 * Drop one export reference held by \a lock.
 *
 * When the last reference goes away the lock is unlinked from the
 * export's lock list and its reference target is cleared. A console
 * warning is printed if the lock's recorded export is not \a exp.
 * Everything happens under exp->exp_locks_list_guard.
 */
void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr > 0);

	if (lock->l_exp_refs_target != exp)
		LCONSOLE_WARN("lock %p, "
			      "mismatching export pointers: %p, %p\n",
			      lock, lock->l_exp_refs_target, exp);

	/* last reference: stop tracking the lock */
	if (--lock->l_exp_refs_nr == 0) {
		list_del_init(&lock->l_exp_refs_link);
		lock->l_exp_refs_target = NULL;
	}

	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);

	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_del_lock_ref);
#endif /* LUSTRE_TRACKS_LOCK_EXP_REFS */
1346 /* A connection defines an export context in which preallocation can
1347 be managed. This releases the export pointer reference, and returns
1348 the export handle, so the export refcount is 1 when this function
1350 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1351 struct obd_uuid *cluuid)
1353 struct obd_export *export;
1354 LASSERT(conn != NULL);
1355 LASSERT(obd != NULL);
1356 LASSERT(cluuid != NULL);
1359 export = class_new_export(obd, cluuid);
1361 RETURN(PTR_ERR(export));
1363 conn->cookie = export->exp_handle.h_cookie;
1364 class_export_put(export);
1366 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1367 cluuid->uuid, conn->cookie);
1370 EXPORT_SYMBOL(class_connect);
1372 /* if export is involved in recovery then clean up related things */
1373 static void class_export_recovery_cleanup(struct obd_export *exp)
1375 struct obd_device *obd = exp->exp_obd;
1377 spin_lock(&obd->obd_recovery_task_lock);
1378 if (obd->obd_recovering) {
1379 if (exp->exp_in_recovery) {
1380 spin_lock(&exp->exp_lock);
1381 exp->exp_in_recovery = 0;
1382 spin_unlock(&exp->exp_lock);
1383 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1384 atomic_dec(&obd->obd_connected_clients);
1387 /* if called during recovery then should update
1388 * obd_stale_clients counter,
1389 * lightweight exports are not counted */
1390 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1391 exp->exp_obd->obd_stale_clients++;
1393 spin_unlock(&obd->obd_recovery_task_lock);
1395 spin_lock(&exp->exp_lock);
1396 /** Cleanup req replay fields */
1397 if (exp->exp_req_replay_needed) {
1398 exp->exp_req_replay_needed = 0;
1400 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1401 atomic_dec(&obd->obd_req_replay_clients);
1404 /** Cleanup lock replay data */
1405 if (exp->exp_lock_replay_needed) {
1406 exp->exp_lock_replay_needed = 0;
1408 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1409 atomic_dec(&obd->obd_lock_replay_clients);
1411 spin_unlock(&exp->exp_lock);
1414 /* This function removes 1-3 references from the export:
1415 * 1 - for export pointer passed
1416 * and if disconnect really need
1417 * 2 - removing from hash
1418 * 3 - in client_unlink_export
1419 * The export pointer passed to this function can destroyed */
1420 int class_disconnect(struct obd_export *export)
1422 int already_disconnected;
1425 if (export == NULL) {
1426 CWARN("attempting to free NULL export %p\n", export);
1430 spin_lock(&export->exp_lock);
1431 already_disconnected = export->exp_disconnected;
1432 export->exp_disconnected = 1;
1433 /* We hold references of export for uuid hash
1434 * and nid_hash and export link at least. So
1435 * it is safe to call cfs_hash_del in there. */
1436 if (!hlist_unhashed(&export->exp_nid_hash))
1437 cfs_hash_del(export->exp_obd->obd_nid_hash,
1438 &export->exp_connection->c_peer.nid,
1439 &export->exp_nid_hash);
1440 spin_unlock(&export->exp_lock);
1442 /* class_cleanup(), abort_recovery(), and class_fail_export()
1443 * all end up in here, and if any of them race we shouldn't
1444 * call extra class_export_puts(). */
1445 if (already_disconnected) {
1446 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1447 GOTO(no_disconn, already_disconnected);
1450 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1451 export->exp_handle.h_cookie);
1453 class_export_recovery_cleanup(export);
1454 class_unlink_export(export);
1456 class_export_put(export);
1459 EXPORT_SYMBOL(class_disconnect);
1461 /* Return non-zero for a fully connected export */
1462 int class_connected_export(struct obd_export *exp)
1467 spin_lock(&exp->exp_lock);
1468 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1469 spin_unlock(&exp->exp_lock);
1473 EXPORT_SYMBOL(class_connected_export);
1475 static void class_disconnect_export_list(struct list_head *list,
1476 enum obd_option flags)
1479 struct obd_export *exp;
1482 /* It's possible that an export may disconnect itself, but
1483 * nothing else will be added to this list. */
1484 while (!list_empty(list)) {
1485 exp = list_entry(list->next, struct obd_export,
1487 /* need for safe call CDEBUG after obd_disconnect */
1488 class_export_get(exp);
1490 spin_lock(&exp->exp_lock);
1491 exp->exp_flags = flags;
1492 spin_unlock(&exp->exp_lock);
1494 if (obd_uuid_equals(&exp->exp_client_uuid,
1495 &exp->exp_obd->obd_uuid)) {
1497 "exp %p export uuid == obd uuid, don't discon\n",
1499 /* Need to delete this now so we don't end up pointing
1500 * to work_list later when this export is cleaned up. */
1501 list_del_init(&exp->exp_obd_chain);
1502 class_export_put(exp);
1506 class_export_get(exp);
1507 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1508 "last request at %lld\n",
1509 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1510 exp, exp->exp_last_request_time);
1511 /* release one export reference anyway */
1512 rc = obd_disconnect(exp);
1514 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1515 obd_export_nid2str(exp), exp, rc);
1516 class_export_put(exp);
1521 void class_disconnect_exports(struct obd_device *obd)
1523 LIST_HEAD(work_list);
1526 /* Move all of the exports from obd_exports to a work list, en masse. */
1527 spin_lock(&obd->obd_dev_lock);
1528 list_splice_init(&obd->obd_exports, &work_list);
1529 list_splice_init(&obd->obd_delayed_exports, &work_list);
1530 spin_unlock(&obd->obd_dev_lock);
1532 if (!list_empty(&work_list)) {
1533 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1534 "disconnecting them\n", obd->obd_minor, obd);
1535 class_disconnect_export_list(&work_list,
1536 exp_flags_from_obd(obd));
1538 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1539 obd->obd_minor, obd);
1542 EXPORT_SYMBOL(class_disconnect_exports);
1544 /* Remove exports that have not completed recovery.
1546 void class_disconnect_stale_exports(struct obd_device *obd,
1547 int (*test_export)(struct obd_export *))
1549 LIST_HEAD(work_list);
1550 struct obd_export *exp, *n;
1554 spin_lock(&obd->obd_dev_lock);
1555 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1557 /* don't count self-export as client */
1558 if (obd_uuid_equals(&exp->exp_client_uuid,
1559 &exp->exp_obd->obd_uuid))
1562 /* don't evict clients which have no slot in last_rcvd
1563 * (e.g. lightweight connection) */
1564 if (exp->exp_target_data.ted_lr_idx == -1)
1567 spin_lock(&exp->exp_lock);
1568 if (exp->exp_failed || test_export(exp)) {
1569 spin_unlock(&exp->exp_lock);
1572 exp->exp_failed = 1;
1573 spin_unlock(&exp->exp_lock);
1575 list_move(&exp->exp_obd_chain, &work_list);
1577 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1578 obd->obd_name, exp->exp_client_uuid.uuid,
1579 obd_export_nid2str(exp));
1580 print_export_data(exp, "EVICTING", 0, D_HA);
1582 spin_unlock(&obd->obd_dev_lock);
1585 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1586 obd->obd_name, evicted);
1588 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1589 OBD_OPT_ABORT_RECOV);
1592 EXPORT_SYMBOL(class_disconnect_stale_exports);
1594 void class_fail_export(struct obd_export *exp)
1596 int rc, already_failed;
1598 spin_lock(&exp->exp_lock);
1599 already_failed = exp->exp_failed;
1600 exp->exp_failed = 1;
1601 spin_unlock(&exp->exp_lock);
1603 if (already_failed) {
1604 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1605 exp, exp->exp_client_uuid.uuid);
1609 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1610 exp, exp->exp_client_uuid.uuid);
1612 if (obd_dump_on_timeout)
1613 libcfs_debug_dumplog();
1615 /* need for safe call CDEBUG after obd_disconnect */
1616 class_export_get(exp);
1618 /* Most callers into obd_disconnect are removing their own reference
1619 * (request, for example) in addition to the one from the hash table.
1620 * We don't have such a reference here, so make one. */
1621 class_export_get(exp);
1622 rc = obd_disconnect(exp);
1624 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1626 CDEBUG(D_HA, "disconnected export %p/%s\n",
1627 exp, exp->exp_client_uuid.uuid);
1628 class_export_put(exp);
1630 EXPORT_SYMBOL(class_fail_export);
1632 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1634 struct cfs_hash *nid_hash;
1635 struct obd_export *doomed_exp = NULL;
1636 int exports_evicted = 0;
1638 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1640 spin_lock(&obd->obd_dev_lock);
1641 /* umount has run already, so evict thread should leave
1642 * its task to umount thread now */
1643 if (obd->obd_stopping) {
1644 spin_unlock(&obd->obd_dev_lock);
1645 return exports_evicted;
1647 nid_hash = obd->obd_nid_hash;
1648 cfs_hash_getref(nid_hash);
1649 spin_unlock(&obd->obd_dev_lock);
1652 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1653 if (doomed_exp == NULL)
1656 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1657 "nid %s found, wanted nid %s, requested nid %s\n",
1658 obd_export_nid2str(doomed_exp),
1659 libcfs_nid2str(nid_key), nid);
1660 LASSERTF(doomed_exp != obd->obd_self_export,
1661 "self-export is hashed by NID?\n");
1663 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1664 "request\n", obd->obd_name,
1665 obd_uuid2str(&doomed_exp->exp_client_uuid),
1666 obd_export_nid2str(doomed_exp));
1667 class_fail_export(doomed_exp);
1668 class_export_put(doomed_exp);
1671 cfs_hash_putref(nid_hash);
1673 if (!exports_evicted)
1674 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1675 obd->obd_name, nid);
1676 return exports_evicted;
1678 EXPORT_SYMBOL(obd_export_evict_by_nid);
1680 #ifdef HAVE_SERVER_SUPPORT
1681 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1683 struct obd_export *doomed_exp = NULL;
1684 struct obd_uuid doomed_uuid;
1685 int exports_evicted = 0;
1687 spin_lock(&obd->obd_dev_lock);
1688 if (obd->obd_stopping) {
1689 spin_unlock(&obd->obd_dev_lock);
1690 return exports_evicted;
1692 spin_unlock(&obd->obd_dev_lock);
1694 obd_str2uuid(&doomed_uuid, uuid);
1695 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1696 CERROR("%s: can't evict myself\n", obd->obd_name);
1697 return exports_evicted;
1700 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1701 if (doomed_exp == NULL) {
1702 CERROR("%s: can't disconnect %s: no exports found\n",
1703 obd->obd_name, uuid);
1705 CWARN("%s: evicting %s at adminstrative request\n",
1706 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1707 class_fail_export(doomed_exp);
1708 class_export_put(doomed_exp);
1709 obd_uuid_del(obd, doomed_exp);
1713 return exports_evicted;
1715 #endif /* HAVE_SERVER_SUPPORT */
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback used by print_export_data() to dump an export's
 * lock references; it stays NULL until some other code installs one. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif /* LUSTRE_TRACKS_LOCK_EXP_REFS */
1722 static void print_export_data(struct obd_export *exp, const char *status,
1723 int locks, int debug_level)
1725 struct ptlrpc_reply_state *rs;
1726 struct ptlrpc_reply_state *first_reply = NULL;
1729 spin_lock(&exp->exp_lock);
1730 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1736 spin_unlock(&exp->exp_lock);
1738 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1739 "%p %s %llu stale:%d\n",
1740 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1741 obd_export_nid2str(exp),
1742 refcount_read(&exp->exp_handle.h_ref),
1743 atomic_read(&exp->exp_rpc_count),
1744 atomic_read(&exp->exp_cb_count),
1745 atomic_read(&exp->exp_locks_count),
1746 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1747 nreplies, first_reply, nreplies > 3 ? "..." : "",
1748 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1749 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1750 if (locks && class_export_dump_hook != NULL)
1751 class_export_dump_hook(exp);
1755 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1757 struct obd_export *exp;
1759 spin_lock(&obd->obd_dev_lock);
1760 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1761 print_export_data(exp, "ACTIVE", locks, debug_level);
1762 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1763 print_export_data(exp, "UNLINKED", locks, debug_level);
1764 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1765 print_export_data(exp, "DELAYED", locks, debug_level);
1766 spin_unlock(&obd->obd_dev_lock);
1769 void obd_exports_barrier(struct obd_device *obd)
1772 LASSERT(list_empty(&obd->obd_exports));
1773 spin_lock(&obd->obd_dev_lock);
1774 while (!list_empty(&obd->obd_unlinked_exports)) {
1775 spin_unlock(&obd->obd_dev_lock);
1776 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1777 if (waited > 5 && is_power_of_2(waited)) {
1778 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1779 "more than %d seconds. "
1780 "The obd refcount = %d. Is it stuck?\n",
1781 obd->obd_name, waited,
1782 atomic_read(&obd->obd_refcount));
1783 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1786 spin_lock(&obd->obd_dev_lock);
1788 spin_unlock(&obd->obd_dev_lock);
1790 EXPORT_SYMBOL(obd_exports_barrier);
1793 * Add export to the obd_zombe thread and notify it.
1795 static void obd_zombie_export_add(struct obd_export *exp) {
1796 atomic_dec(&obd_stale_export_num);
1797 spin_lock(&exp->exp_obd->obd_dev_lock);
1798 LASSERT(!list_empty(&exp->exp_obd_chain));
1799 list_del_init(&exp->exp_obd_chain);
1800 spin_unlock(&exp->exp_obd->obd_dev_lock);
1802 queue_work(zombie_wq, &exp->exp_zombie_work);
1806 * Add import to the obd_zombe thread and notify it.
1808 static void obd_zombie_import_add(struct obd_import *imp) {
1809 LASSERT(imp->imp_sec == NULL);
1811 queue_work(zombie_wq, &imp->imp_zombie_work);
1815 * wait when obd_zombie import/export queues become empty
1817 void obd_zombie_barrier(void)
1819 flush_workqueue(zombie_wq);
1821 EXPORT_SYMBOL(obd_zombie_barrier);
1824 struct obd_export *obd_stale_export_get(void)
1826 struct obd_export *exp = NULL;
1829 spin_lock(&obd_stale_export_lock);
1830 if (!list_empty(&obd_stale_exports)) {
1831 exp = list_entry(obd_stale_exports.next,
1832 struct obd_export, exp_stale_list);
1833 list_del_init(&exp->exp_stale_list);
1835 spin_unlock(&obd_stale_export_lock);
1838 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1839 atomic_read(&obd_stale_export_num));
1843 EXPORT_SYMBOL(obd_stale_export_get);
1845 void obd_stale_export_put(struct obd_export *exp)
1849 LASSERT(list_empty(&exp->exp_stale_list));
1850 if (exp->exp_lock_hash &&
1851 atomic_read(&exp->exp_lock_hash->hs_count)) {
1852 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1853 atomic_read(&obd_stale_export_num));
1855 spin_lock_bh(&exp->exp_bl_list_lock);
1856 spin_lock(&obd_stale_export_lock);
1857 /* Add to the tail if there is no blocked locks,
1858 * to the head otherwise. */
1859 if (list_empty(&exp->exp_bl_list))
1860 list_add_tail(&exp->exp_stale_list,
1861 &obd_stale_exports);
1863 list_add(&exp->exp_stale_list,
1864 &obd_stale_exports);
1866 spin_unlock(&obd_stale_export_lock);
1867 spin_unlock_bh(&exp->exp_bl_list_lock);
1869 class_export_put(exp);
1873 EXPORT_SYMBOL(obd_stale_export_put);
1876 * Adjust the position of the export in the stale list,
1877 * i.e. move to the head of the list if is needed.
1879 void obd_stale_export_adjust(struct obd_export *exp)
1881 LASSERT(exp != NULL);
1882 spin_lock_bh(&exp->exp_bl_list_lock);
1883 spin_lock(&obd_stale_export_lock);
1885 if (!list_empty(&exp->exp_stale_list) &&
1886 !list_empty(&exp->exp_bl_list))
1887 list_move(&exp->exp_stale_list, &obd_stale_exports);
1889 spin_unlock(&obd_stale_export_lock);
1890 spin_unlock_bh(&exp->exp_bl_list_lock);
1892 EXPORT_SYMBOL(obd_stale_export_adjust);
1895 * start destroy zombie import/export thread
1897 int obd_zombie_impexp_init(void)
1899 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1907 * stop destroy zombie import/export thread
1909 void obd_zombie_impexp_stop(void)
1911 destroy_workqueue(zombie_wq);
1912 LASSERT(list_empty(&obd_stale_exports));
1915 /***** Kernel-userspace comm helpers *******/
1917 /* Get length of entire message, including header */
1918 int kuc_len(int payload_len)
1920 return sizeof(struct kuc_hdr) + payload_len;
1922 EXPORT_SYMBOL(kuc_len);
1924 /* Get a pointer to kuc header, given a ptr to the payload
1925 * @param p Pointer to payload area
1926 * @returns Pointer to kuc header
1928 struct kuc_hdr * kuc_ptr(void *p)
1930 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1931 LASSERT(lh->kuc_magic == KUC_MAGIC);
1934 EXPORT_SYMBOL(kuc_ptr);
1936 /* Alloc space for a message, and fill in header
1937 * @return Pointer to payload area
1939 void *kuc_alloc(int payload_len, int transport, int type)
1942 int len = kuc_len(payload_len);
1946 return ERR_PTR(-ENOMEM);
1948 lh->kuc_magic = KUC_MAGIC;
1949 lh->kuc_transport = transport;
1950 lh->kuc_msgtype = type;
1951 lh->kuc_msglen = len;
1953 return (void *)(lh + 1);
1955 EXPORT_SYMBOL(kuc_alloc);
/**
 * Free a message obtained from kuc_alloc().
 *
 * \param[in] p			pointer to the payload area
 * \param[in] payload_len	payload size used for the allocation
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1965 struct obd_request_slot_waiter {
1966 struct list_head orsw_entry;
1967 wait_queue_head_t orsw_waitq;
1971 static bool obd_request_slot_avail(struct client_obd *cli,
1972 struct obd_request_slot_waiter *orsw)
1976 spin_lock(&cli->cl_loi_list_lock);
1977 avail = !!list_empty(&orsw->orsw_entry);
1978 spin_unlock(&cli->cl_loi_list_lock);
1984 * For network flow control, the RPC sponsor needs to acquire a credit
1985 * before sending the RPC. The credits count for a connection is defined
1986 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1987 * the subsequent RPC sponsors need to wait until others released their
1988 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1990 int obd_get_request_slot(struct client_obd *cli)
1992 struct obd_request_slot_waiter orsw;
1995 spin_lock(&cli->cl_loi_list_lock);
1996 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1997 cli->cl_rpcs_in_flight++;
1998 spin_unlock(&cli->cl_loi_list_lock);
2002 init_waitqueue_head(&orsw.orsw_waitq);
2003 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2004 orsw.orsw_signaled = false;
2005 spin_unlock(&cli->cl_loi_list_lock);
2007 rc = l_wait_event_abortable(orsw.orsw_waitq,
2008 obd_request_slot_avail(cli, &orsw) ||
2009 orsw.orsw_signaled);
2011 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2012 * freed but other (such as obd_put_request_slot) is using it. */
2013 spin_lock(&cli->cl_loi_list_lock);
2015 if (!orsw.orsw_signaled) {
2016 if (list_empty(&orsw.orsw_entry))
2017 cli->cl_rpcs_in_flight--;
2019 list_del(&orsw.orsw_entry);
2024 if (orsw.orsw_signaled) {
2025 LASSERT(list_empty(&orsw.orsw_entry));
2029 spin_unlock(&cli->cl_loi_list_lock);
2033 EXPORT_SYMBOL(obd_get_request_slot);
2035 void obd_put_request_slot(struct client_obd *cli)
2037 struct obd_request_slot_waiter *orsw;
2039 spin_lock(&cli->cl_loi_list_lock);
2040 cli->cl_rpcs_in_flight--;
2042 /* If there is free slot, wakeup the first waiter. */
2043 if (!list_empty(&cli->cl_flight_waiters) &&
2044 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2045 orsw = list_entry(cli->cl_flight_waiters.next,
2046 struct obd_request_slot_waiter, orsw_entry);
2047 list_del_init(&orsw->orsw_entry);
2048 cli->cl_rpcs_in_flight++;
2049 wake_up(&orsw->orsw_waitq);
2051 spin_unlock(&cli->cl_loi_list_lock);
2053 EXPORT_SYMBOL(obd_put_request_slot);
2055 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2057 return cli->cl_max_rpcs_in_flight;
2059 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2061 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2063 struct obd_request_slot_waiter *orsw;
2067 const char *type_name;
2070 if (max > OBD_MAX_RIF_MAX || max < 1)
2073 type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2074 if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2075 /* adjust max_mod_rpcs_in_flight to ensure it is always
2076 * strictly lower that max_rpcs_in_flight */
2078 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2079 "because it must be higher than "
2080 "max_mod_rpcs_in_flight value",
2081 cli->cl_import->imp_obd->obd_name);
2084 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2085 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2091 spin_lock(&cli->cl_loi_list_lock);
2092 old = cli->cl_max_rpcs_in_flight;
2093 cli->cl_max_rpcs_in_flight = max;
2094 client_adjust_max_dirty(cli);
2098 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2099 for (i = 0; i < diff; i++) {
2100 if (list_empty(&cli->cl_flight_waiters))
2103 orsw = list_entry(cli->cl_flight_waiters.next,
2104 struct obd_request_slot_waiter, orsw_entry);
2105 list_del_init(&orsw->orsw_entry);
2106 cli->cl_rpcs_in_flight++;
2107 wake_up(&orsw->orsw_waitq);
2109 spin_unlock(&cli->cl_loi_list_lock);
2113 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2115 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2117 return cli->cl_max_mod_rpcs_in_flight;
2119 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2121 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2123 struct obd_connect_data *ocd;
2127 if (max > OBD_MAX_RIF_MAX || max < 1)
2130 /* cannot exceed or equal max_rpcs_in_flight */
2131 if (max >= cli->cl_max_rpcs_in_flight) {
2132 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2133 "higher or equal to max_rpcs_in_flight value (%u)\n",
2134 cli->cl_import->imp_obd->obd_name,
2135 max, cli->cl_max_rpcs_in_flight);
2139 /* cannot exceed max modify RPCs in flight supported by the server */
2140 ocd = &cli->cl_import->imp_connect_data;
2141 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2142 maxmodrpcs = ocd->ocd_maxmodrpcs;
2145 if (max > maxmodrpcs) {
2146 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2147 "higher than max_mod_rpcs_per_client value (%hu) "
2148 "returned by the server at connection\n",
2149 cli->cl_import->imp_obd->obd_name,
2154 spin_lock(&cli->cl_mod_rpcs_lock);
2156 prev = cli->cl_max_mod_rpcs_in_flight;
2157 cli->cl_max_mod_rpcs_in_flight = max;
2159 /* wakeup waiters if limit has been increased */
2160 if (cli->cl_max_mod_rpcs_in_flight > prev)
2161 wake_up(&cli->cl_mod_rpcs_waitq);
2163 spin_unlock(&cli->cl_mod_rpcs_lock);
2167 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2169 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2170 struct seq_file *seq)
2172 unsigned long mod_tot = 0, mod_cum;
2173 struct timespec64 now;
2176 ktime_get_real_ts64(&now);
2178 spin_lock(&cli->cl_mod_rpcs_lock);
2180 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2181 (s64)now.tv_sec, now.tv_nsec);
2182 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2183 cli->cl_mod_rpcs_in_flight);
2185 seq_printf(seq, "\n\t\t\tmodify\n");
2186 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2188 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2191 for (i = 0; i < OBD_HIST_MAX; i++) {
2192 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2194 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2195 i, mod, pct(mod, mod_tot),
2196 pct(mod_cum, mod_tot));
2197 if (mod_cum == mod_tot)
2201 spin_unlock(&cli->cl_mod_rpcs_lock);
2205 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2207 /* The number of modify RPCs sent in parallel is limited
2208 * because the server has a finite number of slots per client to
2209 * store request result and ensure reply reconstruction when needed.
2210 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2211 * that takes into account server limit and cl_max_rpcs_in_flight
2213 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2214 * one close request is allowed above the maximum.
2216 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2221 /* A slot is available if
2222 * - number of modify RPCs in flight is less than the max
2223 * - it's a close RPC and no other close request is in flight
2225 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2226 (close_req && cli->cl_close_rpcs_in_flight == 0);
2231 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2236 spin_lock(&cli->cl_mod_rpcs_lock);
2237 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2238 spin_unlock(&cli->cl_mod_rpcs_lock);
2243 /* Get a modify RPC slot from the obd client @cli according
2244 * to the kind of operation @opc that is going to be sent
2245 * and the intent @it of the operation if it applies.
2246 * If the maximum number of modify RPCs in flight is reached
2247 * the thread is put to sleep.
2248 * Returns the tag to be set in the request message. Tag 0
2249 * is reserved for non-modifying requests.
2251 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2253 bool close_req = false;
2256 if (opc == MDS_CLOSE)
2260 spin_lock(&cli->cl_mod_rpcs_lock);
2261 max = cli->cl_max_mod_rpcs_in_flight;
2262 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2263 /* there is a slot available */
2264 cli->cl_mod_rpcs_in_flight++;
2266 cli->cl_close_rpcs_in_flight++;
2267 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2268 cli->cl_mod_rpcs_in_flight);
2269 /* find a free tag */
2270 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2272 LASSERT(i < OBD_MAX_RIF_MAX);
2273 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2274 spin_unlock(&cli->cl_mod_rpcs_lock);
2275 /* tag 0 is reserved for non-modify RPCs */
2278 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2279 cli->cl_import->imp_obd->obd_name,
2284 spin_unlock(&cli->cl_mod_rpcs_lock);
2286 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2287 "opc %u, max %hu\n",
2288 cli->cl_import->imp_obd->obd_name, opc, max);
2290 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2291 obd_mod_rpc_slot_avail(cli,
2295 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2297 /* Put a modify RPC slot from the obd client @cli according
2298 * to the kind of operation @opc that has been sent.
2300 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2302 bool close_req = false;
2307 if (opc == MDS_CLOSE)
2310 spin_lock(&cli->cl_mod_rpcs_lock);
2311 cli->cl_mod_rpcs_in_flight--;
2313 cli->cl_close_rpcs_in_flight--;
2314 /* release the tag in the bitmap */
2315 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2316 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2317 spin_unlock(&cli->cl_mod_rpcs_lock);
2318 wake_up(&cli->cl_mod_rpcs_waitq);
2320 EXPORT_SYMBOL(obd_put_mod_rpc_slot);