4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* Guards obd_devs[]: taken for write to add/remove a device, for read by lookups. */
49 DEFINE_RWLOCK(obd_dev_lock);
/* Table of obd devices; a registered device sits in the slot given by its obd_minor. */
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache that struct obd_device is allocated from and freed to. */
52 static struct kmem_cache *obd_device_cachep;
/* kobject type of every obd_type; the initializer appears further down. */
53 static struct kobj_type class_ktype;
/* NOTE(review): presumably the workqueue running the zombie export/import work items -- confirm. */
54 static struct workqueue_struct *zombie_wq;
/* Forward declarations of helpers defined later in this file. */
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping: the list, its spinlock and a counter of entries. */
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Function-pointer hook this file calls to release a ptlrpc_connection; it is not assigned here. */
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
/*
 * Allocate a struct obd_device from obd_device_cachep with GFP_NOFS and
 * stamp it with OBD_DEVICE_MAGIC.
 */
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return \a obd to obd_device_cachep.  Asserts that the magic is intact,
 * logs an error if obd_namespace is still set, and finalizes the
 * obd_reference lu_ref tracker before the slab free.
 */
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90 obd, obd->obd_namespace, obd->obd_force);
93 lu_ref_fini(&obd->obd_reference);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up an obd type by name in lustre_kset.  A kobject found there is
 * only treated as an obd_type when its ktype is class_ktype.  Callers in
 * this file drop the reference obtained from a successful search with
 * kobject_put() on typ_kobj.
 */
97 struct obd_type *class_search_type(const char *name)
99 struct kobject *kobj = kset_find_obj(lustre_kset, name);
101 if (kobj && kobj->ktype == &class_ktype)
102 return container_of(kobj, struct obd_type, typ_kobj);
107 EXPORT_SYMBOL(class_search_type);
/*
 * Find the obd type called \a name and take a usage reference on it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the backing kernel module is requested
 * and the search repeated; server builds first translate some type names
 * (e.g. LUSTRE_LWP_NAME -> LUSTRE_OSP_NAME, names starting with
 * LUSTRE_MDS_NAME -> LUSTRE_MDT_NAME) to the module that provides them.
 *
 * When the owning module can be pinned with try_module_get(), typ_refcnt
 * is incremented.  The kobject reference from the search is dropped with
 * kobject_put() in both visible branches.  class_put_type() undoes the
 * module pin and the typ_refcnt increment.
 */
