4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59 const char *status, int locks, int debug_level);
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90 obd, obd->obd_namespace, obd->obd_force);
93 lu_ref_fini(&obd->obd_reference);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 struct obd_type *class_search_type(const char *name)
99 struct kobject *kobj = kset_find_obj(lustre_kset, name);
101 if (kobj && kobj->ktype == &class_ktype)
102 return container_of(kobj, struct obd_type, typ_kobj);
107 EXPORT_SYMBOL(class_search_type);
109 struct obd_type *class_get_type(const char *name)
111 struct obd_type *type;
113 type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
116 const char *modname = name;
118 #ifdef HAVE_SERVER_SUPPORT
119 if (strcmp(modname, "obdfilter") == 0)
122 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 modname = LUSTRE_OSP_NAME;
125 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 if (try_module_get(type->typ_dt_ops->o_owner)) {
140 atomic_inc(&type->typ_refcnt);
141 /* class_search_type() returned a counted reference,
142 * but we don't need that count any more as
143 * we have one through typ_refcnt.
145 kobject_put(&type->typ_kobj);
147 kobject_put(&type->typ_kobj);
154 void class_put_type(struct obd_type *type)
157 module_put(type->typ_dt_ops->o_owner);
158 atomic_dec(&type->typ_refcnt);
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 debugfs_remove_recursive(type->typ_debugfs_entry);
166 type->typ_debugfs_entry = NULL;
169 lu_device_type_fini(type->typ_lu);
171 #ifdef CONFIG_PROC_FS
172 if (type->typ_name && type->typ_procroot)
173 remove_proc_subtree(type->typ_name, proc_lustre_root);
175 OBD_FREE(type, sizeof(*type));
178 static struct kobj_type class_ktype = {
179 .sysfs_ops = &lustre_sysfs_ops,
180 .release = class_sysfs_release,
183 #ifdef HAVE_SERVER_SUPPORT
184 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
186 struct dentry *symlink;
187 struct obd_type *type;
190 type = class_search_type(name);
192 kobject_put(&type->typ_kobj);
193 return ERR_PTR(-EEXIST);
196 OBD_ALLOC(type, sizeof(*type));
198 return ERR_PTR(-ENOMEM);
200 type->typ_kobj.kset = lustre_kset;
201 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
202 &lustre_kset->kobj, "%s", name);
206 symlink = debugfs_create_dir(name, debugfs_lustre_root);
207 type->typ_debugfs_entry = symlink;
208 type->typ_sym_filter = true;
211 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
213 if (IS_ERR(type->typ_procroot)) {
214 CERROR("%s: can't create compat proc entry: %d\n",
215 name, (int)PTR_ERR(type->typ_procroot));
216 type->typ_procroot = NULL;
222 EXPORT_SYMBOL(class_add_symlinks);
223 #endif /* HAVE_SERVER_SUPPORT */
225 #define CLASS_MAX_NAME 1024
227 int class_register_type(const struct obd_ops *dt_ops,
228 const struct md_ops *md_ops,
229 bool enable_proc, struct lprocfs_vars *vars,
230 const char *name, struct lu_device_type *ldt)
232 struct obd_type *type;
237 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
239 type = class_search_type(name);
241 #ifdef HAVE_SERVER_SUPPORT
242 if (type->typ_sym_filter)
244 #endif /* HAVE_SERVER_SUPPORT */
245 kobject_put(&type->typ_kobj);
246 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
250 OBD_ALLOC(type, sizeof(*type));
254 type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
255 type->typ_kobj.kset = lustre_kset;
256 kobject_init(&type->typ_kobj, &class_ktype);
257 #ifdef HAVE_SERVER_SUPPORT
259 #endif /* HAVE_SERVER_SUPPORT */
261 type->typ_dt_ops = dt_ops;
262 type->typ_md_ops = md_ops;
264 #ifdef HAVE_SERVER_SUPPORT
265 if (type->typ_sym_filter) {
266 type->typ_sym_filter = false;
267 kobject_put(&type->typ_kobj);
271 #ifdef CONFIG_PROC_FS
272 if (enable_proc && !type->typ_procroot) {
273 type->typ_procroot = lprocfs_register(name,
276 if (IS_ERR(type->typ_procroot)) {
277 rc = PTR_ERR(type->typ_procroot);
278 type->typ_procroot = NULL;
283 type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
284 ldebugfs_add_vars(type->typ_debugfs_entry, vars, type);
286 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
289 #ifdef HAVE_SERVER_SUPPORT
293 rc = lu_device_type_init(ldt);
294 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
295 wake_up_var(&type->typ_lu);
303 kobject_put(&type->typ_kobj);
307 EXPORT_SYMBOL(class_register_type);
309 int class_unregister_type(const char *name)
311 struct obd_type *type = class_search_type(name);
316 CERROR("unknown obd type\n");
320 if (atomic_read(&type->typ_refcnt)) {
321 CERROR("type %s has refcount (%d)\n", name,
322 atomic_read(&type->typ_refcnt));
323 /* This is a bad situation, let's make the best of it */
324 /* Remove ops, but leave the name for debugging */
325 type->typ_dt_ops = NULL;
326 type->typ_md_ops = NULL;
327 GOTO(out_put, rc = -EBUSY);
330 /* Put the final ref */
331 kobject_put(&type->typ_kobj);
333 /* Put the ref returned by class_search_type() */
334 kobject_put(&type->typ_kobj);
337 } /* class_unregister_type */
338 EXPORT_SYMBOL(class_unregister_type);
341 * Create a new obd device.
343 * Allocate the new obd_device and initialize it.
345 * \param[in] type_name obd device type string.
346 * \param[in] name obd device name.
347 * \param[in] uuid obd device UUID
349 * \retval newdev pointer to created obd_device
350 * \retval ERR_PTR(errno) on error
352 struct obd_device *class_newdev(const char *type_name, const char *name,
355 struct obd_device *newdev;
356 struct obd_type *type = NULL;
359 if (strlen(name) >= MAX_OBD_NAME) {
360 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
361 RETURN(ERR_PTR(-EINVAL));
364 type = class_get_type(type_name);
366 CERROR("OBD: unknown type: %s\n", type_name);
367 RETURN(ERR_PTR(-ENODEV));
370 newdev = obd_device_alloc();
371 if (newdev == NULL) {
372 class_put_type(type);
373 RETURN(ERR_PTR(-ENOMEM));
375 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
376 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
377 newdev->obd_type = type;
378 newdev->obd_minor = -1;
380 rwlock_init(&newdev->obd_pool_lock);
381 newdev->obd_pool_limit = 0;
382 newdev->obd_pool_slv = 0;
384 INIT_LIST_HEAD(&newdev->obd_exports);
385 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
386 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
387 INIT_LIST_HEAD(&newdev->obd_exports_timed);
388 INIT_LIST_HEAD(&newdev->obd_nid_stats);
389 spin_lock_init(&newdev->obd_nid_lock);
390 spin_lock_init(&newdev->obd_dev_lock);
391 mutex_init(&newdev->obd_dev_mutex);
392 spin_lock_init(&newdev->obd_osfs_lock);
393 /* newdev->obd_osfs_age must be set to a value in the distant
394 * past to guarantee a fresh statfs is fetched on mount. */
395 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
397 /* XXX belongs in setup not attach */
398 init_rwsem(&newdev->obd_observer_link_sem);
400 spin_lock_init(&newdev->obd_recovery_task_lock);
401 init_waitqueue_head(&newdev->obd_next_transno_waitq);
402 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
403 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
404 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
405 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
406 INIT_LIST_HEAD(&newdev->obd_evict_list);
407 INIT_LIST_HEAD(&newdev->obd_lwp_list);
409 llog_group_init(&newdev->obd_olg);
410 /* Detach drops this */
411 atomic_set(&newdev->obd_refcount, 1);
412 lu_ref_init(&newdev->obd_reference);
413 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
415 newdev->obd_conn_inprogress = 0;
417 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
419 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
420 newdev->obd_name, newdev);
428 * \param[in] obd obd_device to be freed
432 void class_free_dev(struct obd_device *obd)
434 struct obd_type *obd_type = obd->obd_type;
436 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
437 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
438 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
439 "obd %p != obd_devs[%d] %p\n",
440 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
441 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
442 "obd_refcount should be 0, not %d\n",
443 atomic_read(&obd->obd_refcount));
444 LASSERT(obd_type != NULL);
446 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
447 obd->obd_name, obd->obd_type->typ_name);
449 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
450 obd->obd_name, obd->obd_uuid.uuid);
451 if (obd->obd_stopping) {
454 /* If we're not stopping, we were never set up */
455 err = obd_cleanup(obd);
457 CERROR("Cleanup %s returned %d\n",
461 obd_device_free(obd);
463 class_put_type(obd_type);
467 * Unregister obd device.
469 * Free slot in obd_dev[] used by \a obd.
471 * \param[in] new_obd obd_device to be unregistered
475 void class_unregister_device(struct obd_device *obd)
477 write_lock(&obd_dev_lock);
478 if (obd->obd_minor >= 0) {
479 LASSERT(obd_devs[obd->obd_minor] == obd);
480 obd_devs[obd->obd_minor] = NULL;
483 write_unlock(&obd_dev_lock);
487 * Register obd device.
489 * Find free slot in obd_devs[], fills it with \a new_obd.
491 * \param[in] new_obd obd_device to be registered
494 * \retval -EEXIST device with this name is registered
495 * \retval -EOVERFLOW obd_devs[] is full
497 int class_register_device(struct obd_device *new_obd)
501 int new_obd_minor = 0;
502 bool minor_assign = false;
503 bool retried = false;
506 write_lock(&obd_dev_lock);
507 for (i = 0; i < class_devno_max(); i++) {
508 struct obd_device *obd = class_num2obd(i);
511 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
514 write_unlock(&obd_dev_lock);
516 /* the obd_device could be waited to be
517 * destroyed by the "obd_zombie_impexp_thread".
519 obd_zombie_barrier();
524 CERROR("%s: already exists, won't add\n",
526 /* in case we found a free slot before duplicate */
527 minor_assign = false;
531 if (!minor_assign && obd == NULL) {
538 new_obd->obd_minor = new_obd_minor;
539 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
540 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
541 obd_devs[new_obd_minor] = new_obd;
545 CERROR("%s: all %u/%u devices used, increase "
546 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
547 i, class_devno_max(), ret);
550 write_unlock(&obd_dev_lock);
555 static int class_name2dev_nolock(const char *name)
562 for (i = 0; i < class_devno_max(); i++) {
563 struct obd_device *obd = class_num2obd(i);
565 if (obd && strcmp(name, obd->obd_name) == 0) {
566 /* Make sure we finished attaching before we give
567 out any references */
568 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
569 if (obd->obd_attached) {
579 int class_name2dev(const char *name)
586 read_lock(&obd_dev_lock);
587 i = class_name2dev_nolock(name);
588 read_unlock(&obd_dev_lock);
592 EXPORT_SYMBOL(class_name2dev);
594 struct obd_device *class_name2obd(const char *name)
596 int dev = class_name2dev(name);
598 if (dev < 0 || dev > class_devno_max())
600 return class_num2obd(dev);
602 EXPORT_SYMBOL(class_name2obd);
604 int class_uuid2dev_nolock(struct obd_uuid *uuid)
608 for (i = 0; i < class_devno_max(); i++) {
609 struct obd_device *obd = class_num2obd(i);
611 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
612 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
620 int class_uuid2dev(struct obd_uuid *uuid)
624 read_lock(&obd_dev_lock);
625 i = class_uuid2dev_nolock(uuid);
626 read_unlock(&obd_dev_lock);
630 EXPORT_SYMBOL(class_uuid2dev);
632 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
634 int dev = class_uuid2dev(uuid);
637 return class_num2obd(dev);
639 EXPORT_SYMBOL(class_uuid2obd);
642 * Get obd device from ::obd_devs[]
644 * \param num [in] array index
646 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
647 * otherwise return the obd device there.
649 struct obd_device *class_num2obd(int num)
651 struct obd_device *obd = NULL;
653 if (num < class_devno_max()) {
658 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
659 "%p obd_magic %08x != %08x\n",
660 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
661 LASSERTF(obd->obd_minor == num,
662 "%p obd_minor %0d != %0d\n",
663 obd, obd->obd_minor, num);
668 EXPORT_SYMBOL(class_num2obd);
671 * Find obd in obd_dev[] by name or uuid.
673 * Increment obd's refcount if found.
675 * \param[in] str obd name or uuid
677 * \retval NULL if not found
678 * \retval target pointer to found obd_device
680 struct obd_device *class_dev_by_str(const char *str)
682 struct obd_device *target = NULL;
683 struct obd_uuid tgtuuid;
686 obd_str2uuid(&tgtuuid, str);
688 read_lock(&obd_dev_lock);
689 rc = class_uuid2dev_nolock(&tgtuuid);
691 rc = class_name2dev_nolock(str);
694 target = class_num2obd(rc);
697 class_incref(target, "find", current);
698 read_unlock(&obd_dev_lock);
702 EXPORT_SYMBOL(class_dev_by_str);
705 * Get obd devices count. Device in any
707 * \retval obd device count
709 int get_devices_count(void)
711 int index, max_index = class_devno_max(), dev_count = 0;
713 read_lock(&obd_dev_lock);
714 for (index = 0; index <= max_index; index++) {
715 struct obd_device *obd = class_num2obd(index);
719 read_unlock(&obd_dev_lock);
723 EXPORT_SYMBOL(get_devices_count);
725 void class_obd_list(void)
730 read_lock(&obd_dev_lock);
731 for (i = 0; i < class_devno_max(); i++) {
732 struct obd_device *obd = class_num2obd(i);
736 if (obd->obd_stopping)
738 else if (obd->obd_set_up)
740 else if (obd->obd_attached)
744 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
745 i, status, obd->obd_type->typ_name,
746 obd->obd_name, obd->obd_uuid.uuid,
747 atomic_read(&obd->obd_refcount));
749 read_unlock(&obd_dev_lock);
752 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
753 * specified, then only the client with that uuid is returned,
754 * otherwise any client connected to the tgt is returned.
756 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
757 const char *type_name,
758 struct obd_uuid *grp_uuid)
762 read_lock(&obd_dev_lock);
763 for (i = 0; i < class_devno_max(); i++) {
764 struct obd_device *obd = class_num2obd(i);
768 if ((strncmp(obd->obd_type->typ_name, type_name,
769 strlen(type_name)) == 0)) {
770 if (obd_uuid_equals(tgt_uuid,
771 &obd->u.cli.cl_target_uuid) &&
772 ((grp_uuid)? obd_uuid_equals(grp_uuid,
773 &obd->obd_uuid) : 1)) {
774 read_unlock(&obd_dev_lock);
779 read_unlock(&obd_dev_lock);
783 EXPORT_SYMBOL(class_find_client_obd);
785 /* Iterate the obd_device list looking devices have grp_uuid. Start
786 * searching at *next, and if a device is found, the next index to look
787 * at is saved in *next. If next is NULL, then the first matching device
788 * will always be returned.
790 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
796 else if (*next >= 0 && *next < class_devno_max())
801 read_lock(&obd_dev_lock);
802 for (; i < class_devno_max(); i++) {
803 struct obd_device *obd = class_num2obd(i);
807 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
810 read_unlock(&obd_dev_lock);
814 read_unlock(&obd_dev_lock);
818 EXPORT_SYMBOL(class_devices_in_group);
821 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
822 * adjust sptlrpc settings accordingly.
824 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
826 struct obd_device *obd;
830 LASSERT(namelen > 0);
832 read_lock(&obd_dev_lock);
833 for (i = 0; i < class_devno_max(); i++) {
834 obd = class_num2obd(i);
836 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
839 /* only notify mdc, osc, osp, lwp, mdt, ost
840 * because only these have a -sptlrpc llog */
841 type = obd->obd_type->typ_name;
842 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
843 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
844 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
845 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
846 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
847 strcmp(type, LUSTRE_OST_NAME) != 0)
850 if (strncmp(obd->obd_name, fsname, namelen))
853 class_incref(obd, __FUNCTION__, obd);
854 read_unlock(&obd_dev_lock);
855 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
856 sizeof(KEY_SPTLRPC_CONF),
857 KEY_SPTLRPC_CONF, 0, NULL, NULL);
859 class_decref(obd, __FUNCTION__, obd);
860 read_lock(&obd_dev_lock);
862 read_unlock(&obd_dev_lock);
865 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
867 void obd_cleanup_caches(void)
870 if (obd_device_cachep) {
871 kmem_cache_destroy(obd_device_cachep);
872 obd_device_cachep = NULL;
878 int obd_init_caches(void)
883 LASSERT(obd_device_cachep == NULL);
884 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
885 sizeof(struct obd_device),
886 0, 0, 0, sizeof(struct obd_device), NULL);
887 if (!obd_device_cachep)
888 GOTO(out, rc = -ENOMEM);
892 obd_cleanup_caches();
896 static const char export_handle_owner[] = "export";
898 /* map connection to client */
899 struct obd_export *class_conn2export(struct lustre_handle *conn)
901 struct obd_export *export;
905 CDEBUG(D_CACHE, "looking for null handle\n");
909 if (conn->cookie == -1) { /* this means assign a new connection */
910 CDEBUG(D_CACHE, "want a new connection\n");
914 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
915 export = class_handle2object(conn->cookie, export_handle_owner);
918 EXPORT_SYMBOL(class_conn2export);
920 struct obd_device *class_exp2obd(struct obd_export *exp)
926 EXPORT_SYMBOL(class_exp2obd);
928 struct obd_import *class_exp2cliimp(struct obd_export *exp)
930 struct obd_device *obd = exp->exp_obd;
933 return obd->u.cli.cl_import;
935 EXPORT_SYMBOL(class_exp2cliimp);
937 /* Export management functions */
938 static void class_export_destroy(struct obd_export *exp)
940 struct obd_device *obd = exp->exp_obd;
943 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
944 LASSERT(obd != NULL);
946 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
947 exp->exp_client_uuid.uuid, obd->obd_name);
949 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
950 if (exp->exp_connection)
951 ptlrpc_put_connection_superhack(exp->exp_connection);
953 LASSERT(list_empty(&exp->exp_outstanding_replies));
954 LASSERT(list_empty(&exp->exp_uncommitted_replies));
955 LASSERT(list_empty(&exp->exp_req_replay_queue));
956 LASSERT(list_empty(&exp->exp_hp_rpcs));
957 obd_destroy_export(exp);
958 /* self export doesn't hold a reference to an obd, although it
959 * exists until freeing of the obd */
960 if (exp != obd->obd_self_export)
961 class_decref(obd, "export", exp);
963 OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
964 kfree_rcu(exp, exp_handle.h_rcu);
968 struct obd_export *class_export_get(struct obd_export *exp)
970 refcount_inc(&exp->exp_handle.h_ref);
971 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
972 refcount_read(&exp->exp_handle.h_ref));
975 EXPORT_SYMBOL(class_export_get);
977 void class_export_put(struct obd_export *exp)
979 LASSERT(exp != NULL);
980 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
981 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
982 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
983 refcount_read(&exp->exp_handle.h_ref) - 1);
985 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
986 struct obd_device *obd = exp->exp_obd;
988 CDEBUG(D_IOCTL, "final put %p/%s\n",
989 exp, exp->exp_client_uuid.uuid);
991 /* release nid stat refererence */
992 lprocfs_exp_cleanup(exp);
994 if (exp == obd->obd_self_export) {
995 /* self export should be destroyed without
996 * zombie thread as it doesn't hold a
997 * reference to obd and doesn't hold any
999 class_export_destroy(exp);
1000 /* self export is destroyed, no class
1001 * references exist and it is safe to free
1003 class_free_dev(obd);
1005 LASSERT(!list_empty(&exp->exp_obd_chain));
1006 obd_zombie_export_add(exp);
1011 EXPORT_SYMBOL(class_export_put);
1013 static void obd_zombie_exp_cull(struct work_struct *ws)
1015 struct obd_export *export;
1017 export = container_of(ws, struct obd_export, exp_zombie_work);
1018 class_export_destroy(export);
1021 /* Creates a new export, adds it to the hash table, and returns a
1022 * pointer to it. The refcount is 2: one for the hash reference, and
1023 * one for the pointer returned by this function. */
1024 struct obd_export *__class_new_export(struct obd_device *obd,
1025 struct obd_uuid *cluuid, bool is_self)
1027 struct obd_export *export;
1031 OBD_ALLOC_PTR(export);
1033 return ERR_PTR(-ENOMEM);
1035 export->exp_conn_cnt = 0;
1036 export->exp_lock_hash = NULL;
1037 export->exp_flock_hash = NULL;
1038 /* 2 = class_handle_hash + last */
1039 refcount_set(&export->exp_handle.h_ref, 2);
1040 atomic_set(&export->exp_rpc_count, 0);
1041 atomic_set(&export->exp_cb_count, 0);
1042 atomic_set(&export->exp_locks_count, 0);
1043 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1044 INIT_LIST_HEAD(&export->exp_locks_list);
1045 spin_lock_init(&export->exp_locks_list_guard);
1047 atomic_set(&export->exp_replay_count, 0);
1048 export->exp_obd = obd;
1049 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1050 spin_lock_init(&export->exp_uncommitted_replies_lock);
1051 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1052 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1053 INIT_HLIST_NODE(&export->exp_handle.h_link);
1054 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1055 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1056 class_handle_hash(&export->exp_handle, export_handle_owner);
1057 export->exp_last_request_time = ktime_get_real_seconds();
1058 spin_lock_init(&export->exp_lock);
1059 spin_lock_init(&export->exp_rpc_lock);
1060 INIT_HLIST_NODE(&export->exp_nid_hash);
1061 INIT_HLIST_NODE(&export->exp_gen_hash);
1062 spin_lock_init(&export->exp_bl_list_lock);
1063 INIT_LIST_HEAD(&export->exp_bl_list);
1064 INIT_LIST_HEAD(&export->exp_stale_list);
1065 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1067 export->exp_sp_peer = LUSTRE_SP_ANY;
1068 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1069 export->exp_client_uuid = *cluuid;
1070 obd_init_export(export);
1072 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1074 spin_lock(&obd->obd_dev_lock);
1075 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1076 /* shouldn't happen, but might race */
1077 if (obd->obd_stopping)
1078 GOTO(exit_unlock, rc = -ENODEV);
1080 rc = obd_uuid_add(obd, export);
1082 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1083 obd->obd_name, cluuid->uuid, rc);
1084 GOTO(exit_unlock, rc = -EALREADY);
1089 class_incref(obd, "export", export);
1090 list_add_tail(&export->exp_obd_chain_timed,
1091 &obd->obd_exports_timed);
1092 list_add(&export->exp_obd_chain, &obd->obd_exports);
1093 obd->obd_num_exports++;
1095 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1096 INIT_LIST_HEAD(&export->exp_obd_chain);
1098 spin_unlock(&obd->obd_dev_lock);
1102 spin_unlock(&obd->obd_dev_lock);
1103 class_handle_unhash(&export->exp_handle);
1104 obd_destroy_export(export);
1105 OBD_FREE_PTR(export);
1109 struct obd_export *class_new_export(struct obd_device *obd,
1110 struct obd_uuid *uuid)
1112 return __class_new_export(obd, uuid, false);
1114 EXPORT_SYMBOL(class_new_export);
1116 struct obd_export *class_new_export_self(struct obd_device *obd,
1117 struct obd_uuid *uuid)
1119 return __class_new_export(obd, uuid, true);
1122 void class_unlink_export(struct obd_export *exp)
1124 class_handle_unhash(&exp->exp_handle);
1126 if (exp->exp_obd->obd_self_export == exp) {
1127 class_export_put(exp);
1131 spin_lock(&exp->exp_obd->obd_dev_lock);
1132 /* delete an uuid-export hashitem from hashtables */
1133 if (exp != exp->exp_obd->obd_self_export)
1134 obd_uuid_del(exp->exp_obd, exp);
1136 #ifdef HAVE_SERVER_SUPPORT
1137 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1138 struct tg_export_data *ted = &exp->exp_target_data;
1139 struct cfs_hash *hash;
1141 /* Because obd_gen_hash will not be released until
1142 * class_cleanup(), so hash should never be NULL here */
1143 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1144 LASSERT(hash != NULL);
1145 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1146 &exp->exp_gen_hash);
1147 cfs_hash_putref(hash);
1149 #endif /* HAVE_SERVER_SUPPORT */
1151 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1152 list_del_init(&exp->exp_obd_chain_timed);
1153 exp->exp_obd->obd_num_exports--;
1154 spin_unlock(&exp->exp_obd->obd_dev_lock);
1155 atomic_inc(&obd_stale_export_num);
1157 /* A reference is kept by obd_stale_exports list */
1158 obd_stale_export_put(exp);
1160 EXPORT_SYMBOL(class_unlink_export);
1162 /* Import management functions */
1163 static void obd_zombie_import_free(struct obd_import *imp)
1167 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1168 imp->imp_obd->obd_name);
1170 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1172 ptlrpc_put_connection_superhack(imp->imp_connection);
1174 while (!list_empty(&imp->imp_conn_list)) {
1175 struct obd_import_conn *imp_conn;
1177 imp_conn = list_first_entry(&imp->imp_conn_list,
1178 struct obd_import_conn, oic_item);
1179 list_del_init(&imp_conn->oic_item);
1180 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1181 OBD_FREE(imp_conn, sizeof(*imp_conn));
1184 LASSERT(imp->imp_sec == NULL);
1185 LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1186 imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1187 class_decref(imp->imp_obd, "import", imp);
1192 struct obd_import *class_import_get(struct obd_import *import)
1194 refcount_inc(&import->imp_refcount);
1195 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1196 refcount_read(&import->imp_refcount),
1197 import->imp_obd->obd_name);
1200 EXPORT_SYMBOL(class_import_get);
1202 void class_import_put(struct obd_import *imp)
1206 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1208 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1209 refcount_read(&imp->imp_refcount) - 1,
1210 imp->imp_obd->obd_name);
1212 if (refcount_dec_and_test(&imp->imp_refcount)) {
1213 CDEBUG(D_INFO, "final put import %p\n", imp);
1214 obd_zombie_import_add(imp);
1219 EXPORT_SYMBOL(class_import_put);
1221 static void init_imp_at(struct imp_at *at) {
1223 at_init(&at->iat_net_latency, 0, 0);
1224 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1225 /* max service estimates are tracked on the server side, so
1226 don't use the AT history here, just use the last reported
1227 val. (But keep hist for proc histogram, worst_ever) */
1228 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1233 static void obd_zombie_imp_cull(struct work_struct *ws)
1235 struct obd_import *import;
1237 import = container_of(ws, struct obd_import, imp_zombie_work);
1238 obd_zombie_import_free(import);
1241 struct obd_import *class_new_import(struct obd_device *obd)
1243 struct obd_import *imp;
1244 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1246 OBD_ALLOC(imp, sizeof(*imp));
1250 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1251 INIT_LIST_HEAD(&imp->imp_replay_list);
1252 INIT_LIST_HEAD(&imp->imp_sending_list);
1253 INIT_LIST_HEAD(&imp->imp_delayed_list);
1254 INIT_LIST_HEAD(&imp->imp_committed_list);
1255 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1256 imp->imp_known_replied_xid = 0;
1257 imp->imp_replay_cursor = &imp->imp_committed_list;
1258 spin_lock_init(&imp->imp_lock);
1259 imp->imp_last_success_conn = 0;
1260 imp->imp_state = LUSTRE_IMP_NEW;
1261 imp->imp_obd = class_incref(obd, "import", imp);
1262 rwlock_init(&imp->imp_sec_lock);
1263 init_waitqueue_head(&imp->imp_recovery_waitq);
1264 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1266 if (curr_pid_ns && curr_pid_ns->child_reaper)
1267 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1269 imp->imp_sec_refpid = 1;
1271 refcount_set(&imp->imp_refcount, 2);
1272 atomic_set(&imp->imp_unregistering, 0);
1273 atomic_set(&imp->imp_reqs, 0);
1274 atomic_set(&imp->imp_inflight, 0);
1275 atomic_set(&imp->imp_replay_inflight, 0);
1276 atomic_set(&imp->imp_inval_count, 0);
1277 INIT_LIST_HEAD(&imp->imp_conn_list);
1278 init_imp_at(&imp->imp_at);
1280 /* the default magic is V2, will be used in connect RPC, and
1281 * then adjusted according to the flags in request/reply. */
1282 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1286 EXPORT_SYMBOL(class_new_import);
1288 void class_destroy_import(struct obd_import *import)
1290 LASSERT(import != NULL);
1291 LASSERT(import != LP_POISON);
1293 spin_lock(&import->imp_lock);
1294 import->imp_generation++;
1295 spin_unlock(&import->imp_lock);
1296 class_import_put(import);
1298 EXPORT_SYMBOL(class_destroy_import);
1300 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1302 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1304 spin_lock(&exp->exp_locks_list_guard);
1306 LASSERT(lock->l_exp_refs_nr >= 0);
1308 if (lock->l_exp_refs_target != NULL &&
1309 lock->l_exp_refs_target != exp) {
1310 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1311 exp, lock, lock->l_exp_refs_target);
1313 if ((lock->l_exp_refs_nr ++) == 0) {
1314 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1315 lock->l_exp_refs_target = exp;
1317 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1318 lock, exp, lock->l_exp_refs_nr);
1319 spin_unlock(&exp->exp_locks_list_guard);
1321 EXPORT_SYMBOL(__class_export_add_lock_ref);
1323 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1325 spin_lock(&exp->exp_locks_list_guard);
1326 LASSERT(lock->l_exp_refs_nr > 0);
1327 if (lock->l_exp_refs_target != exp) {
1328 LCONSOLE_WARN("lock %p, "
1329 "mismatching export pointers: %p, %p\n",
1330 lock, lock->l_exp_refs_target, exp);
1332 if (-- lock->l_exp_refs_nr == 0) {
1333 list_del_init(&lock->l_exp_refs_link);
1334 lock->l_exp_refs_target = NULL;
1336 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1337 lock, exp, lock->l_exp_refs_nr);
1338 spin_unlock(&exp->exp_locks_list_guard);
1340 EXPORT_SYMBOL(__class_export_del_lock_ref);
1343 /* A connection defines an export context in which preallocation can
1344 be managed. This releases the export pointer reference, and returns
1345 the export handle, so the export refcount is 1 when this function
1347 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1348 struct obd_uuid *cluuid)
1350 struct obd_export *export;
1351 LASSERT(conn != NULL);
1352 LASSERT(obd != NULL);
1353 LASSERT(cluuid != NULL);
1356 export = class_new_export(obd, cluuid);
1358 RETURN(PTR_ERR(export));
1360 conn->cookie = export->exp_handle.h_cookie;
1361 class_export_put(export);
1363 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1364 cluuid->uuid, conn->cookie);
1367 EXPORT_SYMBOL(class_connect);
1369 /* if export is involved in recovery then clean up related things */
1370 static void class_export_recovery_cleanup(struct obd_export *exp)
1372 struct obd_device *obd = exp->exp_obd;
1374 spin_lock(&obd->obd_recovery_task_lock);
1375 if (obd->obd_recovering) {
1376 if (exp->exp_in_recovery) {
1377 spin_lock(&exp->exp_lock);
1378 exp->exp_in_recovery = 0;
1379 spin_unlock(&exp->exp_lock);
1380 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1381 atomic_dec(&obd->obd_connected_clients);
1384 /* if called during recovery then should update
1385 * obd_stale_clients counter,
1386 * lightweight exports are not counted */
1387 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1388 exp->exp_obd->obd_stale_clients++;
1390 spin_unlock(&obd->obd_recovery_task_lock);
1392 spin_lock(&exp->exp_lock);
1393 /** Cleanup req replay fields */
1394 if (exp->exp_req_replay_needed) {
1395 exp->exp_req_replay_needed = 0;
1397 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1398 atomic_dec(&obd->obd_req_replay_clients);
1401 /** Cleanup lock replay data */
1402 if (exp->exp_lock_replay_needed) {
1403 exp->exp_lock_replay_needed = 0;
1405 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1406 atomic_dec(&obd->obd_lock_replay_clients);
1408 spin_unlock(&exp->exp_lock);
1411 /* This function removes 1-3 references from the export:
1412 * 1 - for export pointer passed
1413 * and if disconnect really need
1414 * 2 - removing from hash
1415 * 3 - in client_unlink_export
1416 * The export pointer passed to this function can destroyed */
1417 int class_disconnect(struct obd_export *export)
1419 int already_disconnected;
1422 if (export == NULL) {
1423 CWARN("attempting to free NULL export %p\n", export);
1427 spin_lock(&export->exp_lock);
1428 already_disconnected = export->exp_disconnected;
1429 export->exp_disconnected = 1;
1430 /* We hold references of export for uuid hash
1431 * and nid_hash and export link at least. So
1432 * it is safe to call cfs_hash_del in there. */
1433 if (!hlist_unhashed(&export->exp_nid_hash))
1434 cfs_hash_del(export->exp_obd->obd_nid_hash,
1435 &export->exp_connection->c_peer.nid,
1436 &export->exp_nid_hash);
1437 spin_unlock(&export->exp_lock);
1439 /* class_cleanup(), abort_recovery(), and class_fail_export()
1440 * all end up in here, and if any of them race we shouldn't
1441 * call extra class_export_puts(). */
1442 if (already_disconnected) {
1443 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1444 GOTO(no_disconn, already_disconnected);
1447 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1448 export->exp_handle.h_cookie);
1450 class_export_recovery_cleanup(export);
1451 class_unlink_export(export);
1453 class_export_put(export);
1456 EXPORT_SYMBOL(class_disconnect);
1458 /* Return non-zero for a fully connected export */
1459 int class_connected_export(struct obd_export *exp)
1464 spin_lock(&exp->exp_lock);
1465 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1466 spin_unlock(&exp->exp_lock);
1470 EXPORT_SYMBOL(class_connected_export);
1472 static void class_disconnect_export_list(struct list_head *list,
1473 enum obd_option flags)
1476 struct obd_export *exp;
1479 /* It's possible that an export may disconnect itself, but
1480 * nothing else will be added to this list. */
1481 while (!list_empty(list)) {
1482 exp = list_first_entry(list, struct obd_export,
1484 /* need for safe call CDEBUG after obd_disconnect */
1485 class_export_get(exp);
1487 spin_lock(&exp->exp_lock);
1488 exp->exp_flags = flags;
1489 spin_unlock(&exp->exp_lock);
1491 if (obd_uuid_equals(&exp->exp_client_uuid,
1492 &exp->exp_obd->obd_uuid)) {
1494 "exp %p export uuid == obd uuid, don't discon\n",
1496 /* Need to delete this now so we don't end up pointing
1497 * to work_list later when this export is cleaned up. */
1498 list_del_init(&exp->exp_obd_chain);
1499 class_export_put(exp);
1503 class_export_get(exp);
1504 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1505 "last request at %lld\n",
1506 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1507 exp, exp->exp_last_request_time);
1508 /* release one export reference anyway */
1509 rc = obd_disconnect(exp);
1511 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1512 obd_export_nid2str(exp), exp, rc);
1513 class_export_put(exp);
1518 void class_disconnect_exports(struct obd_device *obd)
1520 LIST_HEAD(work_list);
1523 /* Move all of the exports from obd_exports to a work list, en masse. */
1524 spin_lock(&obd->obd_dev_lock);
1525 list_splice_init(&obd->obd_exports, &work_list);
1526 list_splice_init(&obd->obd_delayed_exports, &work_list);
1527 spin_unlock(&obd->obd_dev_lock);
1529 if (!list_empty(&work_list)) {
1530 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1531 "disconnecting them\n", obd->obd_minor, obd);
1532 class_disconnect_export_list(&work_list,
1533 exp_flags_from_obd(obd));
1535 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1536 obd->obd_minor, obd);
1539 EXPORT_SYMBOL(class_disconnect_exports);
1541 /* Remove exports that have not completed recovery.
1543 void class_disconnect_stale_exports(struct obd_device *obd,
1544 int (*test_export)(struct obd_export *))
1546 LIST_HEAD(work_list);
1547 struct obd_export *exp, *n;
1551 spin_lock(&obd->obd_dev_lock);
1552 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1554 /* don't count self-export as client */
1555 if (obd_uuid_equals(&exp->exp_client_uuid,
1556 &exp->exp_obd->obd_uuid))
1559 /* don't evict clients which have no slot in last_rcvd
1560 * (e.g. lightweight connection) */
1561 if (exp->exp_target_data.ted_lr_idx == -1)
1564 spin_lock(&exp->exp_lock);
1565 if (exp->exp_failed || test_export(exp)) {
1566 spin_unlock(&exp->exp_lock);
1569 exp->exp_failed = 1;
1570 spin_unlock(&exp->exp_lock);
1572 list_move(&exp->exp_obd_chain, &work_list);
1574 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1575 obd->obd_name, exp->exp_client_uuid.uuid,
1576 obd_export_nid2str(exp));
1577 print_export_data(exp, "EVICTING", 0, D_HA);
1579 spin_unlock(&obd->obd_dev_lock);
1582 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1583 obd->obd_name, evicted);
1585 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1586 OBD_OPT_ABORT_RECOV);
1589 EXPORT_SYMBOL(class_disconnect_stale_exports);
1591 void class_fail_export(struct obd_export *exp)
1593 int rc, already_failed;
1595 spin_lock(&exp->exp_lock);
1596 already_failed = exp->exp_failed;
1597 exp->exp_failed = 1;
1598 spin_unlock(&exp->exp_lock);
1600 if (already_failed) {
1601 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1602 exp, exp->exp_client_uuid.uuid);
1606 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1607 exp, exp->exp_client_uuid.uuid);
1609 if (obd_dump_on_timeout)
1610 libcfs_debug_dumplog();
1612 /* need for safe call CDEBUG after obd_disconnect */
1613 class_export_get(exp);
1615 /* Most callers into obd_disconnect are removing their own reference
1616 * (request, for example) in addition to the one from the hash table.
1617 * We don't have such a reference here, so make one. */
1618 class_export_get(exp);
1619 rc = obd_disconnect(exp);
1621 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1623 CDEBUG(D_HA, "disconnected export %p/%s\n",
1624 exp, exp->exp_client_uuid.uuid);
1625 class_export_put(exp);
1627 EXPORT_SYMBOL(class_fail_export);
1629 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1631 struct cfs_hash *nid_hash;
1632 struct obd_export *doomed_exp = NULL;
1633 int exports_evicted = 0;
1635 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1637 spin_lock(&obd->obd_dev_lock);
1638 /* umount has run already, so evict thread should leave
1639 * its task to umount thread now */
1640 if (obd->obd_stopping) {
1641 spin_unlock(&obd->obd_dev_lock);
1642 return exports_evicted;
1644 nid_hash = obd->obd_nid_hash;
1645 cfs_hash_getref(nid_hash);
1646 spin_unlock(&obd->obd_dev_lock);
1649 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1650 if (doomed_exp == NULL)
1653 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1654 "nid %s found, wanted nid %s, requested nid %s\n",
1655 obd_export_nid2str(doomed_exp),
1656 libcfs_nid2str(nid_key), nid);
1657 LASSERTF(doomed_exp != obd->obd_self_export,
1658 "self-export is hashed by NID?\n");
1660 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1661 "request\n", obd->obd_name,
1662 obd_uuid2str(&doomed_exp->exp_client_uuid),
1663 obd_export_nid2str(doomed_exp));
1664 class_fail_export(doomed_exp);
1665 class_export_put(doomed_exp);
1668 cfs_hash_putref(nid_hash);
1670 if (!exports_evicted)
1671 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1672 obd->obd_name, nid);
1673 return exports_evicted;
1675 EXPORT_SYMBOL(obd_export_evict_by_nid);
1677 #ifdef HAVE_SERVER_SUPPORT
1678 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1680 struct obd_export *doomed_exp = NULL;
1681 struct obd_uuid doomed_uuid;
1682 int exports_evicted = 0;
1684 spin_lock(&obd->obd_dev_lock);
1685 if (obd->obd_stopping) {
1686 spin_unlock(&obd->obd_dev_lock);
1687 return exports_evicted;
1689 spin_unlock(&obd->obd_dev_lock);
1691 obd_str2uuid(&doomed_uuid, uuid);
1692 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1693 CERROR("%s: can't evict myself\n", obd->obd_name);
1694 return exports_evicted;
1697 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1698 if (doomed_exp == NULL) {
1699 CERROR("%s: can't disconnect %s: no exports found\n",
1700 obd->obd_name, uuid);
1702 CWARN("%s: evicting %s at adminstrative request\n",
1703 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1704 class_fail_export(doomed_exp);
1705 class_export_put(doomed_exp);
1706 obd_uuid_del(obd, doomed_exp);
1710 return exports_evicted;
1712 #endif /* HAVE_SERVER_SUPPORT */
1714 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1715 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1716 EXPORT_SYMBOL(class_export_dump_hook);
1719 static void print_export_data(struct obd_export *exp, const char *status,
1720 int locks, int debug_level)
1722 struct ptlrpc_reply_state *rs;
1723 struct ptlrpc_reply_state *first_reply = NULL;
1726 spin_lock(&exp->exp_lock);
1727 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1733 spin_unlock(&exp->exp_lock);
1735 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1736 "%p %s %llu stale:%d\n",
1737 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1738 obd_export_nid2str(exp),
1739 refcount_read(&exp->exp_handle.h_ref),
1740 atomic_read(&exp->exp_rpc_count),
1741 atomic_read(&exp->exp_cb_count),
1742 atomic_read(&exp->exp_locks_count),
1743 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1744 nreplies, first_reply, nreplies > 3 ? "..." : "",
1745 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1746 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1747 if (locks && class_export_dump_hook != NULL)
1748 class_export_dump_hook(exp);
1752 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1754 struct obd_export *exp;
1756 spin_lock(&obd->obd_dev_lock);
1757 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1758 print_export_data(exp, "ACTIVE", locks, debug_level);
1759 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1760 print_export_data(exp, "UNLINKED", locks, debug_level);
1761 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1762 print_export_data(exp, "DELAYED", locks, debug_level);
1763 spin_unlock(&obd->obd_dev_lock);
1766 void obd_exports_barrier(struct obd_device *obd)
1769 LASSERT(list_empty(&obd->obd_exports));
1770 spin_lock(&obd->obd_dev_lock);
1771 while (!list_empty(&obd->obd_unlinked_exports)) {
1772 spin_unlock(&obd->obd_dev_lock);
1773 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1774 if (waited > 5 && is_power_of_2(waited)) {
1775 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1776 "more than %d seconds. "
1777 "The obd refcount = %d. Is it stuck?\n",
1778 obd->obd_name, waited,
1779 atomic_read(&obd->obd_refcount));
1780 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1783 spin_lock(&obd->obd_dev_lock);
1785 spin_unlock(&obd->obd_dev_lock);
1787 EXPORT_SYMBOL(obd_exports_barrier);
1790 * Add export to the obd_zombe thread and notify it.
1792 static void obd_zombie_export_add(struct obd_export *exp) {
1793 atomic_dec(&obd_stale_export_num);
1794 spin_lock(&exp->exp_obd->obd_dev_lock);
1795 LASSERT(!list_empty(&exp->exp_obd_chain));
1796 list_del_init(&exp->exp_obd_chain);
1797 spin_unlock(&exp->exp_obd->obd_dev_lock);
1799 queue_work(zombie_wq, &exp->exp_zombie_work);
1803 * Add import to the obd_zombe thread and notify it.
1805 static void obd_zombie_import_add(struct obd_import *imp) {
1806 LASSERT(imp->imp_sec == NULL);
1808 queue_work(zombie_wq, &imp->imp_zombie_work);
1812 * wait when obd_zombie import/export queues become empty
1814 void obd_zombie_barrier(void)
1816 flush_workqueue(zombie_wq);
1818 EXPORT_SYMBOL(obd_zombie_barrier);
1821 struct obd_export *obd_stale_export_get(void)
1823 struct obd_export *exp = NULL;
1826 spin_lock(&obd_stale_export_lock);
1827 if (!list_empty(&obd_stale_exports)) {
1828 exp = list_first_entry(&obd_stale_exports,
1829 struct obd_export, exp_stale_list);
1830 list_del_init(&exp->exp_stale_list);
1832 spin_unlock(&obd_stale_export_lock);
1835 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1836 atomic_read(&obd_stale_export_num));
1840 EXPORT_SYMBOL(obd_stale_export_get);
1842 void obd_stale_export_put(struct obd_export *exp)
1846 LASSERT(list_empty(&exp->exp_stale_list));
1847 if (exp->exp_lock_hash &&
1848 atomic_read(&exp->exp_lock_hash->hs_count)) {
1849 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1850 atomic_read(&obd_stale_export_num));
1852 spin_lock_bh(&exp->exp_bl_list_lock);
1853 spin_lock(&obd_stale_export_lock);
1854 /* Add to the tail if there is no blocked locks,
1855 * to the head otherwise. */
1856 if (list_empty(&exp->exp_bl_list))
1857 list_add_tail(&exp->exp_stale_list,
1858 &obd_stale_exports);
1860 list_add(&exp->exp_stale_list,
1861 &obd_stale_exports);
1863 spin_unlock(&obd_stale_export_lock);
1864 spin_unlock_bh(&exp->exp_bl_list_lock);
1866 class_export_put(exp);
1870 EXPORT_SYMBOL(obd_stale_export_put);
1873 * Adjust the position of the export in the stale list,
1874 * i.e. move to the head of the list if is needed.
1876 void obd_stale_export_adjust(struct obd_export *exp)
1878 LASSERT(exp != NULL);
1879 spin_lock_bh(&exp->exp_bl_list_lock);
1880 spin_lock(&obd_stale_export_lock);
1882 if (!list_empty(&exp->exp_stale_list) &&
1883 !list_empty(&exp->exp_bl_list))
1884 list_move(&exp->exp_stale_list, &obd_stale_exports);
1886 spin_unlock(&obd_stale_export_lock);
1887 spin_unlock_bh(&exp->exp_bl_list_lock);
1889 EXPORT_SYMBOL(obd_stale_export_adjust);
1892 * start destroy zombie import/export thread
1894 int obd_zombie_impexp_init(void)
1896 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1904 * stop destroy zombie import/export thread
1906 void obd_zombie_impexp_stop(void)
1908 destroy_workqueue(zombie_wq);
1909 LASSERT(list_empty(&obd_stale_exports));
1912 /***** Kernel-userspace comm helpers *******/
1914 /* Get length of entire message, including header */
1915 int kuc_len(int payload_len)
1917 return sizeof(struct kuc_hdr) + payload_len;
1919 EXPORT_SYMBOL(kuc_len);
1921 /* Get a pointer to kuc header, given a ptr to the payload
1922 * @param p Pointer to payload area
1923 * @returns Pointer to kuc header
1925 struct kuc_hdr * kuc_ptr(void *p)
1927 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1928 LASSERT(lh->kuc_magic == KUC_MAGIC);
1931 EXPORT_SYMBOL(kuc_ptr);
1933 /* Alloc space for a message, and fill in header
1934 * @return Pointer to payload area
1936 void *kuc_alloc(int payload_len, int transport, int type)
1939 int len = kuc_len(payload_len);
1943 return ERR_PTR(-ENOMEM);
1945 lh->kuc_magic = KUC_MAGIC;
1946 lh->kuc_transport = transport;
1947 lh->kuc_msgtype = type;
1948 lh->kuc_msglen = len;
1950 return (void *)(lh + 1);
1952 EXPORT_SYMBOL(kuc_alloc);
1954 /* Takes pointer to payload area */
1955 void kuc_free(void *p, int payload_len)
1957 struct kuc_hdr *lh = kuc_ptr(p);
1958 OBD_FREE(lh, kuc_len(payload_len));
1960 EXPORT_SYMBOL(kuc_free);
1962 struct obd_request_slot_waiter {
1963 struct list_head orsw_entry;
1964 wait_queue_head_t orsw_waitq;
1968 static bool obd_request_slot_avail(struct client_obd *cli,
1969 struct obd_request_slot_waiter *orsw)
1973 spin_lock(&cli->cl_loi_list_lock);
1974 avail = !!list_empty(&orsw->orsw_entry);
1975 spin_unlock(&cli->cl_loi_list_lock);
1981 * For network flow control, the RPC sponsor needs to acquire a credit
1982 * before sending the RPC. The credits count for a connection is defined
1983 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1984 * the subsequent RPC sponsors need to wait until others released their
1985 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1987 int obd_get_request_slot(struct client_obd *cli)
1989 struct obd_request_slot_waiter orsw;
1992 spin_lock(&cli->cl_loi_list_lock);
1993 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1994 cli->cl_rpcs_in_flight++;
1995 spin_unlock(&cli->cl_loi_list_lock);
1999 init_waitqueue_head(&orsw.orsw_waitq);
2000 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2001 orsw.orsw_signaled = false;
2002 spin_unlock(&cli->cl_loi_list_lock);
2004 rc = l_wait_event_abortable(orsw.orsw_waitq,
2005 obd_request_slot_avail(cli, &orsw) ||
2006 orsw.orsw_signaled);
2008 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2009 * freed but other (such as obd_put_request_slot) is using it. */
2010 spin_lock(&cli->cl_loi_list_lock);
2012 if (!orsw.orsw_signaled) {
2013 if (list_empty(&orsw.orsw_entry))
2014 cli->cl_rpcs_in_flight--;
2016 list_del(&orsw.orsw_entry);
2021 if (orsw.orsw_signaled) {
2022 LASSERT(list_empty(&orsw.orsw_entry));
2026 spin_unlock(&cli->cl_loi_list_lock);
2030 EXPORT_SYMBOL(obd_get_request_slot);
2032 void obd_put_request_slot(struct client_obd *cli)
2034 struct obd_request_slot_waiter *orsw;
2036 spin_lock(&cli->cl_loi_list_lock);
2037 cli->cl_rpcs_in_flight--;
2039 /* If there is free slot, wakeup the first waiter. */
2040 if (!list_empty(&cli->cl_flight_waiters) &&
2041 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2042 orsw = list_first_entry(&cli->cl_flight_waiters,
2043 struct obd_request_slot_waiter,
2045 list_del_init(&orsw->orsw_entry);
2046 cli->cl_rpcs_in_flight++;
2047 wake_up(&orsw->orsw_waitq);
2049 spin_unlock(&cli->cl_loi_list_lock);
2051 EXPORT_SYMBOL(obd_put_request_slot);
2053 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2055 return cli->cl_max_rpcs_in_flight;
2057 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2059 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2061 struct obd_request_slot_waiter *orsw;
2065 const char *type_name;
2068 if (max > OBD_MAX_RIF_MAX || max < 1)
2071 type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2072 if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2073 /* adjust max_mod_rpcs_in_flight to ensure it is always
2074 * strictly lower that max_rpcs_in_flight */
2076 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2077 "because it must be higher than "
2078 "max_mod_rpcs_in_flight value",
2079 cli->cl_import->imp_obd->obd_name);
2082 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2083 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2089 spin_lock(&cli->cl_loi_list_lock);
2090 old = cli->cl_max_rpcs_in_flight;
2091 cli->cl_max_rpcs_in_flight = max;
2092 client_adjust_max_dirty(cli);
2096 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2097 for (i = 0; i < diff; i++) {
2098 if (list_empty(&cli->cl_flight_waiters))
2101 orsw = list_first_entry(&cli->cl_flight_waiters,
2102 struct obd_request_slot_waiter,
2104 list_del_init(&orsw->orsw_entry);
2105 cli->cl_rpcs_in_flight++;
2106 wake_up(&orsw->orsw_waitq);
2108 spin_unlock(&cli->cl_loi_list_lock);
2112 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2114 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2116 return cli->cl_max_mod_rpcs_in_flight;
2118 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2120 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2122 struct obd_connect_data *ocd;
2126 if (max > OBD_MAX_RIF_MAX || max < 1)
2129 /* cannot exceed or equal max_rpcs_in_flight */
2130 if (max >= cli->cl_max_rpcs_in_flight) {
2131 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2132 "higher or equal to max_rpcs_in_flight value (%u)\n",
2133 cli->cl_import->imp_obd->obd_name,
2134 max, cli->cl_max_rpcs_in_flight);
2138 /* cannot exceed max modify RPCs in flight supported by the server */
2139 ocd = &cli->cl_import->imp_connect_data;
2140 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2141 maxmodrpcs = ocd->ocd_maxmodrpcs;
2144 if (max > maxmodrpcs) {
2145 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2146 "higher than max_mod_rpcs_per_client value (%hu) "
2147 "returned by the server at connection\n",
2148 cli->cl_import->imp_obd->obd_name,
2153 spin_lock(&cli->cl_mod_rpcs_lock);
2155 prev = cli->cl_max_mod_rpcs_in_flight;
2156 cli->cl_max_mod_rpcs_in_flight = max;
2158 /* wakeup waiters if limit has been increased */
2159 if (cli->cl_max_mod_rpcs_in_flight > prev)
2160 wake_up(&cli->cl_mod_rpcs_waitq);
2162 spin_unlock(&cli->cl_mod_rpcs_lock);
2166 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2168 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2169 struct seq_file *seq)
2171 unsigned long mod_tot = 0, mod_cum;
2172 struct timespec64 now;
2175 ktime_get_real_ts64(&now);
2177 spin_lock(&cli->cl_mod_rpcs_lock);
2179 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2180 (s64)now.tv_sec, now.tv_nsec);
2181 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2182 cli->cl_mod_rpcs_in_flight);
2184 seq_printf(seq, "\n\t\t\tmodify\n");
2185 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2187 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2190 for (i = 0; i < OBD_HIST_MAX; i++) {
2191 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2193 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2194 i, mod, pct(mod, mod_tot),
2195 pct(mod_cum, mod_tot));
2196 if (mod_cum == mod_tot)
2200 spin_unlock(&cli->cl_mod_rpcs_lock);
2204 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2206 /* The number of modify RPCs sent in parallel is limited
2207 * because the server has a finite number of slots per client to
2208 * store request result and ensure reply reconstruction when needed.
2209 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2210 * that takes into account server limit and cl_max_rpcs_in_flight
2212 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2213 * one close request is allowed above the maximum.
2215 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2220 /* A slot is available if
2221 * - number of modify RPCs in flight is less than the max
2222 * - it's a close RPC and no other close request is in flight
2224 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2225 (close_req && cli->cl_close_rpcs_in_flight == 0);
2230 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2235 spin_lock(&cli->cl_mod_rpcs_lock);
2236 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2237 spin_unlock(&cli->cl_mod_rpcs_lock);
2242 /* Get a modify RPC slot from the obd client @cli according
2243 * to the kind of operation @opc that is going to be sent
2244 * and the intent @it of the operation if it applies.
2245 * If the maximum number of modify RPCs in flight is reached
2246 * the thread is put to sleep.
2247 * Returns the tag to be set in the request message. Tag 0
2248 * is reserved for non-modifying requests.
2250 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2252 bool close_req = false;
2255 if (opc == MDS_CLOSE)
2259 spin_lock(&cli->cl_mod_rpcs_lock);
2260 max = cli->cl_max_mod_rpcs_in_flight;
2261 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2262 /* there is a slot available */
2263 cli->cl_mod_rpcs_in_flight++;
2265 cli->cl_close_rpcs_in_flight++;
2266 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2267 cli->cl_mod_rpcs_in_flight);
2268 /* find a free tag */
2269 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2271 LASSERT(i < OBD_MAX_RIF_MAX);
2272 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2273 spin_unlock(&cli->cl_mod_rpcs_lock);
2274 /* tag 0 is reserved for non-modify RPCs */
2277 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2278 cli->cl_import->imp_obd->obd_name,
2283 spin_unlock(&cli->cl_mod_rpcs_lock);
2285 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2286 "opc %u, max %hu\n",
2287 cli->cl_import->imp_obd->obd_name, opc, max);
2289 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2290 obd_mod_rpc_slot_avail(cli,
2294 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2296 /* Put a modify RPC slot from the obd client @cli according
2297 * to the kind of operation @opc that has been sent.
2299 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2301 bool close_req = false;
2306 if (opc == MDS_CLOSE)
2309 spin_lock(&cli->cl_mod_rpcs_lock);
2310 cli->cl_mod_rpcs_in_flight--;
2312 cli->cl_close_rpcs_in_flight--;
2313 /* release the tag in the bitmap */
2314 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2315 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2316 spin_unlock(&cli->cl_mod_rpcs_lock);
2317 wake_up(&cli->cl_mod_rpcs_waitq);
2319 EXPORT_SYMBOL(obd_put_mod_rpc_slot);