109 struct obd_type *class_get_type(const char *name)
111 struct obd_type *type;
113 type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
116 const char *modname = name;
118 #ifdef HAVE_SERVER_SUPPORT
119 if (strcmp(modname, "obdfilter") == 0)
122 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 modname = LUSTRE_OSP_NAME;
125 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 if (try_module_get(type->typ_dt_ops->o_owner)) {
140 atomic_inc(&type->typ_refcnt);
141 /* class_search_type() returned a counted reference,
142 * but we don't need that count any more as
143 * we have one through typ_refcnt.
145 kobject_put(&type->typ_kobj);
147 kobject_put(&type->typ_kobj);
/*
 * Release a usage reference on \a type: unpin the module that owns its
 * dt ops and decrement typ_refcnt (the counterpart of the module_get /
 * atomic_inc pair in class_get_type()).
 */
154 void class_put_type(struct obd_type *type)
157 module_put(type->typ_dt_ops->o_owner);
158 atomic_dec(&type->typ_refcnt);
/*
 * Release callback of class_ktype.  Removes the type's debugfs tree,
 * finalizes typ_lu, removes the proc subtree named after the type when
 * CONFIG_PROC_FS is set and a procroot was registered, and finally frees
 * the obd_type that embeds \a kobj.
 */
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 debugfs_remove_recursive(type->typ_debugfs_entry);
166 type->typ_debugfs_entry = NULL;
169 lu_device_type_fini(type->typ_lu);
171 #ifdef CONFIG_PROC_FS
172 if (type->typ_name && type->typ_procroot)
173 remove_proc_subtree(type->typ_name, proc_lustre_root);
175 OBD_FREE(type, sizeof(*type));
/*
 * kobject type used for every obd_type.  class_search_type() compares
 * against its address to recognize obd types, and class_sysfs_release()
 * is its release callback.
 */
178 static struct kobj_type class_ktype = {
179 .sysfs_ops = &lustre_sysfs_ops,
180 .release = class_sysfs_release,
/*
 * Server builds only: create a placeholder obd_type called \a name.
 *
 * Returns ERR_PTR(-EEXIST) when a type of that name is already present
 * (after dropping the reference taken by the search) and ERR_PTR(-ENOMEM)
 * when the allocation fails.  Otherwise the type's kobject is added to
 * lustre_kset, a debugfs directory of the same name is created under
 * debugfs_lustre_root, and typ_sym_filter is set so that
 * class_register_type() can recognize the placeholder later.  A failure to
 * create the compat proc entry is only logged; typ_procroot is reset to
 * NULL in that case.
 */
183 #ifdef HAVE_SERVER_SUPPORT
184 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
186 struct dentry *symlink;
187 struct obd_type *type;
190 type = class_search_type(name);
192 kobject_put(&type->typ_kobj);
193 return ERR_PTR(-EEXIST);
196 OBD_ALLOC(type, sizeof(*type));
198 return ERR_PTR(-ENOMEM);
200 type->typ_kobj.kset = lustre_kset;
201 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
202 &lustre_kset->kobj, "%s", name);
206 symlink = debugfs_create_dir(name, debugfs_lustre_root);
207 if (IS_ERR_OR_NULL(symlink)) {
208 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
209 kobject_put(&type->typ_kobj);
212 type->typ_debugfs_entry = symlink;
213 type->typ_sym_filter = true;
216 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
218 if (IS_ERR(type->typ_procroot)) {
219 CERROR("%s: can't create compat proc entry: %d\n",
220 name, (int)PTR_ERR(type->typ_procroot));
221 type->typ_procroot = NULL;
227 EXPORT_SYMBOL(class_add_symlinks);
228 #endif /* HAVE_SERVER_SUPPORT */
/* Upper bound (exclusive) on the length of a type name, asserted below. */
230 #define CLASS_MAX_NAME 1024
/*
 * Register an obd type called \a name with the given dt/md operation
 * tables.
 *
 * The name must be shorter than CLASS_MAX_NAME (asserted).  If a type of
 * that name already exists, server builds check typ_sym_filter to detect
 * a placeholder left by class_add_symlinks(); otherwise the lookup
 * reference is dropped and "already registered" is logged.
 *
 * For a new type the obd_type is allocated and its kobject initialized,
 * the ops pointers are stored, optional proc and debugfs entries are
 * registered, the kobject is added under lustre_kset and \a ldt is
 * initialized with lu_device_type_init().  The final kobject_put() visible
 * here presumably belongs to the failure path -- TODO confirm against the
 * complete function.
 */
232 int class_register_type(const struct obd_ops *dt_ops,
233 const struct md_ops *md_ops,
234 bool enable_proc, struct lprocfs_vars *vars,
235 const char *name, struct lu_device_type *ldt)
237 struct obd_type *type;
242 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
244 type = class_search_type(name);
246 #ifdef HAVE_SERVER_SUPPORT
247 if (type->typ_sym_filter)
249 #endif /* HAVE_SERVER_SUPPORT */
250 kobject_put(&type->typ_kobj);
251 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
255 OBD_ALLOC(type, sizeof(*type));
259 type->typ_kobj.kset = lustre_kset;
260 kobject_init(&type->typ_kobj, &class_ktype);
261 #ifdef HAVE_SERVER_SUPPORT
263 #endif /* HAVE_SERVER_SUPPORT */
265 type->typ_dt_ops = dt_ops;
266 type->typ_md_ops = md_ops;
268 #ifdef HAVE_SERVER_SUPPORT
269 if (type->typ_sym_filter) {
270 type->typ_sym_filter = false;
271 kobject_put(&type->typ_kobj);
275 #ifdef CONFIG_PROC_FS
276 if (enable_proc && !type->typ_procroot) {
277 type->typ_procroot = lprocfs_register(name,
280 if (IS_ERR(type->typ_procroot)) {
281 rc = PTR_ERR(type->typ_procroot);
282 type->typ_procroot = NULL;
287 type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
289 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
290 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
292 type->typ_debugfs_entry = NULL;
296 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
299 #ifdef HAVE_SERVER_SUPPORT
304 rc = lu_device_type_init(ldt);
312 kobject_put(&type->typ_kobj);
316 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * An unknown name is logged as an error.  If the type still has users
 * (typ_refcnt != 0) its ops pointers are cleared, the name is left in
 * place for debugging, and the function bails out through out_put with
 * rc = -EBUSY.  Otherwise two kobject references are dropped: the
 * registration ("final") reference and the one taken by the
 * class_search_type() call at the top.
 */
318 int class_unregister_type(const char *name)
320 struct obd_type *type = class_search_type(name);
325 CERROR("unknown obd type\n");
329 if (atomic_read(&type->typ_refcnt)) {
330 CERROR("type %s has refcount (%d)\n", name,
331 atomic_read(&type->typ_refcnt));
332 /* This is a bad situation, let's make the best of it */
333 /* Remove ops, but leave the name for debugging */
334 type->typ_dt_ops = NULL;
335 type->typ_md_ops = NULL;
336 GOTO(out_put, rc = -EBUSY);
339 /* Put the final ref */
340 kobject_put(&type->typ_kobj);
342 /* Put the ref returned by class_search_type() */
343 kobject_put(&type->typ_kobj);
346 } /* class_unregister_type */
347 EXPORT_SYMBOL(class_unregister_type);
350 * Create a new obd device.
352 * Allocate the new obd_device and initialize it.
354 * \param[in] type_name obd device type string.
355 * \param[in] name obd device name.
356 * \param[in] uuid obd device UUID
358 * \retval newdev pointer to created obd_device
359 * \retval ERR_PTR(errno) on error
361 struct obd_device *class_newdev(const char *type_name, const char *name,
364 struct obd_device *newdev;
365 struct obd_type *type = NULL;
368 if (strlen(name) >= MAX_OBD_NAME) {
369 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
370 RETURN(ERR_PTR(-EINVAL));
373 type = class_get_type(type_name);
375 CERROR("OBD: unknown type: %s\n", type_name);
376 RETURN(ERR_PTR(-ENODEV));
379 newdev = obd_device_alloc();
380 if (newdev == NULL) {
381 class_put_type(type);
382 RETURN(ERR_PTR(-ENOMEM));
384 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
385 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
386 newdev->obd_type = type;
387 newdev->obd_minor = -1;
389 rwlock_init(&newdev->obd_pool_lock);
390 newdev->obd_pool_limit = 0;
391 newdev->obd_pool_slv = 0;
393 INIT_LIST_HEAD(&newdev->obd_exports);
394 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
395 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
396 INIT_LIST_HEAD(&newdev->obd_exports_timed);
397 INIT_LIST_HEAD(&newdev->obd_nid_stats);
398 spin_lock_init(&newdev->obd_nid_lock);
399 spin_lock_init(&newdev->obd_dev_lock);
400 mutex_init(&newdev->obd_dev_mutex);
401 spin_lock_init(&newdev->obd_osfs_lock);
402 /* newdev->obd_osfs_age must be set to a value in the distant
403 * past to guarantee a fresh statfs is fetched on mount. */
404 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
406 /* XXX belongs in setup not attach */
407 init_rwsem(&newdev->obd_observer_link_sem);
409 spin_lock_init(&newdev->obd_recovery_task_lock);
410 init_waitqueue_head(&newdev->obd_next_transno_waitq);
411 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
412 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
413 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
414 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
415 INIT_LIST_HEAD(&newdev->obd_evict_list);
416 INIT_LIST_HEAD(&newdev->obd_lwp_list);
418 llog_group_init(&newdev->obd_olg);
419 /* Detach drops this */
420 atomic_set(&newdev->obd_refcount, 1);
421 lu_ref_init(&newdev->obd_reference);
422 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
424 newdev->obd_conn_inprogress = 0;
426 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
428 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
429 newdev->obd_name, newdev);
437 * \param[in] obd obd_device to be freed
/*
 * Final teardown of \a obd.
 *
 * Asserts that the magic is intact, that the device is either
 * unregistered (obd_minor == -1) or still the occupant of its obd_devs[]
 * slot, that obd_refcount has reached 0 and that a type is attached.
 * When obd_stopping is set obd_cleanup() is called and a failure logged.
 * The device is then freed and the type reference saved at entry is
 * released with class_put_type().
 */
441 void class_free_dev(struct obd_device *obd)
443 struct obd_type *obd_type = obd->obd_type;
445 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
446 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
448 "obd %p != obd_devs[%d] %p\n",
449 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
450 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
451 "obd_refcount should be 0, not %d\n",
452 atomic_read(&obd->obd_refcount));
453 LASSERT(obd_type != NULL);
455 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
456 obd->obd_name, obd->obd_type->typ_name);
458 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
459 obd->obd_name, obd->obd_uuid.uuid);
460 if (obd->obd_stopping) {
463 /* If we're not stopping, we were never set up */
464 err = obd_cleanup(obd);
466 CERROR("Cleanup %s returned %d\n",
470 obd_device_free(obd);
472 class_put_type(obd_type);
476 * Unregister obd device.
478 * Free slot in obd_dev[] used by \a obd.
480 * \param[in] new_obd obd_device to be unregistered
/*
 * Remove \a obd from obd_devs[] under the obd_dev_lock write lock.
 * Nothing is cleared when obd_minor is negative (device not registered);
 * otherwise the slot is asserted to hold \a obd before it is set to NULL.
 */
484 void class_unregister_device(struct obd_device *obd)
486 write_lock(&obd_dev_lock);
487 if (obd->obd_minor >= 0) {
488 LASSERT(obd_devs[obd->obd_minor] == obd);
489 obd_devs[obd->obd_minor] = NULL;
492 write_unlock(&obd_dev_lock);
496 * Register obd device.
498 * Find free slot in obd_devs[], fills it with \a new_obd.
500 * \param[in] new_obd obd_device to be registered
503 * \retval -EEXIST device with this name is registered
504 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Place \a new_obd into obd_devs[].
 *
 * The table is scanned under the obd_dev_lock write lock for two things:
 * a device already using new_obd's name, and a free slot.  On a name
 * clash the lock is dropped and obd_zombie_barrier() is called, since the
 * old device may merely be waiting for destruction; the `retried' flag
 * presumably limits this to one retry (TODO confirm).  If the name is
 * still taken an error is logged and any slot chosen so far is discarded.
 * When a slot was assigned, new_obd->obd_minor is set to it and the slot
 * (asserted empty) is filled; a full table is reported with an error
 * message.
 */
506 int class_register_device(struct obd_device *new_obd)
510 int new_obd_minor = 0;
511 bool minor_assign = false;
512 bool retried = false;
515 write_lock(&obd_dev_lock);
516 for (i = 0; i < class_devno_max(); i++) {
517 struct obd_device *obd = class_num2obd(i);
520 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
523 write_unlock(&obd_dev_lock);
525 /* the obd_device could be waited to be
526 * destroyed by the "obd_zombie_impexp_thread".
528 obd_zombie_barrier();
533 CERROR("%s: already exists, won't add\n",
535 /* in case we found a free slot before duplicate */
536 minor_assign = false;
540 if (!minor_assign && obd == NULL) {
547 new_obd->obd_minor = new_obd_minor;
548 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
549 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
550 obd_devs[new_obd_minor] = new_obd;
554 CERROR("%s: all %u/%u devices used, increase "
555 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
556 i, class_devno_max(), ret);
559 write_unlock(&obd_dev_lock);
/*
 * Scan obd_devs[] for a device whose obd_name equals \a name.  Takes no
 * lock itself; class_name2dev() and class_dev_by_str() call it with
 * obd_dev_lock held for read.  A name match is only acted upon once the
 * device has obd_attached set.
 */
564 static int class_name2dev_nolock(const char *name)
571 for (i = 0; i < class_devno_max(); i++) {
572 struct obd_device *obd = class_num2obd(i);
574 if (obd && strcmp(name, obd->obd_name) == 0) {
575 /* Make sure we finished attaching before we give
576 out any references */
577 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
578 if (obd->obd_attached) {
/*
 * Locked variant of class_name2dev_nolock(): performs the name lookup
 * while holding obd_dev_lock for read.
 */
588 int class_name2dev(const char *name)
595 read_lock(&obd_dev_lock);
596 i = class_name2dev_nolock(name);
597 read_unlock(&obd_dev_lock);
601 EXPORT_SYMBOL(class_name2dev);
603 struct obd_device *class_name2obd(const char *name)
605 int dev = class_name2dev(name);
607 if (dev < 0 || dev > class_devno_max())
609 return class_num2obd(dev);
611 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] for a device whose obd_uuid equals \a uuid.  Takes no
 * lock itself; class_uuid2dev() and class_dev_by_str() call it with
 * obd_dev_lock held for read.
 */
613 int class_uuid2dev_nolock(struct obd_uuid *uuid)
617 for (i = 0; i < class_devno_max(); i++) {
618 struct obd_device *obd = class_num2obd(i);
620 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
621 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
629 int class_uuid2dev(struct obd_uuid *uuid)
633 read_lock(&obd_dev_lock);
634 i = class_uuid2dev_nolock(uuid);
635 read_unlock(&obd_dev_lock);
639 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up an obd device by UUID: resolve \a uuid with class_uuid2dev()
 * and hand the resulting index to class_num2obd().
 */
641 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
643 int dev = class_uuid2dev(uuid);
646 return class_num2obd(dev);
648 EXPORT_SYMBOL(class_uuid2obd);
651 * Get obd device from ::obd_devs[]
653 * \param num [in] array index
655 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
656 * otherwise return the obd device there.
658 struct obd_device *class_num2obd(int num)
660 struct obd_device *obd = NULL;
662 if (num < class_devno_max()) {
667 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
668 "%p obd_magic %08x != %08x\n",
669 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
670 LASSERTF(obd->obd_minor == num,
671 "%p obd_minor %0d != %0d\n",
672 obd, obd->obd_minor, num);
679 * Find obd in obd_dev[] by name or uuid.
681 * Increment obd's refcount if found.
683 * \param[in] str obd name or uuid
685 * \retval NULL if not found
686 * \retval target pointer to found obd_device
/*
 * Resolve \a str to an obd device, trying it first as a UUID and then as
 * a device name.  Both lookups and the class_incref() tagged "find" run
 * under the obd_dev_lock read lock.
 */
688 struct obd_device *class_dev_by_str(const char *str)
690 struct obd_device *target = NULL;
691 struct obd_uuid tgtuuid;
694 obd_str2uuid(&tgtuuid, str);
696 read_lock(&obd_dev_lock);
697 rc = class_uuid2dev_nolock(&tgtuuid);
699 rc = class_name2dev_nolock(str);
702 target = class_num2obd(rc);
705 class_incref(target, "find", current);
706 read_unlock(&obd_dev_lock);
710 EXPORT_SYMBOL(class_dev_by_str);
713 * Get obd devices count. Device in any
715 * \retval obd device count
717 int get_devices_count(void)
719 int index, max_index = class_devno_max(), dev_count = 0;
721 read_lock(&obd_dev_lock);
722 for (index = 0; index <= max_index; index++) {
723 struct obd_device *obd = class_num2obd(index);
727 read_unlock(&obd_dev_lock);
731 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in obd_devs[], under the obd_dev_lock
 * read lock: slot index, a status string chosen from the obd_stopping /
 * obd_set_up / obd_attached flags (tested in that order), type name,
 * device name, uuid and current obd_refcount.
 */
733 void class_obd_list(void)
738 read_lock(&obd_dev_lock);
739 for (i = 0; i < class_devno_max(); i++) {
740 struct obd_device *obd = class_num2obd(i);
744 if (obd->obd_stopping)
746 else if (obd->obd_set_up)
748 else if (obd->obd_attached)
752 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
753 i, status, obd->obd_type->typ_name,
754 obd->obd_name, obd->obd_uuid.uuid,
755 atomic_read(&obd->obd_refcount));
757 read_unlock(&obd_dev_lock);
760 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
761 * specified, then only the client with that uuid is returned,
762 * otherwise any client connected to the tgt is returned.
/*
 * Scan obd_devs[] under the obd_dev_lock read lock for a client device.
 * A device matches when its type name begins with \a type_name (the
 * comparison covers only strlen(type_name) bytes), its cl_target_uuid
 * equals \a tgt_uuid and, if \a grp_uuid is non-NULL, its own obd_uuid
 * equals \a grp_uuid.  The lock is released on a match as well as after
 * an unsuccessful scan.
 * NOTE(review): no reference is visibly taken on the match before the
 * lock is dropped -- confirm callers can rely on the device staying alive.
 */
764 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
765 const char *type_name,
766 struct obd_uuid *grp_uuid)
770 read_lock(&obd_dev_lock);
771 for (i = 0; i < class_devno_max(); i++) {
772 struct obd_device *obd = class_num2obd(i);
776 if ((strncmp(obd->obd_type->typ_name, type_name,
777 strlen(type_name)) == 0)) {
778 if (obd_uuid_equals(tgt_uuid,
779 &obd->u.cli.cl_target_uuid) &&
780 ((grp_uuid)? obd_uuid_equals(grp_uuid,
781 &obd->obd_uuid) : 1)) {
782 read_unlock(&obd_dev_lock);
787 read_unlock(&obd_dev_lock);
791 EXPORT_SYMBOL(class_find_client_obd);
793 /* Iterate the obd_device list looking devices have grp_uuid. Start
794 * searching at *next, and if a device is found, the next index to look
795 * at is saved in *next. If next is NULL, then the first matching device
796 * will always be returned.
/*
 * Resumable scan of obd_devs[] for devices whose obd_uuid equals
 * \a grp_uuid.  A *next value inside [0, class_devno_max()) is accepted
 * as the starting index.  The scan runs under the obd_dev_lock read lock,
 * which is released both on a match and when the table is exhausted.
 */
798 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
804 else if (*next >= 0 && *next < class_devno_max())
809 read_lock(&obd_dev_lock);
810 for (; i < class_devno_max(); i++) {
811 struct obd_device *obd = class_num2obd(i);
815 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
818 read_unlock(&obd_dev_lock);
822 read_unlock(&obd_dev_lock);
826 EXPORT_SYMBOL(class_devices_in_group);
829 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
830 * adjust sptlrpc settings accordingly.
/*
 * Tell every relevant device of filesystem \a fsname that its sptlrpc
 * configuration changed.
 *
 * Devices that are not set up or are stopping are skipped, as are types
 * other than mdc/osc/osp/lwp/mdt/ost and devices whose name does not
 * start with the first \a namelen bytes of \a fsname.  For each remaining
 * device a reference is taken, obd_dev_lock is dropped around the
 * obd_set_info_async(KEY_SPTLRPC_CONF) call on its self export, and the
 * lock is re-taken afterwards to continue the scan.
 *
 * \a namelen must be positive (asserted).
 */
832 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
834 struct obd_device *obd;
838 LASSERT(namelen > 0);
840 read_lock(&obd_dev_lock);
841 for (i = 0; i < class_devno_max(); i++) {
842 obd = class_num2obd(i);
844 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
847 /* only notify mdc, osc, osp, lwp, mdt, ost
848 * because only these have a -sptlrpc llog */
849 type = obd->obd_type->typ_name;
850 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
851 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
852 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
853 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
854 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
855 strcmp(type, LUSTRE_OST_NAME) != 0)
858 if (strncmp(obd->obd_name, fsname, namelen))
861 class_incref(obd, __FUNCTION__, obd);
862 read_unlock(&obd_dev_lock);
863 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
864 sizeof(KEY_SPTLRPC_CONF),
865 KEY_SPTLRPC_CONF, 0, NULL, NULL);
867 class_decref(obd, __FUNCTION__, obd);
868 read_lock(&obd_dev_lock);
870 read_unlock(&obd_dev_lock);
873 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the obd_device slab cache if it exists and reset the pointer
 * to NULL, so that calling this again is harmless.
 */
875 void obd_cleanup_caches(void)
878 if (obd_device_cachep) {
879 kmem_cache_destroy(obd_device_cachep);
880 obd_device_cachep = NULL;
/*
 * Create the "ll_obd_dev_cache" slab cache for struct obd_device; the
 * usercopy window passed to kmem_cache_create_usercopy() spans the whole
 * structure.  The cache must not exist yet (asserted).  Allocation
 * failure takes the `out' label with rc = -ENOMEM; the
 * obd_cleanup_caches() call visible below presumably sits on that error
 * path -- TODO confirm.
 */
886 int obd_init_caches(void)
891 LASSERT(obd_device_cachep == NULL);
892 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
893 sizeof(struct obd_device),
894 0, 0, 0, sizeof(struct obd_device), NULL);
895 if (!obd_device_cachep)
896 GOTO(out, rc = -ENOMEM);
900 obd_cleanup_caches();
/* Owner tag passed to class_handle_hash()/class_handle2object() for export handles. */
904 static const char export_handle_owner[] = "export";
906 /* map connection to client */
/*
 * Translate a connection handle into its export.  A null handle and the
 * special cookie -1 ("assign a new connection") are only logged here;
 * any other cookie is looked up with class_handle2object() under the
 * export_handle_owner tag.
 */
907 struct obd_export *class_conn2export(struct lustre_handle *conn)
909 struct obd_export *export;
913 CDEBUG(D_CACHE, "looking for null handle\n");
917 if (conn->cookie == -1) { /* this means assign a new connection */
918 CDEBUG(D_CACHE, "want a new connection\n");
922 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
923 export = class_handle2object(conn->cookie, export_handle_owner);
926 EXPORT_SYMBOL(class_conn2export);
/* Accessor from an export to an obd device; presumably exp->exp_obd -- the body is not visible here. */
928 struct obd_device *class_exp2obd(struct obd_export *exp)
934 EXPORT_SYMBOL(class_exp2obd);
/* Return the client import (u.cli.cl_import) of the obd device that \a exp belongs to. */
936 struct obd_import *class_exp2cliimp(struct obd_export *exp)
938 struct obd_device *obd = exp->exp_obd;
941 return obd->u.cli.cl_import;
943 EXPORT_SYMBOL(class_exp2cliimp);
945 /* Export management functions */
/*
 * Free an export whose handle refcount has dropped to zero (asserted).
 *
 * Releases the export's connection if it has one, asserts that its reply,
 * replay and high-priority RPC lists are empty, calls
 * obd_destroy_export(), drops the "export" reference on the obd device
 * unless this is the device's self export, and frees the structure
 * through kfree_rcu().
 */
946 static void class_export_destroy(struct obd_export *exp)
948 struct obd_device *obd = exp->exp_obd;
951 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
952 LASSERT(obd != NULL);
954 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
955 exp->exp_client_uuid.uuid, obd->obd_name);
957 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
958 if (exp->exp_connection)
959 ptlrpc_put_connection_superhack(exp->exp_connection);
961 LASSERT(list_empty(&exp->exp_outstanding_replies));
962 LASSERT(list_empty(&exp->exp_uncommitted_replies));
963 LASSERT(list_empty(&exp->exp_req_replay_queue));
964 LASSERT(list_empty(&exp->exp_hp_rpcs));
965 obd_destroy_export(exp);
966 /* self export doesn't hold a reference to an obd, although it
967 * exists until freeing of the obd */
968 if (exp != obd->obd_self_export)
969 class_decref(obd, "export", exp);
971 OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
972 kfree_rcu(exp, exp_handle.h_rcu);
976 struct obd_export *class_export_get(struct obd_export *exp)
978 refcount_inc(&exp->exp_handle.h_ref);
979 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
980 refcount_read(&exp->exp_handle.h_ref));
983 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp.
 *
 * When the last reference goes away the nid-stat reference is released
 * and the export is disposed of in one of two ways: the device's self
 * export is destroyed synchronously and the obd device itself is freed
 * with class_free_dev(); any other export must still be linked on
 * exp_obd_chain (asserted) and is handed to obd_zombie_export_add().
 */
985 void class_export_put(struct obd_export *exp)
987 LASSERT(exp != NULL);
988 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
989 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
990 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
991 refcount_read(&exp->exp_handle.h_ref) - 1);
993 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
994 struct obd_device *obd = exp->exp_obd;
996 CDEBUG(D_IOCTL, "final put %p/%s\n",
997 exp, exp->exp_client_uuid.uuid);
999 /* release nid stat reference */
1000 lprocfs_exp_cleanup(exp);
1002 if (exp == obd->obd_self_export) {
1003 /* self export should be destroyed without
1004 * zombie thread as it doesn't hold a
1005 * reference to obd and doesn't hold any
1007 class_export_destroy(exp);
1008 /* self export is destroyed, no class
1009 * references exist and it is safe to free
1011 class_free_dev(obd);
1013 LASSERT(!list_empty(&exp->exp_obd_chain));
1014 obd_zombie_export_add(exp);
1019 EXPORT_SYMBOL(class_export_put);
1021 static void obd_zombie_exp_cull(struct work_struct *ws)
1023 struct obd_export *export;
1025 export = container_of(ws, struct obd_export, exp_zombie_work);
1026 class_export_destroy(export);
1029 /* Creates a new export, adds it to the hash table, and returns a
1030 * pointer to it. The refcount is 2: one for the hash reference, and
1031 * one for the pointer returned by this function. */
/*
 * Allocate and initialize an export of \a obd for client \a cluuid.
 *
 * The handle refcount starts at 2 (one for the handle hash, one for the
 * caller) and the handle is hashed under export_handle_owner.  Under
 * obd->obd_dev_lock, an export whose uuid differs from the device's own
 * is refused with -ENODEV if the device is stopping and with -EALREADY if
 * obd_uuid_add() reports a duplicate.  The visible code both links the
 * export into obd_exports / obd_exports_timed (taking an "export"
 * reference on the device and bumping obd_num_exports) and initializes
 * those list heads empty; which branch applies presumably depends on
 * \a is_self -- TODO confirm against the complete function.
 *
 * The exit_unlock path unhashes the handle, calls obd_destroy_export()
 * and frees the export.  ERR_PTR(-ENOMEM) is returned when the
 * allocation fails.
 */
1032 struct obd_export *__class_new_export(struct obd_device *obd,
1033 struct obd_uuid *cluuid, bool is_self)
1035 struct obd_export *export;
1039 OBD_ALLOC_PTR(export);
1041 return ERR_PTR(-ENOMEM);
1043 export->exp_conn_cnt = 0;
1044 export->exp_lock_hash = NULL;
1045 export->exp_flock_hash = NULL;
1046 /* 2 = class_handle_hash + last */
1047 refcount_set(&export->exp_handle.h_ref, 2);
1048 atomic_set(&export->exp_rpc_count, 0);
1049 atomic_set(&export->exp_cb_count, 0);
1050 atomic_set(&export->exp_locks_count, 0);
1051 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1052 INIT_LIST_HEAD(&export->exp_locks_list);
1053 spin_lock_init(&export->exp_locks_list_guard);
1055 atomic_set(&export->exp_replay_count, 0);
1056 export->exp_obd = obd;
1057 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1058 spin_lock_init(&export->exp_uncommitted_replies_lock);
1059 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1060 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1061 INIT_HLIST_NODE(&export->exp_handle.h_link);
1062 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1063 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1064 class_handle_hash(&export->exp_handle, export_handle_owner);
1065 export->exp_last_request_time = ktime_get_real_seconds();
1066 spin_lock_init(&export->exp_lock);
1067 spin_lock_init(&export->exp_rpc_lock);
1068 INIT_HLIST_NODE(&export->exp_nid_hash);
1069 INIT_HLIST_NODE(&export->exp_gen_hash);
1070 spin_lock_init(&export->exp_bl_list_lock);
1071 INIT_LIST_HEAD(&export->exp_bl_list);
1072 INIT_LIST_HEAD(&export->exp_stale_list);
1073 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1075 export->exp_sp_peer = LUSTRE_SP_ANY;
1076 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1077 export->exp_client_uuid = *cluuid;
1078 obd_init_export(export);
1080 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1082 spin_lock(&obd->obd_dev_lock);
1083 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1084 /* shouldn't happen, but might race */
1085 if (obd->obd_stopping)
1086 GOTO(exit_unlock, rc = -ENODEV);
1088 rc = obd_uuid_add(obd, export);
1090 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1091 obd->obd_name, cluuid->uuid, rc);
1092 GOTO(exit_unlock, rc = -EALREADY);
1097 class_incref(obd, "export", export);
1098 list_add_tail(&export->exp_obd_chain_timed,
1099 &obd->obd_exports_timed);
1100 list_add(&export->exp_obd_chain, &obd->obd_exports);
1101 obd->obd_num_exports++;
1103 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1104 INIT_LIST_HEAD(&export->exp_obd_chain);
1106 spin_unlock(&obd->obd_dev_lock);
1110 spin_unlock(&obd->obd_dev_lock);
1111 class_handle_unhash(&export->exp_handle);
1112 obd_destroy_export(export);
1113 OBD_FREE_PTR(export);
1117 struct obd_export *class_new_export(struct obd_device *obd,
1118 struct obd_uuid *uuid)
1120 return __class_new_export(obd, uuid, false);
1122 EXPORT_SYMBOL(class_new_export);
1124 struct obd_export *class_new_export_self(struct obd_device *obd,
1125 struct obd_uuid *uuid)
1127 return __class_new_export(obd, uuid, true);
/*
 * Detach \a exp from its obd device.
 *
 * The handle is unhashed first.  For the device's self export a
 * reference is then simply dropped with class_export_put().  For other
 * exports, under obd_dev_lock, the export is removed from the uuid hash
 * and (server builds, when hashed) from the generation hash, moved onto
 * obd_unlinked_exports, taken off the timed list, and obd_num_exports is
 * decremented.  Finally obd_stale_export_num is incremented and the
 * export is passed to obd_stale_export_put().
 */
1130 void class_unlink_export(struct obd_export *exp)
1132 class_handle_unhash(&exp->exp_handle);
1134 if (exp->exp_obd->obd_self_export == exp) {
1135 class_export_put(exp);
1139 spin_lock(&exp->exp_obd->obd_dev_lock);
1140 /* delete an uuid-export hashitem from hashtables */
1141 if (exp != exp->exp_obd->obd_self_export)
1142 obd_uuid_del(exp->exp_obd, exp);
1144 #ifdef HAVE_SERVER_SUPPORT
1145 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1146 struct tg_export_data *ted = &exp->exp_target_data;
1147 struct cfs_hash *hash;
1149 /* Because obd_gen_hash will not be released until
1150 * class_cleanup(), so hash should never be NULL here */
1151 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1152 LASSERT(hash != NULL);
1153 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1154 &exp->exp_gen_hash);
1155 cfs_hash_putref(hash);
1157 #endif /* HAVE_SERVER_SUPPORT */
1159 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1160 list_del_init(&exp->exp_obd_chain_timed);
1161 exp->exp_obd->obd_num_exports--;
1162 spin_unlock(&exp->exp_obd->obd_dev_lock);
1163 atomic_inc(&obd_stale_export_num);
1165 /* A reference is kept by obd_stale_exports list */
1166 obd_stale_export_put(exp);
1168 EXPORT_SYMBOL(class_unlink_export);
1170 /* Import management functions */
/*
 * Free the resources of an import whose refcount has reached zero
 * (asserted): release its current connection, release and free every
 * entry on imp_conn_list, assert that no security context is attached
 * (imp_sec == NULL) and drop the "import" reference on the obd device.
 */
1171 static void obd_zombie_import_free(struct obd_import *imp)
1175 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1176 imp->imp_obd->obd_name);
1178 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1180 ptlrpc_put_connection_superhack(imp->imp_connection);
1182 while (!list_empty(&imp->imp_conn_list)) {
1183 struct obd_import_conn *imp_conn;
1185 imp_conn = list_entry(imp->imp_conn_list.next,
1186 struct obd_import_conn, oic_item);
1187 list_del_init(&imp_conn->oic_item);
1188 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1189 OBD_FREE(imp_conn, sizeof(*imp_conn));
1192 LASSERT(imp->imp_sec == NULL);
1193 class_decref(imp->imp_obd, "import", imp);
1198 struct obd_import *class_import_get(struct obd_import *import)
1200 refcount_inc(&import->imp_refcount);
1201 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1202 refcount_read(&import->imp_refcount),
1203 import->imp_obd->obd_name);
1206 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a imp (the count must be positive, asserted).
 * When the last reference goes away the import is handed to
 * obd_zombie_import_add() rather than being freed inline.
 */
1208 void class_import_put(struct obd_import *imp)
1212 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1214 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1215 refcount_read(&imp->imp_refcount) - 1,
1216 imp->imp_obd->obd_name);
1218 if (refcount_dec_and_test(&imp->imp_refcount)) {
1219 CDEBUG(D_INFO, "final put import %p\n", imp);
1220 obd_zombie_import_add(imp);
1225 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the network
 * latency estimate starts at 0, and each of the IMP_AT_MAX_PORTALS
 * service estimates starts at INITIAL_CONNECT_TIMEOUT.
 */
1227 static void init_imp_at(struct imp_at *at) {
1229 at_init(&at->iat_net_latency, 0, 0);
1230 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1231 /* max service estimates are tracked on the server side, so
1232 don't use the AT history here, just use the last reported
1233 val. (But keep hist for proc histogram, worst_ever) */
1234 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1239 static void obd_zombie_imp_cull(struct work_struct *ws)
1241 struct obd_import *import;
1243 import = container_of(ws, struct obd_import, imp_zombie_work);
1244 obd_zombie_import_free(import);
/*
 * Allocate and initialize an import for \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with imp_refcount == 2, all
 * request lists empty and the replay cursor pointing at
 * imp_committed_list.  It pins \a obd with an "import" reference.
 * imp_sec_refpid is taken from the child reaper of the caller's pid
 * namespace when one is available and is 1 otherwise.  The message magic
 * defaults to LUSTRE_MSG_MAGIC_V2.
 */
1247 struct obd_import *class_new_import(struct obd_device *obd)
1249 struct obd_import *imp;
1250 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1252 OBD_ALLOC(imp, sizeof(*imp));
1256 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1257 INIT_LIST_HEAD(&imp->imp_replay_list);
1258 INIT_LIST_HEAD(&imp->imp_sending_list);
1259 INIT_LIST_HEAD(&imp->imp_delayed_list);
1260 INIT_LIST_HEAD(&imp->imp_committed_list);
1261 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1262 imp->imp_known_replied_xid = 0;
1263 imp->imp_replay_cursor = &imp->imp_committed_list;
1264 spin_lock_init(&imp->imp_lock);
1265 imp->imp_last_success_conn = 0;
1266 imp->imp_state = LUSTRE_IMP_NEW;
1267 imp->imp_obd = class_incref(obd, "import", imp);
1268 rwlock_init(&imp->imp_sec_lock);
1269 init_waitqueue_head(&imp->imp_recovery_waitq);
1270 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1272 if (curr_pid_ns && curr_pid_ns->child_reaper)
1273 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1275 imp->imp_sec_refpid = 1;
1277 refcount_set(&imp->imp_refcount, 2);
1278 atomic_set(&imp->imp_unregistering, 0);
1279 atomic_set(&imp->imp_inflight, 0);
1280 atomic_set(&imp->imp_replay_inflight, 0);
1281 atomic_set(&imp->imp_inval_count, 0);
1282 INIT_LIST_HEAD(&imp->imp_conn_list);
1283 init_imp_at(&imp->imp_at);
1285 /* the default magic is V2, will be used in connect RPC, and
1286 * then adjusted according to the flags in request/reply. */
1287 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1291 EXPORT_SYMBOL(class_new_import);
1293 void class_destroy_import(struct obd_import *import)
1295 LASSERT(import != NULL);
1296 LASSERT(import != LP_POISON);
1298 spin_lock(&import->imp_lock);
1299 import->imp_generation++;
1300 spin_unlock(&import->imp_lock);
1301 class_import_put(import);
1303 EXPORT_SYMBOL(class_destroy_import);
1305 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): record that \a lock holds
 * one more reference on \a exp.  Under exp_locks_list_guard a warning is
 * printed if the lock is already tracked against a different export; on
 * the 0 -> 1 transition of l_exp_refs_nr the lock is linked onto
 * exp_locks_list and l_exp_refs_target is set to \a exp.
 */
1307 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1309 spin_lock(&exp->exp_locks_list_guard);
1311 LASSERT(lock->l_exp_refs_nr >= 0);
1313 if (lock->l_exp_refs_target != NULL &&
1314 lock->l_exp_refs_target != exp) {
1315 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1316 exp, lock, lock->l_exp_refs_target);
1318 if ((lock->l_exp_refs_nr ++) == 0) {
1319 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1320 lock->l_exp_refs_target = exp;
1322 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1323 lock, exp, lock->l_exp_refs_nr);
1324 spin_unlock(&exp->exp_locks_list_guard);
1326 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Debug tracking counterpart of __class_export_add_lock_ref(): drop one
 * tracked reference of \a lock on \a exp.  The count must be positive
 * (asserted); a warning is printed if the lock is tracked against a
 * different export.  On the 1 -> 0 transition the lock is unlinked from
 * the export's list and l_exp_refs_target is cleared.
 */
1328 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1330 spin_lock(&exp->exp_locks_list_guard);
1331 LASSERT(lock->l_exp_refs_nr > 0);
1332 if (lock->l_exp_refs_target != exp) {
1333 LCONSOLE_WARN("lock %p, "
1334 "mismatching export pointers: %p, %p\n",
1335 lock, lock->l_exp_refs_target, exp);
1337 if (-- lock->l_exp_refs_nr == 0) {
1338 list_del_init(&lock->l_exp_refs_link);
1339 lock->l_exp_refs_target = NULL;
1341 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1342 lock, exp, lock->l_exp_refs_nr);
1343 spin_unlock(&exp->exp_locks_list_guard);
1345 EXPORT_SYMBOL(__class_export_del_lock_ref);
1348 /* A connection defines an export context in which preallocation can
1349 be managed. This releases the export pointer reference, and returns
1350 the export handle, so the export refcount is 1 when this function
1352 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1353 struct obd_uuid *cluuid)
1355 struct obd_export *export;
1356 LASSERT(conn != NULL);
1357 LASSERT(obd != NULL);
1358 LASSERT(cluuid != NULL);
1361 export = class_new_export(obd, cluuid);
1363 RETURN(PTR_ERR(export));
1365 conn->cookie = export->exp_handle.h_cookie;
1366 class_export_put(export);
1368 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1369 cluuid->uuid, conn->cookie);
1372 EXPORT_SYMBOL(class_connect);
1374 /* if export is involved in recovery then clean up related things */
1375 static void class_export_recovery_cleanup(struct obd_export *exp)
1377 struct obd_device *obd = exp->exp_obd;
1379 spin_lock(&obd->obd_recovery_task_lock);
1380 if (obd->obd_recovering) {
1381 if (exp->exp_in_recovery) {
1382 spin_lock(&exp->exp_lock);
1383 exp->exp_in_recovery = 0;
1384 spin_unlock(&exp->exp_lock);
1385 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1386 atomic_dec(&obd->obd_connected_clients);
1389 /* if called during recovery then should update
1390 * obd_stale_clients counter,
1391 * lightweight exports are not counted */
1392 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1393 exp->exp_obd->obd_stale_clients++;
1395 spin_unlock(&obd->obd_recovery_task_lock);
1397 spin_lock(&exp->exp_lock);
1398 /** Cleanup req replay fields */
1399 if (exp->exp_req_replay_needed) {
1400 exp->exp_req_replay_needed = 0;
1402 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1403 atomic_dec(&obd->obd_req_replay_clients);
1406 /** Cleanup lock replay data */
1407 if (exp->exp_lock_replay_needed) {
1408 exp->exp_lock_replay_needed = 0;
1410 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1411 atomic_dec(&obd->obd_lock_replay_clients);
1413 spin_unlock(&exp->exp_lock);
/*
 * class_disconnect() - review summary of the visible statements (this
 * listing elides some short lines such as braces and return statements).
 *
 * With exp_lock held the previous value of exp_disconnected is saved, the
 * flag is set, and the export is removed from the obd's NID hash if it is
 * still hashed.  If the export had already been disconnected the function
 * jumps to the no_disconn exit; otherwise it runs
 * class_export_recovery_cleanup() and class_unlink_export().
 * class_export_put() is the last visible statement before the exit.
 * A NULL export is reported with CWARN; the statement that follows the
 * warning is not visible here.
 */
1416 /* This function removes 1-3 references from the export:
1417 * 1 - for export pointer passed
1418 * and if disconnect really need
1419 * 2 - removing from hash
1420 * 3 - in client_unlink_export
1421 * The export pointer passed to this function can destroyed */
1422 int class_disconnect(struct obd_export *export)
1424 int already_disconnected;
1427 if (export == NULL) {
1428 CWARN("attempting to free NULL export %p\n", export);
1432 spin_lock(&export->exp_lock);
1433 already_disconnected = export->exp_disconnected;
1434 export->exp_disconnected = 1;
1435 /* We hold references of export for uuid hash
1436 * and nid_hash and export link at least. So
1437 * it is safe to call cfs_hash_del in there. */
1438 if (!hlist_unhashed(&export->exp_nid_hash))
1439 cfs_hash_del(export->exp_obd->obd_nid_hash,
1440 &export->exp_connection->c_peer.nid,
1441 &export->exp_nid_hash);
1442 spin_unlock(&export->exp_lock);
1444 /* class_cleanup(), abort_recovery(), and class_fail_export()
1445 * all end up in here, and if any of them race we shouldn't
1446 * call extra class_export_puts(). */
1447 if (already_disconnected) {
1448 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1449 GOTO(no_disconn, already_disconnected);
1452 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1453 export->exp_handle.h_cookie);
1455 class_export_recovery_cleanup(export);
1456 class_unlink_export(export);
1458 class_export_put(export);
1461 EXPORT_SYMBOL(class_disconnect);
1463 /* Return non-zero for a fully connected export */
1464 int class_connected_export(struct obd_export *exp)
1469 spin_lock(&exp->exp_lock);
1470 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1471 spin_unlock(&exp->exp_lock);
1475 EXPORT_SYMBOL(class_connected_export);
/*
 * class_disconnect_export_list() - review summary of the visible statements
 * (short lines are elided from this listing).
 *
 * Loops until @list is empty.  For the first entry on the list it takes an
 * export reference so the export can still be logged after obd_disconnect(),
 * and stores @flags in exp_flags under exp_lock.
 * An export whose client uuid equals the obd uuid is not disconnected: it is
 * unlinked from the list with list_del_init() and the reference is dropped.
 * For any other export a second reference is taken (the in-code comment
 * says obd_disconnect() releases one), obd_disconnect() is called, its
 * result is logged, and the first reference is dropped.
 */
1477 static void class_disconnect_export_list(struct list_head *list,
1478 enum obd_option flags)
1481 struct obd_export *exp;
1484 /* It's possible that an export may disconnect itself, but
1485 * nothing else will be added to this list. */
1486 while (!list_empty(list)) {
1487 exp = list_entry(list->next, struct obd_export,
1489 /* need for safe call CDEBUG after obd_disconnect */
1490 class_export_get(exp);
1492 spin_lock(&exp->exp_lock);
1493 exp->exp_flags = flags;
1494 spin_unlock(&exp->exp_lock);
1496 if (obd_uuid_equals(&exp->exp_client_uuid,
1497 &exp->exp_obd->obd_uuid)) {
1499 "exp %p export uuid == obd uuid, don't discon\n",
1501 /* Need to delete this now so we don't end up pointing
1502 * to work_list later when this export is cleaned up. */
1503 list_del_init(&exp->exp_obd_chain);
1504 class_export_put(exp);
1508 class_export_get(exp);
1509 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1510 "last request at %lld\n",
1511 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1512 exp, exp->exp_last_request_time);
1513 /* release one export reference anyway */
1514 rc = obd_disconnect(exp);
1516 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1517 obd_export_nid2str(exp), exp, rc);
1518 class_export_put(exp);
1523 void class_disconnect_exports(struct obd_device *obd)
1525 LIST_HEAD(work_list);
1528 /* Move all of the exports from obd_exports to a work list, en masse. */
1529 spin_lock(&obd->obd_dev_lock);
1530 list_splice_init(&obd->obd_exports, &work_list);
1531 list_splice_init(&obd->obd_delayed_exports, &work_list);
1532 spin_unlock(&obd->obd_dev_lock);
1534 if (!list_empty(&work_list)) {
1535 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1536 "disconnecting them\n", obd->obd_minor, obd);
1537 class_disconnect_export_list(&work_list,
1538 exp_flags_from_obd(obd));
1540 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1541 obd->obd_minor, obd);
1544 EXPORT_SYMBOL(class_disconnect_exports);
/*
 * class_disconnect_stale_exports() - review summary of the visible
 * statements (short lines such as "continue" and the eviction counter
 * updates are elided from this listing).
 *
 * Walks obd->obd_exports with obd_dev_lock held.  The walk has special
 * cases for the self export (client uuid equal to the obd uuid) and for
 * exports whose ted_lr_idx is -1, and one for exports that are already
 * failed or for which @test_export returns non-zero.  Remaining exports are
 * marked exp_failed under exp_lock, moved to a private work list and
 * logged.  After the device lock is dropped the work list is passed to
 * class_disconnect_export_list() with OBD_OPT_ABORT_RECOV added to the
 * device flags.
 */
1546 /* Remove exports that have not completed recovery.
1548 void class_disconnect_stale_exports(struct obd_device *obd,
1549 int (*test_export)(struct obd_export *))
1551 LIST_HEAD(work_list);
1552 struct obd_export *exp, *n;
1556 spin_lock(&obd->obd_dev_lock);
1557 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1559 /* don't count self-export as client */
1560 if (obd_uuid_equals(&exp->exp_client_uuid,
1561 &exp->exp_obd->obd_uuid))
1564 /* don't evict clients which have no slot in last_rcvd
1565 * (e.g. lightweight connection) */
1566 if (exp->exp_target_data.ted_lr_idx == -1)
1569 spin_lock(&exp->exp_lock);
1570 if (exp->exp_failed || test_export(exp)) {
1571 spin_unlock(&exp->exp_lock);
1574 exp->exp_failed = 1;
1575 spin_unlock(&exp->exp_lock);
1577 list_move(&exp->exp_obd_chain, &work_list);
1579 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1580 obd->obd_name, exp->exp_client_uuid.uuid,
1581 obd_export_nid2str(exp));
1582 print_export_data(exp, "EVICTING", 0, D_HA);
1584 spin_unlock(&obd->obd_dev_lock);
1587 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1588 obd->obd_name, evicted);
1590 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1591 OBD_OPT_ABORT_RECOV);
1594 EXPORT_SYMBOL(class_disconnect_stale_exports);
1596 void class_fail_export(struct obd_export *exp)
1598 int rc, already_failed;
1600 spin_lock(&exp->exp_lock);
1601 already_failed = exp->exp_failed;
1602 exp->exp_failed = 1;
1603 spin_unlock(&exp->exp_lock);
1605 if (already_failed) {
1606 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1607 exp, exp->exp_client_uuid.uuid);
1611 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1612 exp, exp->exp_client_uuid.uuid);
1614 if (obd_dump_on_timeout)
1615 libcfs_debug_dumplog();
1617 /* need for safe call CDEBUG after obd_disconnect */
1618 class_export_get(exp);
1620 /* Most callers into obd_disconnect are removing their own reference
1621 * (request, for example) in addition to the one from the hash table.
1622 * We don't have such a reference here, so make one. */
1623 class_export_get(exp);
1624 rc = obd_disconnect(exp);
1626 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1628 CDEBUG(D_HA, "disconnected export %p/%s\n",
1629 exp, exp->exp_client_uuid.uuid);
1630 class_export_put(exp);
1632 EXPORT_SYMBOL(class_fail_export);
/*
 * obd_export_evict_by_nid() - review summary of the visible statements
 * (short lines, including the loop construct and the counter update, are
 * elided from this listing).
 *
 * @nid is converted with libcfs_str2nid().  With obd_dev_lock held the
 * function returns the (zero) eviction count if the device is stopping;
 * otherwise it takes a reference on the device NID hash before dropping the
 * lock.  An export looked up in the hash by NID is asserted to match the
 * requested NID and not to be the self export, then it is failed with
 * class_fail_export() and the lookup reference is dropped.  The hash
 * reference is released before returning exports_evicted; a debug message
 * is logged when nothing was evicted.
 */
1634 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1636 struct cfs_hash *nid_hash;
1637 struct obd_export *doomed_exp = NULL;
1638 int exports_evicted = 0;
1640 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1642 spin_lock(&obd->obd_dev_lock);
1643 /* umount has run already, so evict thread should leave
1644 * its task to umount thread now */
1645 if (obd->obd_stopping) {
1646 spin_unlock(&obd->obd_dev_lock);
1647 return exports_evicted;
1649 nid_hash = obd->obd_nid_hash;
1650 cfs_hash_getref(nid_hash);
1651 spin_unlock(&obd->obd_dev_lock);
1654 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1655 if (doomed_exp == NULL)
1658 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1659 "nid %s found, wanted nid %s, requested nid %s\n",
1660 obd_export_nid2str(doomed_exp),
1661 libcfs_nid2str(nid_key), nid);
1662 LASSERTF(doomed_exp != obd->obd_self_export,
1663 "self-export is hashed by NID?\n");
1665 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1666 "request\n", obd->obd_name,
1667 obd_uuid2str(&doomed_exp->exp_client_uuid),
1668 obd_export_nid2str(doomed_exp));
1669 class_fail_export(doomed_exp);
1670 class_export_put(doomed_exp);
1673 cfs_hash_putref(nid_hash);
1675 if (!exports_evicted)
1676 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1677 obd->obd_name, nid);
1678 return exports_evicted;
1680 EXPORT_SYMBOL(obd_export_evict_by_nid);
1682 #ifdef HAVE_SERVER_SUPPORT
1683 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1685 struct obd_export *doomed_exp = NULL;
1686 struct obd_uuid doomed_uuid;
1687 int exports_evicted = 0;
1689 spin_lock(&obd->obd_dev_lock);
1690 if (obd->obd_stopping) {
1691 spin_unlock(&obd->obd_dev_lock);
1692 return exports_evicted;
1694 spin_unlock(&obd->obd_dev_lock);
1696 obd_str2uuid(&doomed_uuid, uuid);
1697 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1698 CERROR("%s: can't evict myself\n", obd->obd_name);
1699 return exports_evicted;
1702 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1703 if (doomed_exp == NULL) {
1704 CERROR("%s: can't disconnect %s: no exports found\n",
1705 obd->obd_name, uuid);
1707 CWARN("%s: evicting %s at adminstrative request\n",
1708 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1709 class_fail_export(doomed_exp);
1710 class_export_put(doomed_exp);
1711 obd_uuid_del(obd, doomed_exp);
1715 return exports_evicted;
1717 #endif /* HAVE_SERVER_SUPPORT */
1719 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() to dump the locks of an
 * export.  It is NULL unless some other code assigns it. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
/*
 * print_export_data() - review summary of the visible statements (the body
 * of the reply-list loop is elided from this listing).
 *
 * Walks exp_outstanding_replies under exp_lock, then emits one CDEBUG line
 * at @debug_level containing: device name, @status, export pointer, client
 * uuid, NID string, handle refcount, the rpc/callback/lock counters, the
 * disconnected/delayed/failed flags, nreplies, first_reply ("..." is
 * appended when nreplies > 3), exp_last_committed and whether the export is
 * on a stale list.
 * When built with LUSTRE_TRACKS_LOCK_EXP_REFS, @locks is non-zero and
 * class_export_dump_hook is set, the hook is called for the export.
 */
1724 static void print_export_data(struct obd_export *exp, const char *status,
1725 int locks, int debug_level)
1727 struct ptlrpc_reply_state *rs;
1728 struct ptlrpc_reply_state *first_reply = NULL;
1731 spin_lock(&exp->exp_lock);
1732 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1738 spin_unlock(&exp->exp_lock);
1740 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1741 "%p %s %llu stale:%d\n",
1742 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1743 obd_export_nid2str(exp),
1744 refcount_read(&exp->exp_handle.h_ref),
1745 atomic_read(&exp->exp_rpc_count),
1746 atomic_read(&exp->exp_cb_count),
1747 atomic_read(&exp->exp_locks_count),
1748 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1749 nreplies, first_reply, nreplies > 3 ? "..." : "",
1750 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1751 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1752 if (locks && class_export_dump_hook != NULL)
1753 class_export_dump_hook(exp);
1757 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1759 struct obd_export *exp;
1761 spin_lock(&obd->obd_dev_lock);
1762 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1763 print_export_data(exp, "ACTIVE", locks, debug_level);
1764 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1765 print_export_data(exp, "UNLINKED", locks, debug_level);
1766 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1767 print_export_data(exp, "DELAYED", locks, debug_level);
1768 spin_unlock(&obd->obd_dev_lock);
/*
 * obd_exports_barrier() - review summary of the visible statements (the
 * declaration and update of "waited" are elided from this listing).
 *
 * Asserts that obd_exports is empty, then polls obd_unlinked_exports until
 * it is empty.  The list is tested with obd_dev_lock held; the lock is
 * dropped around each uninterruptible sleep of "waited" seconds.  When
 * "waited" exceeds 5 and is a power of two, a console warning with the obd
 * refcount is printed and all exports are dumped.
 */
1771 void obd_exports_barrier(struct obd_device *obd)
1774 LASSERT(list_empty(&obd->obd_exports));
1775 spin_lock(&obd->obd_dev_lock);
1776 while (!list_empty(&obd->obd_unlinked_exports)) {
1777 spin_unlock(&obd->obd_dev_lock);
1778 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1779 if (waited > 5 && is_power_of_2(waited)) {
1780 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1781 "more than %d seconds. "
1782 "The obd refcount = %d. Is it stuck?\n",
1783 obd->obd_name, waited,
1784 atomic_read(&obd->obd_refcount));
1785 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1788 spin_lock(&obd->obd_dev_lock);
1790 spin_unlock(&obd->obd_dev_lock);
1792 EXPORT_SYMBOL(obd_exports_barrier);
1795 * Add export to the obd_zombe thread and notify it.
1797 static void obd_zombie_export_add(struct obd_export *exp) {
1798 atomic_dec(&obd_stale_export_num);
1799 spin_lock(&exp->exp_obd->obd_dev_lock);
1800 LASSERT(!list_empty(&exp->exp_obd_chain));
1801 list_del_init(&exp->exp_obd_chain);
1802 spin_unlock(&exp->exp_obd->obd_dev_lock);
1804 queue_work(zombie_wq, &exp->exp_zombie_work);
1808 * Add import to the obd_zombe thread and notify it.
1810 static void obd_zombie_import_add(struct obd_import *imp) {
1811 LASSERT(imp->imp_sec == NULL);
1813 queue_work(zombie_wq, &imp->imp_zombie_work);
1817 * wait when obd_zombie import/export queues become empty
1819 void obd_zombie_barrier(void)
1821 flush_workqueue(zombie_wq);
1823 EXPORT_SYMBOL(obd_zombie_barrier);
1826 struct obd_export *obd_stale_export_get(void)
1828 struct obd_export *exp = NULL;
1831 spin_lock(&obd_stale_export_lock);
1832 if (!list_empty(&obd_stale_exports)) {
1833 exp = list_entry(obd_stale_exports.next,
1834 struct obd_export, exp_stale_list);
1835 list_del_init(&exp->exp_stale_list);
1837 spin_unlock(&obd_stale_export_lock);
1840 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1841 atomic_read(&obd_stale_export_num));
1845 EXPORT_SYMBOL(obd_stale_export_get);
1847 void obd_stale_export_put(struct obd_export *exp)
1851 LASSERT(list_empty(&exp->exp_stale_list));
1852 if (exp->exp_lock_hash &&
1853 atomic_read(&exp->exp_lock_hash->hs_count)) {
1854 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1855 atomic_read(&obd_stale_export_num));
1857 spin_lock_bh(&exp->exp_bl_list_lock);
1858 spin_lock(&obd_stale_export_lock);
1859 /* Add to the tail if there is no blocked locks,
1860 * to the head otherwise. */
1861 if (list_empty(&exp->exp_bl_list))
1862 list_add_tail(&exp->exp_stale_list,
1863 &obd_stale_exports);
1865 list_add(&exp->exp_stale_list,
1866 &obd_stale_exports);
1868 spin_unlock(&obd_stale_export_lock);
1869 spin_unlock_bh(&exp->exp_bl_list_lock);
1871 class_export_put(exp);
1875 EXPORT_SYMBOL(obd_stale_export_put);
1878 * Adjust the position of the export in the stale list,
1879 * i.e. move to the head of the list if is needed.
1881 void obd_stale_export_adjust(struct obd_export *exp)
1883 LASSERT(exp != NULL);
1884 spin_lock_bh(&exp->exp_bl_list_lock);
1885 spin_lock(&obd_stale_export_lock);
1887 if (!list_empty(&exp->exp_stale_list) &&
1888 !list_empty(&exp->exp_bl_list))
1889 list_move(&exp->exp_stale_list, &obd_stale_exports);
1891 spin_unlock(&obd_stale_export_lock);
1892 spin_unlock_bh(&exp->exp_bl_list_lock);
1894 EXPORT_SYMBOL(obd_stale_export_adjust);
/*
 * obd_zombie_impexp_init() - creates the "obd_zombid" workqueue and stores
 * it in zombie_wq, the queue that obd_zombie_export_add() and
 * obd_zombie_import_add() submit work to.
 * NOTE(review): the handling of an allocation failure and the return
 * statement are on lines elided from this listing - confirm against the
 * full file.
 */
1897 * start destroy zombie import/export thread
1899 int obd_zombie_impexp_init(void)
1901 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1909 * stop destroy zombie import/export thread
1911 void obd_zombie_impexp_stop(void)
1913 destroy_workqueue(zombie_wq);
1914 LASSERT(list_empty(&obd_stale_exports));
1917 /***** Kernel-userspace comm helpers *******/
1919 /* Get length of entire message, including header */
1920 int kuc_len(int payload_len)
1922 return sizeof(struct kuc_hdr) + payload_len;
1924 EXPORT_SYMBOL(kuc_len);
1926 /* Get a pointer to kuc header, given a ptr to the payload
1927 * @param p Pointer to payload area
1928 * @returns Pointer to kuc header
1930 struct kuc_hdr * kuc_ptr(void *p)
1932 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1933 LASSERT(lh->kuc_magic == KUC_MAGIC);
1936 EXPORT_SYMBOL(kuc_ptr);
1938 /* Alloc space for a message, and fill in header
1939 * @return Pointer to payload area
1941 void *kuc_alloc(int payload_len, int transport, int type)
1944 int len = kuc_len(payload_len);
1948 return ERR_PTR(-ENOMEM);
1950 lh->kuc_magic = KUC_MAGIC;
1951 lh->kuc_transport = transport;
1952 lh->kuc_msgtype = type;
1953 lh->kuc_msglen = len;
1955 return (void *)(lh + 1);
1957 EXPORT_SYMBOL(kuc_alloc);
/*
 * Free a message.  Takes a pointer to the payload area and the payload
 * length; the header in front of the payload is located with kuc_ptr()
 * and header plus payload are released together.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
/*
 * Record describing one thread waiting for an RPC slot.
 * obd_get_request_slot() keeps it on its stack: orsw_entry links the waiter
 * on cli->cl_flight_waiters and orsw_waitq is what the thread sleeps on;
 * obd_put_request_slot() and obd_set_max_rpcs_in_flight() unlink the entry
 * and wake the queue when a slot is handed over.
 * NOTE(review): the code also uses an orsw_signaled member whose
 * declaration is on a line elided from this listing.
 */
1967 struct obd_request_slot_waiter {
1968 struct list_head orsw_entry;
1969 wait_queue_head_t orsw_waitq;
1973 static bool obd_request_slot_avail(struct client_obd *cli,
1974 struct obd_request_slot_waiter *orsw)
1978 spin_lock(&cli->cl_loi_list_lock);
1979 avail = !!list_empty(&orsw->orsw_entry);
1980 spin_unlock(&cli->cl_loi_list_lock);
/*
 * obd_get_request_slot() - review summary of the visible statements (short
 * lines, including the handling of "rc", are elided from this listing).
 *
 * Fast path: with cl_loi_list_lock held, cl_rpcs_in_flight is incremented
 * when it is below cl_max_rpcs_in_flight.
 * Slow path: an on-stack waiter is appended to cl_flight_waiters and the
 * thread sleeps in l_wait_event_abortable() until the waiter has been
 * dequeued (obd_request_slot_avail()) or orsw_signaled is set.
 * After the wait the lock is taken again before the waiter is examined.
 * For a waiter that was not signaled the visible code either gives back
 * the slot (cl_rpcs_in_flight--) when the entry had already been dequeued,
 * or unlinks the entry.  A signaled waiter is asserted to be unlinked.
 * NOTE(review): the conditions guarding that cleanup and the value
 * returned are not visible here.
 */
1986 * For network flow control, the RPC sponsor needs to acquire a credit
1987 * before sending the RPC. The credits count for a connection is defined
1988 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1989 * the subsequent RPC sponsors need to wait until others released their
1990 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1992 int obd_get_request_slot(struct client_obd *cli)
1994 struct obd_request_slot_waiter orsw;
1997 spin_lock(&cli->cl_loi_list_lock);
1998 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1999 cli->cl_rpcs_in_flight++;
2000 spin_unlock(&cli->cl_loi_list_lock);
2004 init_waitqueue_head(&orsw.orsw_waitq);
2005 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2006 orsw.orsw_signaled = false;
2007 spin_unlock(&cli->cl_loi_list_lock);
2009 rc = l_wait_event_abortable(orsw.orsw_waitq,
2010 obd_request_slot_avail(cli, &orsw) ||
2011 orsw.orsw_signaled);
2013 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2014 * freed but other (such as obd_put_request_slot) is using it. */
2015 spin_lock(&cli->cl_loi_list_lock);
2017 if (!orsw.orsw_signaled) {
2018 if (list_empty(&orsw.orsw_entry))
2019 cli->cl_rpcs_in_flight--;
2021 list_del(&orsw.orsw_entry);
2026 if (orsw.orsw_signaled) {
2027 LASSERT(list_empty(&orsw.orsw_entry));
2031 spin_unlock(&cli->cl_loi_list_lock);
2035 EXPORT_SYMBOL(obd_get_request_slot);
2037 void obd_put_request_slot(struct client_obd *cli)
2039 struct obd_request_slot_waiter *orsw;
2041 spin_lock(&cli->cl_loi_list_lock);
2042 cli->cl_rpcs_in_flight--;
2044 /* If there is free slot, wakeup the first waiter. */
2045 if (!list_empty(&cli->cl_flight_waiters) &&
2046 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2047 orsw = list_entry(cli->cl_flight_waiters.next,
2048 struct obd_request_slot_waiter, orsw_entry);
2049 list_del_init(&orsw->orsw_entry);
2050 cli->cl_rpcs_in_flight++;
2051 wake_up(&orsw->orsw_waitq);
2053 spin_unlock(&cli->cl_loi_list_lock);
2055 EXPORT_SYMBOL(obd_put_request_slot);
2057 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2059 return cli->cl_max_rpcs_in_flight;
2061 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2063 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2065 struct obd_request_slot_waiter *orsw;
2069 const char *type_name;
2072 if (max > OBD_MAX_RIF_MAX || max < 1)
2075 type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2076 if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2077 /* adjust max_mod_rpcs_in_flight to ensure it is always
2078 * strictly lower that max_rpcs_in_flight */
2080 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2081 "because it must be higher than "
2082 "max_mod_rpcs_in_flight value",
2083 cli->cl_import->imp_obd->obd_name);
2086 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2087 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2093 spin_lock(&cli->cl_loi_list_lock);
2094 old = cli->cl_max_rpcs_in_flight;
2095 cli->cl_max_rpcs_in_flight = max;
2096 client_adjust_max_dirty(cli);
2100 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2101 for (i = 0; i < diff; i++) {
2102 if (list_empty(&cli->cl_flight_waiters))
2105 orsw = list_entry(cli->cl_flight_waiters.next,
2106 struct obd_request_slot_waiter, orsw_entry);
2107 list_del_init(&orsw->orsw_entry);
2108 cli->cl_rpcs_in_flight++;
2109 wake_up(&orsw->orsw_waitq);
2111 spin_unlock(&cli->cl_loi_list_lock);
2115 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2117 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2119 return cli->cl_max_mod_rpcs_in_flight;
2121 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/*
 * obd_set_max_mod_rpcs_in_flight() - review summary of the visible
 * statements (return statements and some short lines are elided from this
 * listing).
 *
 * @max is checked against three limits: the range 1..OBD_MAX_RIF_MAX, the
 * current cl_max_rpcs_in_flight (it must stay strictly below it), and
 * "maxmodrpcs", which is taken from ocd_maxmodrpcs when the server
 * advertised OBD_CONNECT_MULTIMODRPCS.  The last two violations are logged
 * with CERROR.
 * NOTE(review): the value used for maxmodrpcs without that connect flag is
 * not visible here.
 * The new limit is stored under cl_mod_rpcs_lock; if it is larger than the
 * previous one, cl_mod_rpcs_waitq is woken.
 */
2123 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2125 struct obd_connect_data *ocd;
2129 if (max > OBD_MAX_RIF_MAX || max < 1)
2132 /* cannot exceed or equal max_rpcs_in_flight */
2133 if (max >= cli->cl_max_rpcs_in_flight) {
2134 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2135 "higher or equal to max_rpcs_in_flight value (%u)\n",
2136 cli->cl_import->imp_obd->obd_name,
2137 max, cli->cl_max_rpcs_in_flight);
2141 /* cannot exceed max modify RPCs in flight supported by the server */
2142 ocd = &cli->cl_import->imp_connect_data;
2143 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2144 maxmodrpcs = ocd->ocd_maxmodrpcs;
2147 if (max > maxmodrpcs) {
2148 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2149 "higher than max_mod_rpcs_per_client value (%hu) "
2150 "returned by the server at connection\n",
2151 cli->cl_import->imp_obd->obd_name,
2156 spin_lock(&cli->cl_mod_rpcs_lock);
2158 prev = cli->cl_max_mod_rpcs_in_flight;
2159 cli->cl_max_mod_rpcs_in_flight = max;
2161 /* wakeup waiters if limit has been increased */
2162 if (cli->cl_max_mod_rpcs_in_flight > prev)
2163 wake_up(&cli->cl_mod_rpcs_waitq);
2165 spin_unlock(&cli->cl_mod_rpcs_lock);
2169 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2171 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2172 struct seq_file *seq)
2174 unsigned long mod_tot = 0, mod_cum;
2175 struct timespec64 now;
2178 ktime_get_real_ts64(&now);
2180 spin_lock(&cli->cl_mod_rpcs_lock);
2182 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2183 (s64)now.tv_sec, now.tv_nsec);
2184 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2185 cli->cl_mod_rpcs_in_flight);
2187 seq_printf(seq, "\n\t\t\tmodify\n");
2188 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2190 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2193 for (i = 0; i < OBD_HIST_MAX; i++) {
2194 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2196 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2197 i, mod, pct(mod, mod_tot),
2198 pct(mod_cum, mod_tot));
2199 if (mod_cum == mod_tot)
2203 spin_unlock(&cli->cl_mod_rpcs_lock);
2207 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2209 /* The number of modify RPCs sent in parallel is limited
2210 * because the server has a finite number of slots per client to
2211 * store request result and ensure reply reconstruction when needed.
2212 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2213 * that takes into account server limit and cl_max_rpcs_in_flight
2215 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2216 * one close request is allowed above the maximum.
2218 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2223 /* A slot is available if
2224 * - number of modify RPCs in flight is less than the max
2225 * - it's a close RPC and no other close request is in flight
2227 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2228 (close_req && cli->cl_close_rpcs_in_flight == 0);
2233 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2238 spin_lock(&cli->cl_mod_rpcs_lock);
2239 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2240 spin_unlock(&cli->cl_mod_rpcs_lock);
2245 /* Get a modify RPC slot from the obd client @cli according
2246 * to the kind of operation @opc that is going to be sent
2247 * and the intent @it of the operation if it applies.
2248 * If the maximum number of modify RPCs in flight is reached
2249 * the thread is put to sleep.
2250 * Returns the tag to be set in the request message. Tag 0
2251 * is reserved for non-modifying requests.
 *
 * Review summary of the visible statements (short lines, including the
 * loop construct and the size argument of find_first_zero_bit(), are
 * elided from this listing): with cl_mod_rpcs_lock held, when
 * obd_mod_rpc_slot_avail_locked() reports a slot, cl_mod_rpcs_in_flight
 * (and cl_close_rpcs_in_flight, on a condition not visible here) is
 * incremented, the histogram is tallied, and the first zero bit of
 * cl_mod_tag_bitmap is claimed.  Otherwise the lock is dropped and the
 * thread sleeps on cl_mod_rpcs_waitq until obd_mod_rpc_slot_avail()
 * becomes true.
 * NOTE(review): test_and_set_bit() is called inside LASSERT(), so the
 * bit is only claimed if LASSERT always evaluates its argument - confirm.
2253 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2255 bool close_req = false;
2258 if (opc == MDS_CLOSE)
2262 spin_lock(&cli->cl_mod_rpcs_lock);
2263 max = cli->cl_max_mod_rpcs_in_flight;
2264 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2265 /* there is a slot available */
2266 cli->cl_mod_rpcs_in_flight++;
2268 cli->cl_close_rpcs_in_flight++;
2269 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2270 cli->cl_mod_rpcs_in_flight);
2271 /* find a free tag */
2272 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2274 LASSERT(i < OBD_MAX_RIF_MAX);
2275 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2276 spin_unlock(&cli->cl_mod_rpcs_lock);
2277 /* tag 0 is reserved for non-modify RPCs */
2280 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2281 cli->cl_import->imp_obd->obd_name,
2286 spin_unlock(&cli->cl_mod_rpcs_lock);
2288 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2289 "opc %u, max %hu\n",
2290 cli->cl_import->imp_obd->obd_name, opc, max);
2292 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2293 obd_mod_rpc_slot_avail(cli,
2297 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
/*
 * obd_put_mod_rpc_slot() - review summary of the visible statements (short
 * guard lines are elided from this listing).
 *
 * With cl_mod_rpcs_lock held, cl_mod_rpcs_in_flight is decremented
 * (cl_close_rpcs_in_flight as well, on a condition not visible here) and
 * bit (tag - 1) of cl_mod_tag_bitmap is cleared; the bit is asserted to
 * have been set.  After the lock is dropped cl_mod_rpcs_waitq is woken.
 * NOTE(review): test_and_clear_bit() is called inside LASSERT(), so the
 * tag is only released if LASSERT always evaluates its argument - confirm.
 * NOTE(review): (tag - 1) is negative for tag 0; whether tag 0 is filtered
 * out earlier cannot be seen from the visible lines - confirm.
 */
2299 /* Put a modify RPC slot from the obd client @cli according
2300 * to the kind of operation @opc that has been sent.
2302 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2304 bool close_req = false;
2309 if (opc == MDS_CLOSE)
2312 spin_lock(&cli->cl_mod_rpcs_lock);
2313 cli->cl_mod_rpcs_in_flight--;
2315 cli->cl_close_rpcs_in_flight--;
2316 /* release the tag in the bitmap */
2317 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2318 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2319 spin_unlock(&cli->cl_mod_rpcs_lock);
2320 wake_up(&cli->cl_mod_rpcs_waitq);
2322 EXPORT_SYMBOL(obd_put_mod_rpc_slot);