4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59 const char *status, int locks, int debug_level);
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90 obd, obd->obd_namespace, obd->obd_force);
93 lu_ref_fini(&obd->obd_reference);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 struct obd_type *class_search_type(const char *name)
99 struct kobject *kobj = kset_find_obj(lustre_kset, name);
101 if (kobj && kobj->ktype == &class_ktype)
102 return container_of(kobj, struct obd_type, typ_kobj);
107 EXPORT_SYMBOL(class_search_type);
109 struct obd_type *class_get_type(const char *name)
111 struct obd_type *type;
113 type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
116 const char *modname = name;
118 #ifdef HAVE_SERVER_SUPPORT
119 if (strcmp(modname, "obdfilter") == 0)
122 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 modname = LUSTRE_OSP_NAME;
125 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 if (try_module_get(type->typ_dt_ops->o_owner)) {
140 atomic_inc(&type->typ_refcnt);
141 /* class_search_type() returned a counted reference,
142 * but we don't need that count any more as
143 * we have one through typ_refcnt.
145 kobject_put(&type->typ_kobj);
147 kobject_put(&type->typ_kobj);
154 void class_put_type(struct obd_type *type)
157 module_put(type->typ_dt_ops->o_owner);
158 atomic_dec(&type->typ_refcnt);
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 debugfs_remove_recursive(type->typ_debugfs_entry);
166 type->typ_debugfs_entry = NULL;
169 lu_device_type_fini(type->typ_lu);
171 #ifdef CONFIG_PROC_FS
172 if (type->typ_name && type->typ_procroot)
173 remove_proc_subtree(type->typ_name, proc_lustre_root);
175 OBD_FREE(type, sizeof(*type));
178 static struct kobj_type class_ktype = {
179 .sysfs_ops = &lustre_sysfs_ops,
180 .release = class_sysfs_release,
183 #ifdef HAVE_SERVER_SUPPORT
184 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
186 struct dentry *symlink;
187 struct obd_type *type;
190 type = class_search_type(name);
192 kobject_put(&type->typ_kobj);
193 return ERR_PTR(-EEXIST);
196 OBD_ALLOC(type, sizeof(*type));
198 return ERR_PTR(-ENOMEM);
200 type->typ_kobj.kset = lustre_kset;
201 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
202 &lustre_kset->kobj, "%s", name);
206 symlink = debugfs_create_dir(name, debugfs_lustre_root);
207 if (IS_ERR_OR_NULL(symlink)) {
208 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
209 kobject_put(&type->typ_kobj);
212 type->typ_debugfs_entry = symlink;
213 type->typ_sym_filter = true;
216 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
218 if (IS_ERR(type->typ_procroot)) {
219 CERROR("%s: can't create compat proc entry: %d\n",
220 name, (int)PTR_ERR(type->typ_procroot));
221 type->typ_procroot = NULL;
227 EXPORT_SYMBOL(class_add_symlinks);
228 #endif /* HAVE_SERVER_SUPPORT */
230 #define CLASS_MAX_NAME 1024
232 int class_register_type(const struct obd_ops *dt_ops,
233 const struct md_ops *md_ops,
234 bool enable_proc, struct lprocfs_vars *vars,
235 const char *name, struct lu_device_type *ldt)
237 struct obd_type *type;
242 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
244 type = class_search_type(name);
246 #ifdef HAVE_SERVER_SUPPORT
247 if (type->typ_sym_filter)
249 #endif /* HAVE_SERVER_SUPPORT */
250 kobject_put(&type->typ_kobj);
251 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
255 OBD_ALLOC(type, sizeof(*type));
259 type->typ_kobj.kset = lustre_kset;
260 kobject_init(&type->typ_kobj, &class_ktype);
261 #ifdef HAVE_SERVER_SUPPORT
263 #endif /* HAVE_SERVER_SUPPORT */
265 type->typ_dt_ops = dt_ops;
266 type->typ_md_ops = md_ops;
268 #ifdef HAVE_SERVER_SUPPORT
269 if (type->typ_sym_filter) {
270 type->typ_sym_filter = false;
271 kobject_put(&type->typ_kobj);
275 #ifdef CONFIG_PROC_FS
276 if (enable_proc && !type->typ_procroot) {
277 type->typ_procroot = lprocfs_register(name,
280 if (IS_ERR(type->typ_procroot)) {
281 rc = PTR_ERR(type->typ_procroot);
282 type->typ_procroot = NULL;
287 type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
289 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
290 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
292 type->typ_debugfs_entry = NULL;
296 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
299 #ifdef HAVE_SERVER_SUPPORT
304 rc = lu_device_type_init(ldt);
312 kobject_put(&type->typ_kobj);
316 EXPORT_SYMBOL(class_register_type);
318 int class_unregister_type(const char *name)
320 struct obd_type *type = class_search_type(name);
325 CERROR("unknown obd type\n");
329 if (atomic_read(&type->typ_refcnt)) {
330 CERROR("type %s has refcount (%d)\n", name,
331 atomic_read(&type->typ_refcnt));
332 /* This is a bad situation, let's make the best of it */
333 /* Remove ops, but leave the name for debugging */
334 type->typ_dt_ops = NULL;
335 type->typ_md_ops = NULL;
336 GOTO(out_put, rc = -EBUSY);
339 /* Put the final ref */
340 kobject_put(&type->typ_kobj);
342 /* Put the ref returned by class_search_type() */
343 kobject_put(&type->typ_kobj);
346 } /* class_unregister_type */
347 EXPORT_SYMBOL(class_unregister_type);
350 * Create a new obd device.
352 * Allocate the new obd_device and initialize it.
354 * \param[in] type_name obd device type string.
355 * \param[in] name obd device name.
356 * \param[in] uuid obd device UUID
358 * \retval newdev pointer to created obd_device
359 * \retval ERR_PTR(errno) on error
361 struct obd_device *class_newdev(const char *type_name, const char *name,
364 struct obd_device *newdev;
365 struct obd_type *type = NULL;
368 if (strlen(name) >= MAX_OBD_NAME) {
369 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
370 RETURN(ERR_PTR(-EINVAL));
373 type = class_get_type(type_name);
375 CERROR("OBD: unknown type: %s\n", type_name);
376 RETURN(ERR_PTR(-ENODEV));
379 newdev = obd_device_alloc();
380 if (newdev == NULL) {
381 class_put_type(type);
382 RETURN(ERR_PTR(-ENOMEM));
384 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
385 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
386 newdev->obd_type = type;
387 newdev->obd_minor = -1;
389 rwlock_init(&newdev->obd_pool_lock);
390 newdev->obd_pool_limit = 0;
391 newdev->obd_pool_slv = 0;
393 INIT_LIST_HEAD(&newdev->obd_exports);
394 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
395 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
396 INIT_LIST_HEAD(&newdev->obd_exports_timed);
397 INIT_LIST_HEAD(&newdev->obd_nid_stats);
398 spin_lock_init(&newdev->obd_nid_lock);
399 spin_lock_init(&newdev->obd_dev_lock);
400 mutex_init(&newdev->obd_dev_mutex);
401 spin_lock_init(&newdev->obd_osfs_lock);
402 /* newdev->obd_osfs_age must be set to a value in the distant
403 * past to guarantee a fresh statfs is fetched on mount. */
404 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
406 /* XXX belongs in setup not attach */
407 init_rwsem(&newdev->obd_observer_link_sem);
409 spin_lock_init(&newdev->obd_recovery_task_lock);
410 init_waitqueue_head(&newdev->obd_next_transno_waitq);
411 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
412 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
413 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
414 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
415 INIT_LIST_HEAD(&newdev->obd_evict_list);
416 INIT_LIST_HEAD(&newdev->obd_lwp_list);
418 llog_group_init(&newdev->obd_olg);
419 /* Detach drops this */
420 atomic_set(&newdev->obd_refcount, 1);
421 lu_ref_init(&newdev->obd_reference);
422 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
424 newdev->obd_conn_inprogress = 0;
426 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
428 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
429 newdev->obd_name, newdev);
437 * \param[in] obd obd_device to be freed
441 void class_free_dev(struct obd_device *obd)
443 struct obd_type *obd_type = obd->obd_type;
445 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
446 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
447 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
448 "obd %p != obd_devs[%d] %p\n",
449 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
450 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
451 "obd_refcount should be 0, not %d\n",
452 atomic_read(&obd->obd_refcount));
453 LASSERT(obd_type != NULL);
455 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
456 obd->obd_name, obd->obd_type->typ_name);
458 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
459 obd->obd_name, obd->obd_uuid.uuid);
460 if (obd->obd_stopping) {
463 /* If we're not stopping, we were never set up */
464 err = obd_cleanup(obd);
466 CERROR("Cleanup %s returned %d\n",
470 obd_device_free(obd);
472 class_put_type(obd_type);
476 * Unregister obd device.
478 * Free slot in obd_dev[] used by \a obd.
480 * \param[in] new_obd obd_device to be unregistered
484 void class_unregister_device(struct obd_device *obd)
486 write_lock(&obd_dev_lock);
487 if (obd->obd_minor >= 0) {
488 LASSERT(obd_devs[obd->obd_minor] == obd);
489 obd_devs[obd->obd_minor] = NULL;
492 write_unlock(&obd_dev_lock);
496 * Register obd device.
498 * Find free slot in obd_devs[], fills it with \a new_obd.
500 * \param[in] new_obd obd_device to be registered
503 * \retval -EEXIST device with this name is registered
504 * \retval -EOVERFLOW obd_devs[] is full
506 int class_register_device(struct obd_device *new_obd)
510 int new_obd_minor = 0;
511 bool minor_assign = false;
512 bool retried = false;
515 write_lock(&obd_dev_lock);
516 for (i = 0; i < class_devno_max(); i++) {
517 struct obd_device *obd = class_num2obd(i);
520 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
523 write_unlock(&obd_dev_lock);
525 /* the obd_device could be waited to be
526 * destroyed by the "obd_zombie_impexp_thread".
528 obd_zombie_barrier();
533 CERROR("%s: already exists, won't add\n",
535 /* in case we found a free slot before duplicate */
536 minor_assign = false;
540 if (!minor_assign && obd == NULL) {
547 new_obd->obd_minor = new_obd_minor;
548 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
549 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
550 obd_devs[new_obd_minor] = new_obd;
554 CERROR("%s: all %u/%u devices used, increase "
555 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
556 i, class_devno_max(), ret);
559 write_unlock(&obd_dev_lock);
564 static int class_name2dev_nolock(const char *name)
571 for (i = 0; i < class_devno_max(); i++) {
572 struct obd_device *obd = class_num2obd(i);
574 if (obd && strcmp(name, obd->obd_name) == 0) {
575 /* Make sure we finished attaching before we give
576 out any references */
577 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
578 if (obd->obd_attached) {
588 int class_name2dev(const char *name)
595 read_lock(&obd_dev_lock);
596 i = class_name2dev_nolock(name);
597 read_unlock(&obd_dev_lock);
601 EXPORT_SYMBOL(class_name2dev);
603 struct obd_device *class_name2obd(const char *name)
605 int dev = class_name2dev(name);
607 if (dev < 0 || dev > class_devno_max())
609 return class_num2obd(dev);
611 EXPORT_SYMBOL(class_name2obd);
613 int class_uuid2dev_nolock(struct obd_uuid *uuid)
617 for (i = 0; i < class_devno_max(); i++) {
618 struct obd_device *obd = class_num2obd(i);
620 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
621 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
629 int class_uuid2dev(struct obd_uuid *uuid)
633 read_lock(&obd_dev_lock);
634 i = class_uuid2dev_nolock(uuid);
635 read_unlock(&obd_dev_lock);
639 EXPORT_SYMBOL(class_uuid2dev);
641 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
643 int dev = class_uuid2dev(uuid);
646 return class_num2obd(dev);
648 EXPORT_SYMBOL(class_uuid2obd);
651 * Get obd device from ::obd_devs[]
653 * \param num [in] array index
655 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
656 * otherwise return the obd device there.
658 struct obd_device *class_num2obd(int num)
660 struct obd_device *obd = NULL;
662 if (num < class_devno_max()) {
667 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
668 "%p obd_magic %08x != %08x\n",
669 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
670 LASSERTF(obd->obd_minor == num,
671 "%p obd_minor %0d != %0d\n",
672 obd, obd->obd_minor, num);
679 * Find obd in obd_dev[] by name or uuid.
681 * Increment obd's refcount if found.
683 * \param[in] str obd name or uuid
685 * \retval NULL if not found
686 * \retval target pointer to found obd_device
688 struct obd_device *class_dev_by_str(const char *str)
690 struct obd_device *target = NULL;
691 struct obd_uuid tgtuuid;
694 obd_str2uuid(&tgtuuid, str);
696 read_lock(&obd_dev_lock);
697 rc = class_uuid2dev_nolock(&tgtuuid);
699 rc = class_name2dev_nolock(str);
702 target = class_num2obd(rc);
705 class_incref(target, "find", current);
706 read_unlock(&obd_dev_lock);
710 EXPORT_SYMBOL(class_dev_by_str);
713 * Get obd devices count. Device in any
715 * \retval obd device count
717 int get_devices_count(void)
719 int index, max_index = class_devno_max(), dev_count = 0;
721 read_lock(&obd_dev_lock);
722 for (index = 0; index <= max_index; index++) {
723 struct obd_device *obd = class_num2obd(index);
727 read_unlock(&obd_dev_lock);
731 EXPORT_SYMBOL(get_devices_count);
733 void class_obd_list(void)
738 read_lock(&obd_dev_lock);
739 for (i = 0; i < class_devno_max(); i++) {
740 struct obd_device *obd = class_num2obd(i);
744 if (obd->obd_stopping)
746 else if (obd->obd_set_up)
748 else if (obd->obd_attached)
752 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
753 i, status, obd->obd_type->typ_name,
754 obd->obd_name, obd->obd_uuid.uuid,
755 atomic_read(&obd->obd_refcount));
757 read_unlock(&obd_dev_lock);
760 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
761 specified, then only the client with that uuid is returned,
762 otherwise any client connected to the tgt is returned. */
763 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
764 const char *type_name,
765 struct obd_uuid *grp_uuid)
769 read_lock(&obd_dev_lock);
770 for (i = 0; i < class_devno_max(); i++) {
771 struct obd_device *obd = class_num2obd(i);
775 if ((strncmp(obd->obd_type->typ_name, type_name,
776 strlen(type_name)) == 0)) {
777 if (obd_uuid_equals(tgt_uuid,
778 &obd->u.cli.cl_target_uuid) &&
779 ((grp_uuid)? obd_uuid_equals(grp_uuid,
780 &obd->obd_uuid) : 1)) {
781 read_unlock(&obd_dev_lock);
786 read_unlock(&obd_dev_lock);
790 EXPORT_SYMBOL(class_find_client_obd);
792 /* Iterate the obd_device list looking devices have grp_uuid. Start
793 searching at *next, and if a device is found, the next index to look
794 at is saved in *next. If next is NULL, then the first matching device
795 will always be returned. */
796 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
802 else if (*next >= 0 && *next < class_devno_max())
807 read_lock(&obd_dev_lock);
808 for (; i < class_devno_max(); i++) {
809 struct obd_device *obd = class_num2obd(i);
813 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
816 read_unlock(&obd_dev_lock);
820 read_unlock(&obd_dev_lock);
824 EXPORT_SYMBOL(class_devices_in_group);
827 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
828 * adjust sptlrpc settings accordingly.
830 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
832 struct obd_device *obd;
836 LASSERT(namelen > 0);
838 read_lock(&obd_dev_lock);
839 for (i = 0; i < class_devno_max(); i++) {
840 obd = class_num2obd(i);
842 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
845 /* only notify mdc, osc, osp, lwp, mdt, ost
846 * because only these have a -sptlrpc llog */
847 type = obd->obd_type->typ_name;
848 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
849 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
850 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
851 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
852 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
853 strcmp(type, LUSTRE_OST_NAME) != 0)
856 if (strncmp(obd->obd_name, fsname, namelen))
859 class_incref(obd, __FUNCTION__, obd);
860 read_unlock(&obd_dev_lock);
861 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
862 sizeof(KEY_SPTLRPC_CONF),
863 KEY_SPTLRPC_CONF, 0, NULL, NULL);
865 class_decref(obd, __FUNCTION__, obd);
866 read_lock(&obd_dev_lock);
868 read_unlock(&obd_dev_lock);
871 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
873 void obd_cleanup_caches(void)
876 if (obd_device_cachep) {
877 kmem_cache_destroy(obd_device_cachep);
878 obd_device_cachep = NULL;
884 int obd_init_caches(void)
889 LASSERT(obd_device_cachep == NULL);
890 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
891 sizeof(struct obd_device),
892 0, 0, 0, sizeof(struct obd_device), NULL);
893 if (!obd_device_cachep)
894 GOTO(out, rc = -ENOMEM);
898 obd_cleanup_caches();
902 static const char export_handle_owner[] = "export";
904 /* map connection to client */
905 struct obd_export *class_conn2export(struct lustre_handle *conn)
907 struct obd_export *export;
911 CDEBUG(D_CACHE, "looking for null handle\n");
915 if (conn->cookie == -1) { /* this means assign a new connection */
916 CDEBUG(D_CACHE, "want a new connection\n");
920 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
921 export = class_handle2object(conn->cookie, export_handle_owner);
924 EXPORT_SYMBOL(class_conn2export);
926 struct obd_device *class_exp2obd(struct obd_export *exp)
932 EXPORT_SYMBOL(class_exp2obd);
934 struct obd_import *class_exp2cliimp(struct obd_export *exp)
936 struct obd_device *obd = exp->exp_obd;
939 return obd->u.cli.cl_import;
941 EXPORT_SYMBOL(class_exp2cliimp);
943 /* Export management functions */
944 static void class_export_destroy(struct obd_export *exp)
946 struct obd_device *obd = exp->exp_obd;
949 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
950 LASSERT(obd != NULL);
952 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
953 exp->exp_client_uuid.uuid, obd->obd_name);
955 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
956 if (exp->exp_connection)
957 ptlrpc_put_connection_superhack(exp->exp_connection);
959 LASSERT(list_empty(&exp->exp_outstanding_replies));
960 LASSERT(list_empty(&exp->exp_uncommitted_replies));
961 LASSERT(list_empty(&exp->exp_req_replay_queue));
962 LASSERT(list_empty(&exp->exp_hp_rpcs));
963 obd_destroy_export(exp);
964 /* self export doesn't hold a reference to an obd, although it
965 * exists until freeing of the obd */
966 if (exp != obd->obd_self_export)
967 class_decref(obd, "export", exp);
969 OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
970 kfree_rcu(exp, exp_handle.h_rcu);
974 struct obd_export *class_export_get(struct obd_export *exp)
976 refcount_inc(&exp->exp_handle.h_ref);
977 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
978 refcount_read(&exp->exp_handle.h_ref));
981 EXPORT_SYMBOL(class_export_get);
983 void class_export_put(struct obd_export *exp)
985 LASSERT(exp != NULL);
986 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
987 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
988 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
989 refcount_read(&exp->exp_handle.h_ref) - 1);
991 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
992 struct obd_device *obd = exp->exp_obd;
994 CDEBUG(D_IOCTL, "final put %p/%s\n",
995 exp, exp->exp_client_uuid.uuid);
997 /* release nid stat refererence */
998 lprocfs_exp_cleanup(exp);
1000 if (exp == obd->obd_self_export) {
1001 /* self export should be destroyed without
1002 * zombie thread as it doesn't hold a
1003 * reference to obd and doesn't hold any
1005 class_export_destroy(exp);
1006 /* self export is destroyed, no class
1007 * references exist and it is safe to free
1009 class_free_dev(obd);
1011 LASSERT(!list_empty(&exp->exp_obd_chain));
1012 obd_zombie_export_add(exp);
1017 EXPORT_SYMBOL(class_export_put);
1019 static void obd_zombie_exp_cull(struct work_struct *ws)
1021 struct obd_export *export;
1023 export = container_of(ws, struct obd_export, exp_zombie_work);
1024 class_export_destroy(export);
1027 /* Creates a new export, adds it to the hash table, and returns a
1028 * pointer to it. The refcount is 2: one for the hash reference, and
1029 * one for the pointer returned by this function. */
1030 struct obd_export *__class_new_export(struct obd_device *obd,
1031 struct obd_uuid *cluuid, bool is_self)
1033 struct obd_export *export;
1037 OBD_ALLOC_PTR(export);
1039 return ERR_PTR(-ENOMEM);
1041 export->exp_conn_cnt = 0;
1042 export->exp_lock_hash = NULL;
1043 export->exp_flock_hash = NULL;
1044 /* 2 = class_handle_hash + last */
1045 refcount_set(&export->exp_handle.h_ref, 2);
1046 atomic_set(&export->exp_rpc_count, 0);
1047 atomic_set(&export->exp_cb_count, 0);
1048 atomic_set(&export->exp_locks_count, 0);
1049 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1050 INIT_LIST_HEAD(&export->exp_locks_list);
1051 spin_lock_init(&export->exp_locks_list_guard);
1053 atomic_set(&export->exp_replay_count, 0);
1054 export->exp_obd = obd;
1055 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1056 spin_lock_init(&export->exp_uncommitted_replies_lock);
1057 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1058 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1059 INIT_HLIST_NODE(&export->exp_handle.h_link);
1060 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1061 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1062 class_handle_hash(&export->exp_handle, export_handle_owner);
1063 export->exp_last_request_time = ktime_get_real_seconds();
1064 spin_lock_init(&export->exp_lock);
1065 spin_lock_init(&export->exp_rpc_lock);
1066 INIT_HLIST_NODE(&export->exp_nid_hash);
1067 INIT_HLIST_NODE(&export->exp_gen_hash);
1068 spin_lock_init(&export->exp_bl_list_lock);
1069 INIT_LIST_HEAD(&export->exp_bl_list);
1070 INIT_LIST_HEAD(&export->exp_stale_list);
1071 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1073 export->exp_sp_peer = LUSTRE_SP_ANY;
1074 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1075 export->exp_client_uuid = *cluuid;
1076 obd_init_export(export);
1078 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1080 spin_lock(&obd->obd_dev_lock);
1081 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1082 /* shouldn't happen, but might race */
1083 if (obd->obd_stopping)
1084 GOTO(exit_unlock, rc = -ENODEV);
1086 rc = obd_uuid_add(obd, export);
1088 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1089 obd->obd_name, cluuid->uuid, rc);
1090 GOTO(exit_unlock, rc = -EALREADY);
1095 class_incref(obd, "export", export);
1096 list_add_tail(&export->exp_obd_chain_timed,
1097 &obd->obd_exports_timed);
1098 list_add(&export->exp_obd_chain, &obd->obd_exports);
1099 obd->obd_num_exports++;
1101 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1102 INIT_LIST_HEAD(&export->exp_obd_chain);
1104 spin_unlock(&obd->obd_dev_lock);
1108 spin_unlock(&obd->obd_dev_lock);
1109 class_handle_unhash(&export->exp_handle);
1110 obd_destroy_export(export);
1111 OBD_FREE_PTR(export);
1115 struct obd_export *class_new_export(struct obd_device *obd,
1116 struct obd_uuid *uuid)
1118 return __class_new_export(obd, uuid, false);
1120 EXPORT_SYMBOL(class_new_export);
1122 struct obd_export *class_new_export_self(struct obd_device *obd,
1123 struct obd_uuid *uuid)
1125 return __class_new_export(obd, uuid, true);
1128 void class_unlink_export(struct obd_export *exp)
1130 class_handle_unhash(&exp->exp_handle);
1132 if (exp->exp_obd->obd_self_export == exp) {
1133 class_export_put(exp);
1137 spin_lock(&exp->exp_obd->obd_dev_lock);
1138 /* delete an uuid-export hashitem from hashtables */
1139 if (exp != exp->exp_obd->obd_self_export)
1140 obd_uuid_del(exp->exp_obd, exp);
1142 #ifdef HAVE_SERVER_SUPPORT
1143 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1144 struct tg_export_data *ted = &exp->exp_target_data;
1145 struct cfs_hash *hash;
1147 /* Because obd_gen_hash will not be released until
1148 * class_cleanup(), so hash should never be NULL here */
1149 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1150 LASSERT(hash != NULL);
1151 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1152 &exp->exp_gen_hash);
1153 cfs_hash_putref(hash);
1155 #endif /* HAVE_SERVER_SUPPORT */
1157 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1158 list_del_init(&exp->exp_obd_chain_timed);
1159 exp->exp_obd->obd_num_exports--;
1160 spin_unlock(&exp->exp_obd->obd_dev_lock);
1161 atomic_inc(&obd_stale_export_num);
1163 /* A reference is kept by obd_stale_exports list */
1164 obd_stale_export_put(exp);
1166 EXPORT_SYMBOL(class_unlink_export);
1168 /* Import management functions */
1169 static void obd_zombie_import_free(struct obd_import *imp)
1173 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1174 imp->imp_obd->obd_name);
1176 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1178 ptlrpc_put_connection_superhack(imp->imp_connection);
1180 while (!list_empty(&imp->imp_conn_list)) {
1181 struct obd_import_conn *imp_conn;
1183 imp_conn = list_entry(imp->imp_conn_list.next,
1184 struct obd_import_conn, oic_item);
1185 list_del_init(&imp_conn->oic_item);
1186 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1187 OBD_FREE(imp_conn, sizeof(*imp_conn));
1190 LASSERT(imp->imp_sec == NULL);
1191 class_decref(imp->imp_obd, "import", imp);
1196 struct obd_import *class_import_get(struct obd_import *import)
1198 atomic_inc(&import->imp_refcount);
1199 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1200 atomic_read(&import->imp_refcount),
1201 import->imp_obd->obd_name);
1204 EXPORT_SYMBOL(class_import_get);
1206 void class_import_put(struct obd_import *imp)
1210 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1212 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1213 atomic_read(&imp->imp_refcount) - 1,
1214 imp->imp_obd->obd_name);
1216 if (atomic_dec_and_test(&imp->imp_refcount)) {
1217 CDEBUG(D_INFO, "final put import %p\n", imp);
1218 obd_zombie_import_add(imp);
1223 EXPORT_SYMBOL(class_import_put);
1225 static void init_imp_at(struct imp_at *at) {
1227 at_init(&at->iat_net_latency, 0, 0);
1228 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1229 /* max service estimates are tracked on the server side, so
1230 don't use the AT history here, just use the last reported
1231 val. (But keep hist for proc histogram, worst_ever) */
1232 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1237 static void obd_zombie_imp_cull(struct work_struct *ws)
1239 struct obd_import *import;
1241 import = container_of(ws, struct obd_import, imp_zombie_work);
1242 obd_zombie_import_free(import);
1245 struct obd_import *class_new_import(struct obd_device *obd)
1247 struct obd_import *imp;
1248 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1250 OBD_ALLOC(imp, sizeof(*imp));
1254 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1255 INIT_LIST_HEAD(&imp->imp_replay_list);
1256 INIT_LIST_HEAD(&imp->imp_sending_list);
1257 INIT_LIST_HEAD(&imp->imp_delayed_list);
1258 INIT_LIST_HEAD(&imp->imp_committed_list);
1259 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1260 imp->imp_known_replied_xid = 0;
1261 imp->imp_replay_cursor = &imp->imp_committed_list;
1262 spin_lock_init(&imp->imp_lock);
1263 imp->imp_last_success_conn = 0;
1264 imp->imp_state = LUSTRE_IMP_NEW;
1265 imp->imp_obd = class_incref(obd, "import", imp);
1266 rwlock_init(&imp->imp_sec_lock);
1267 init_waitqueue_head(&imp->imp_recovery_waitq);
1268 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1270 if (curr_pid_ns && curr_pid_ns->child_reaper)
1271 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1273 imp->imp_sec_refpid = 1;
1275 atomic_set(&imp->imp_refcount, 2);
1276 atomic_set(&imp->imp_unregistering, 0);
1277 atomic_set(&imp->imp_inflight, 0);
1278 atomic_set(&imp->imp_replay_inflight, 0);
1279 atomic_set(&imp->imp_inval_count, 0);
1280 INIT_LIST_HEAD(&imp->imp_conn_list);
1281 init_imp_at(&imp->imp_at);
1283 /* the default magic is V2, will be used in connect RPC, and
1284 * then adjusted according to the flags in request/reply. */
1285 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1289 EXPORT_SYMBOL(class_new_import);
1291 void class_destroy_import(struct obd_import *import)
1293 LASSERT(import != NULL);
1294 LASSERT(import != LP_POISON);
1296 spin_lock(&import->imp_lock);
1297 import->imp_generation++;
1298 spin_unlock(&import->imp_lock);
1299 class_import_put(import);
1301 EXPORT_SYMBOL(class_destroy_import);
1303 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1305 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1307 spin_lock(&exp->exp_locks_list_guard);
1309 LASSERT(lock->l_exp_refs_nr >= 0);
1311 if (lock->l_exp_refs_target != NULL &&
1312 lock->l_exp_refs_target != exp) {
1313 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1314 exp, lock, lock->l_exp_refs_target);
1316 if ((lock->l_exp_refs_nr ++) == 0) {
1317 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1318 lock->l_exp_refs_target = exp;
1320 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1321 lock, exp, lock->l_exp_refs_nr);
1322 spin_unlock(&exp->exp_locks_list_guard);
1324 EXPORT_SYMBOL(__class_export_add_lock_ref);
1326 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1328 spin_lock(&exp->exp_locks_list_guard);
1329 LASSERT(lock->l_exp_refs_nr > 0);
1330 if (lock->l_exp_refs_target != exp) {
1331 LCONSOLE_WARN("lock %p, "
1332 "mismatching export pointers: %p, %p\n",
1333 lock, lock->l_exp_refs_target, exp);
1335 if (-- lock->l_exp_refs_nr == 0) {
1336 list_del_init(&lock->l_exp_refs_link);
1337 lock->l_exp_refs_target = NULL;
1339 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1340 lock, exp, lock->l_exp_refs_nr);
1341 spin_unlock(&exp->exp_locks_list_guard);
1343 EXPORT_SYMBOL(__class_export_del_lock_ref);
1346 /* A connection defines an export context in which preallocation can
1347 be managed. This releases the export pointer reference, and returns
1348 the export handle, so the export refcount is 1 when this function
1350 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1351 struct obd_uuid *cluuid)
1353 struct obd_export *export;
1354 LASSERT(conn != NULL);
1355 LASSERT(obd != NULL);
1356 LASSERT(cluuid != NULL);
1359 export = class_new_export(obd, cluuid);
1361 RETURN(PTR_ERR(export));
1363 conn->cookie = export->exp_handle.h_cookie;
1364 class_export_put(export);
1366 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1367 cluuid->uuid, conn->cookie);
1370 EXPORT_SYMBOL(class_connect);
1372 /* if export is involved in recovery then clean up related things */
1373 static void class_export_recovery_cleanup(struct obd_export *exp)
1375 struct obd_device *obd = exp->exp_obd;
1377 spin_lock(&obd->obd_recovery_task_lock);
1378 if (obd->obd_recovering) {
1379 if (exp->exp_in_recovery) {
1380 spin_lock(&exp->exp_lock);
1381 exp->exp_in_recovery = 0;
1382 spin_unlock(&exp->exp_lock);
1383 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1384 atomic_dec(&obd->obd_connected_clients);
1387 /* if called during recovery then should update
1388 * obd_stale_clients counter,
1389 * lightweight exports are not counted */
1390 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1391 exp->exp_obd->obd_stale_clients++;
1393 spin_unlock(&obd->obd_recovery_task_lock);
1395 spin_lock(&exp->exp_lock);
1396 /** Cleanup req replay fields */
1397 if (exp->exp_req_replay_needed) {
1398 exp->exp_req_replay_needed = 0;
1400 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1401 atomic_dec(&obd->obd_req_replay_clients);
1404 /** Cleanup lock replay data */
1405 if (exp->exp_lock_replay_needed) {
1406 exp->exp_lock_replay_needed = 0;
1408 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1409 atomic_dec(&obd->obd_lock_replay_clients);
1411 spin_unlock(&exp->exp_lock);
1414 /* This function removes 1-3 references from the export:
1415 * 1 - for export pointer passed
1416 * and if disconnect really need
1417 * 2 - removing from hash
1418 * 3 - in client_unlink_export
1419 * The export pointer passed to this function can destroyed */
1420 int class_disconnect(struct obd_export *export)
1422 int already_disconnected;
1425 if (export == NULL) {
1426 CWARN("attempting to free NULL export %p\n", export);
1430 spin_lock(&export->exp_lock);
1431 already_disconnected = export->exp_disconnected;
1432 export->exp_disconnected = 1;
1433 /* We hold references of export for uuid hash
1434 * and nid_hash and export link at least. So
1435 * it is safe to call cfs_hash_del in there. */
1436 if (!hlist_unhashed(&export->exp_nid_hash))
1437 cfs_hash_del(export->exp_obd->obd_nid_hash,
1438 &export->exp_connection->c_peer.nid,
1439 &export->exp_nid_hash);
1440 spin_unlock(&export->exp_lock);
1442 /* class_cleanup(), abort_recovery(), and class_fail_export()
1443 * all end up in here, and if any of them race we shouldn't
1444 * call extra class_export_puts(). */
1445 if (already_disconnected) {
1446 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1447 GOTO(no_disconn, already_disconnected);
1450 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1451 export->exp_handle.h_cookie);
1453 class_export_recovery_cleanup(export);
1454 class_unlink_export(export);
1456 class_export_put(export);
1459 EXPORT_SYMBOL(class_disconnect);
1461 /* Return non-zero for a fully connected export */
1462 int class_connected_export(struct obd_export *exp)
1467 spin_lock(&exp->exp_lock);
1468 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1469 spin_unlock(&exp->exp_lock);
1473 EXPORT_SYMBOL(class_connected_export);
1475 static void class_disconnect_export_list(struct list_head *list,
1476 enum obd_option flags)
1479 struct obd_export *exp;
1482 /* It's possible that an export may disconnect itself, but
1483 * nothing else will be added to this list. */
1484 while (!list_empty(list)) {
1485 exp = list_entry(list->next, struct obd_export,
1487 /* need for safe call CDEBUG after obd_disconnect */
1488 class_export_get(exp);
1490 spin_lock(&exp->exp_lock);
1491 exp->exp_flags = flags;
1492 spin_unlock(&exp->exp_lock);
1494 if (obd_uuid_equals(&exp->exp_client_uuid,
1495 &exp->exp_obd->obd_uuid)) {
1497 "exp %p export uuid == obd uuid, don't discon\n",
1499 /* Need to delete this now so we don't end up pointing
1500 * to work_list later when this export is cleaned up. */
1501 list_del_init(&exp->exp_obd_chain);
1502 class_export_put(exp);
1506 class_export_get(exp);
1507 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1508 "last request at %lld\n",
1509 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1510 exp, exp->exp_last_request_time);
1511 /* release one export reference anyway */
1512 rc = obd_disconnect(exp);
1514 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1515 obd_export_nid2str(exp), exp, rc);
1516 class_export_put(exp);
1521 void class_disconnect_exports(struct obd_device *obd)
1523 struct list_head work_list;
1526 /* Move all of the exports from obd_exports to a work list, en masse. */
1527 INIT_LIST_HEAD(&work_list);
1528 spin_lock(&obd->obd_dev_lock);
1529 list_splice_init(&obd->obd_exports, &work_list);
1530 list_splice_init(&obd->obd_delayed_exports, &work_list);
1531 spin_unlock(&obd->obd_dev_lock);
1533 if (!list_empty(&work_list)) {
1534 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1535 "disconnecting them\n", obd->obd_minor, obd);
1536 class_disconnect_export_list(&work_list,
1537 exp_flags_from_obd(obd));
1539 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1540 obd->obd_minor, obd);
1543 EXPORT_SYMBOL(class_disconnect_exports);
1545 /* Remove exports that have not completed recovery.
1547 void class_disconnect_stale_exports(struct obd_device *obd,
1548 int (*test_export)(struct obd_export *))
1550 struct list_head work_list;
1551 struct obd_export *exp, *n;
1555 INIT_LIST_HEAD(&work_list);
1556 spin_lock(&obd->obd_dev_lock);
1557 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1559 /* don't count self-export as client */
1560 if (obd_uuid_equals(&exp->exp_client_uuid,
1561 &exp->exp_obd->obd_uuid))
1564 /* don't evict clients which have no slot in last_rcvd
1565 * (e.g. lightweight connection) */
1566 if (exp->exp_target_data.ted_lr_idx == -1)
1569 spin_lock(&exp->exp_lock);
1570 if (exp->exp_failed || test_export(exp)) {
1571 spin_unlock(&exp->exp_lock);
1574 exp->exp_failed = 1;
1575 spin_unlock(&exp->exp_lock);
1577 list_move(&exp->exp_obd_chain, &work_list);
1579 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1580 obd->obd_name, exp->exp_client_uuid.uuid,
1581 obd_export_nid2str(exp));
1582 print_export_data(exp, "EVICTING", 0, D_HA);
1584 spin_unlock(&obd->obd_dev_lock);
1587 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1588 obd->obd_name, evicted);
1590 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1591 OBD_OPT_ABORT_RECOV);
1594 EXPORT_SYMBOL(class_disconnect_stale_exports);
1596 void class_fail_export(struct obd_export *exp)
1598 int rc, already_failed;
1600 spin_lock(&exp->exp_lock);
1601 already_failed = exp->exp_failed;
1602 exp->exp_failed = 1;
1603 spin_unlock(&exp->exp_lock);
1605 if (already_failed) {
1606 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1607 exp, exp->exp_client_uuid.uuid);
1611 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1612 exp, exp->exp_client_uuid.uuid);
1614 if (obd_dump_on_timeout)
1615 libcfs_debug_dumplog();
1617 /* need for safe call CDEBUG after obd_disconnect */
1618 class_export_get(exp);
1620 /* Most callers into obd_disconnect are removing their own reference
1621 * (request, for example) in addition to the one from the hash table.
1622 * We don't have such a reference here, so make one. */
1623 class_export_get(exp);
1624 rc = obd_disconnect(exp);
1626 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1628 CDEBUG(D_HA, "disconnected export %p/%s\n",
1629 exp, exp->exp_client_uuid.uuid);
1630 class_export_put(exp);
1632 EXPORT_SYMBOL(class_fail_export);
1634 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1636 struct cfs_hash *nid_hash;
1637 struct obd_export *doomed_exp = NULL;
1638 int exports_evicted = 0;
1640 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1642 spin_lock(&obd->obd_dev_lock);
1643 /* umount has run already, so evict thread should leave
1644 * its task to umount thread now */
1645 if (obd->obd_stopping) {
1646 spin_unlock(&obd->obd_dev_lock);
1647 return exports_evicted;
1649 nid_hash = obd->obd_nid_hash;
1650 cfs_hash_getref(nid_hash);
1651 spin_unlock(&obd->obd_dev_lock);
1654 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1655 if (doomed_exp == NULL)
1658 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1659 "nid %s found, wanted nid %s, requested nid %s\n",
1660 obd_export_nid2str(doomed_exp),
1661 libcfs_nid2str(nid_key), nid);
1662 LASSERTF(doomed_exp != obd->obd_self_export,
1663 "self-export is hashed by NID?\n");
1665 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1666 "request\n", obd->obd_name,
1667 obd_uuid2str(&doomed_exp->exp_client_uuid),
1668 obd_export_nid2str(doomed_exp));
1669 class_fail_export(doomed_exp);
1670 class_export_put(doomed_exp);
1673 cfs_hash_putref(nid_hash);
1675 if (!exports_evicted)
1676 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1677 obd->obd_name, nid);
1678 return exports_evicted;
1680 EXPORT_SYMBOL(obd_export_evict_by_nid);
1682 #ifdef HAVE_SERVER_SUPPORT
1683 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1685 struct obd_export *doomed_exp = NULL;
1686 struct obd_uuid doomed_uuid;
1687 int exports_evicted = 0;
1689 spin_lock(&obd->obd_dev_lock);
1690 if (obd->obd_stopping) {
1691 spin_unlock(&obd->obd_dev_lock);
1692 return exports_evicted;
1694 spin_unlock(&obd->obd_dev_lock);
1696 obd_str2uuid(&doomed_uuid, uuid);
1697 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1698 CERROR("%s: can't evict myself\n", obd->obd_name);
1699 return exports_evicted;
1702 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1703 if (doomed_exp == NULL) {
1704 CERROR("%s: can't disconnect %s: no exports found\n",
1705 obd->obd_name, uuid);
1707 CWARN("%s: evicting %s at adminstrative request\n",
1708 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1709 class_fail_export(doomed_exp);
1710 class_export_put(doomed_exp);
1711 obd_uuid_del(obd, doomed_exp);
1715 return exports_evicted;
1717 #endif /* HAVE_SERVER_SUPPORT */
1719 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1720 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1721 EXPORT_SYMBOL(class_export_dump_hook);
1724 static void print_export_data(struct obd_export *exp, const char *status,
1725 int locks, int debug_level)
1727 struct ptlrpc_reply_state *rs;
1728 struct ptlrpc_reply_state *first_reply = NULL;
1731 spin_lock(&exp->exp_lock);
1732 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1738 spin_unlock(&exp->exp_lock);
1740 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1741 "%p %s %llu stale:%d\n",
1742 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1743 obd_export_nid2str(exp),
1744 refcount_read(&exp->exp_handle.h_ref),
1745 atomic_read(&exp->exp_rpc_count),
1746 atomic_read(&exp->exp_cb_count),
1747 atomic_read(&exp->exp_locks_count),
1748 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1749 nreplies, first_reply, nreplies > 3 ? "..." : "",
1750 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1751 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1752 if (locks && class_export_dump_hook != NULL)
1753 class_export_dump_hook(exp);
1757 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1759 struct obd_export *exp;
1761 spin_lock(&obd->obd_dev_lock);
1762 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1763 print_export_data(exp, "ACTIVE", locks, debug_level);
1764 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1765 print_export_data(exp, "UNLINKED", locks, debug_level);
1766 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1767 print_export_data(exp, "DELAYED", locks, debug_level);
1768 spin_unlock(&obd->obd_dev_lock);
1771 void obd_exports_barrier(struct obd_device *obd)
1774 LASSERT(list_empty(&obd->obd_exports));
1775 spin_lock(&obd->obd_dev_lock);
1776 while (!list_empty(&obd->obd_unlinked_exports)) {
1777 spin_unlock(&obd->obd_dev_lock);
1778 set_current_state(TASK_UNINTERRUPTIBLE);
1779 schedule_timeout(cfs_time_seconds(waited));
1780 if (waited > 5 && is_power_of_2(waited)) {
1781 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1782 "more than %d seconds. "
1783 "The obd refcount = %d. Is it stuck?\n",
1784 obd->obd_name, waited,
1785 atomic_read(&obd->obd_refcount));
1786 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1789 spin_lock(&obd->obd_dev_lock);
1791 spin_unlock(&obd->obd_dev_lock);
1793 EXPORT_SYMBOL(obd_exports_barrier);
1796 * Add export to the obd_zombe thread and notify it.
1798 static void obd_zombie_export_add(struct obd_export *exp) {
1799 atomic_dec(&obd_stale_export_num);
1800 spin_lock(&exp->exp_obd->obd_dev_lock);
1801 LASSERT(!list_empty(&exp->exp_obd_chain));
1802 list_del_init(&exp->exp_obd_chain);
1803 spin_unlock(&exp->exp_obd->obd_dev_lock);
1805 queue_work(zombie_wq, &exp->exp_zombie_work);
1809 * Add import to the obd_zombe thread and notify it.
1811 static void obd_zombie_import_add(struct obd_import *imp) {
1812 LASSERT(imp->imp_sec == NULL);
1814 queue_work(zombie_wq, &imp->imp_zombie_work);
1818 * wait when obd_zombie import/export queues become empty
1820 void obd_zombie_barrier(void)
1822 flush_workqueue(zombie_wq);
1824 EXPORT_SYMBOL(obd_zombie_barrier);
1827 struct obd_export *obd_stale_export_get(void)
1829 struct obd_export *exp = NULL;
1832 spin_lock(&obd_stale_export_lock);
1833 if (!list_empty(&obd_stale_exports)) {
1834 exp = list_entry(obd_stale_exports.next,
1835 struct obd_export, exp_stale_list);
1836 list_del_init(&exp->exp_stale_list);
1838 spin_unlock(&obd_stale_export_lock);
1841 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1842 atomic_read(&obd_stale_export_num));
1846 EXPORT_SYMBOL(obd_stale_export_get);
1848 void obd_stale_export_put(struct obd_export *exp)
1852 LASSERT(list_empty(&exp->exp_stale_list));
1853 if (exp->exp_lock_hash &&
1854 atomic_read(&exp->exp_lock_hash->hs_count)) {
1855 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1856 atomic_read(&obd_stale_export_num));
1858 spin_lock_bh(&exp->exp_bl_list_lock);
1859 spin_lock(&obd_stale_export_lock);
1860 /* Add to the tail if there is no blocked locks,
1861 * to the head otherwise. */
1862 if (list_empty(&exp->exp_bl_list))
1863 list_add_tail(&exp->exp_stale_list,
1864 &obd_stale_exports);
1866 list_add(&exp->exp_stale_list,
1867 &obd_stale_exports);
1869 spin_unlock(&obd_stale_export_lock);
1870 spin_unlock_bh(&exp->exp_bl_list_lock);
1872 class_export_put(exp);
1876 EXPORT_SYMBOL(obd_stale_export_put);
1879 * Adjust the position of the export in the stale list,
1880 * i.e. move to the head of the list if is needed.
1882 void obd_stale_export_adjust(struct obd_export *exp)
1884 LASSERT(exp != NULL);
1885 spin_lock_bh(&exp->exp_bl_list_lock);
1886 spin_lock(&obd_stale_export_lock);
1888 if (!list_empty(&exp->exp_stale_list) &&
1889 !list_empty(&exp->exp_bl_list))
1890 list_move(&exp->exp_stale_list, &obd_stale_exports);
1892 spin_unlock(&obd_stale_export_lock);
1893 spin_unlock_bh(&exp->exp_bl_list_lock);
1895 EXPORT_SYMBOL(obd_stale_export_adjust);
1898 * start destroy zombie import/export thread
1900 int obd_zombie_impexp_init(void)
1902 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1910 * stop destroy zombie import/export thread
1912 void obd_zombie_impexp_stop(void)
1914 destroy_workqueue(zombie_wq);
1915 LASSERT(list_empty(&obd_stale_exports));
1918 /***** Kernel-userspace comm helpers *******/
1920 /* Get length of entire message, including header */
1921 int kuc_len(int payload_len)
1923 return sizeof(struct kuc_hdr) + payload_len;
1925 EXPORT_SYMBOL(kuc_len);
1927 /* Get a pointer to kuc header, given a ptr to the payload
1928 * @param p Pointer to payload area
1929 * @returns Pointer to kuc header
1931 struct kuc_hdr * kuc_ptr(void *p)
1933 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1934 LASSERT(lh->kuc_magic == KUC_MAGIC);
1937 EXPORT_SYMBOL(kuc_ptr);
1939 /* Alloc space for a message, and fill in header
1940 * @return Pointer to payload area
1942 void *kuc_alloc(int payload_len, int transport, int type)
1945 int len = kuc_len(payload_len);
1949 return ERR_PTR(-ENOMEM);
1951 lh->kuc_magic = KUC_MAGIC;
1952 lh->kuc_transport = transport;
1953 lh->kuc_msgtype = type;
1954 lh->kuc_msglen = len;
1956 return (void *)(lh + 1);
1958 EXPORT_SYMBOL(kuc_alloc);
1960 /* Takes pointer to payload area */
1961 void kuc_free(void *p, int payload_len)
1963 struct kuc_hdr *lh = kuc_ptr(p);
1964 OBD_FREE(lh, kuc_len(payload_len));
1966 EXPORT_SYMBOL(kuc_free);
1968 struct obd_request_slot_waiter {
1969 struct list_head orsw_entry;
1970 wait_queue_head_t orsw_waitq;
1974 static bool obd_request_slot_avail(struct client_obd *cli,
1975 struct obd_request_slot_waiter *orsw)
1979 spin_lock(&cli->cl_loi_list_lock);
1980 avail = !!list_empty(&orsw->orsw_entry);
1981 spin_unlock(&cli->cl_loi_list_lock);
1987 * For network flow control, the RPC sponsor needs to acquire a credit
1988 * before sending the RPC. The credits count for a connection is defined
1989 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1990 * the subsequent RPC sponsors need to wait until others released their
1991 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1993 int obd_get_request_slot(struct client_obd *cli)
1995 struct obd_request_slot_waiter orsw;
1996 struct l_wait_info lwi;
1999 spin_lock(&cli->cl_loi_list_lock);
2000 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2001 cli->cl_rpcs_in_flight++;
2002 spin_unlock(&cli->cl_loi_list_lock);
2006 init_waitqueue_head(&orsw.orsw_waitq);
2007 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2008 orsw.orsw_signaled = false;
2009 spin_unlock(&cli->cl_loi_list_lock);
2011 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2012 rc = l_wait_event(orsw.orsw_waitq,
2013 obd_request_slot_avail(cli, &orsw) ||
2017 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2018 * freed but other (such as obd_put_request_slot) is using it. */
2019 spin_lock(&cli->cl_loi_list_lock);
2021 if (!orsw.orsw_signaled) {
2022 if (list_empty(&orsw.orsw_entry))
2023 cli->cl_rpcs_in_flight--;
2025 list_del(&orsw.orsw_entry);
2029 if (orsw.orsw_signaled) {
2030 LASSERT(list_empty(&orsw.orsw_entry));
2034 spin_unlock(&cli->cl_loi_list_lock);
2038 EXPORT_SYMBOL(obd_get_request_slot);
2040 void obd_put_request_slot(struct client_obd *cli)
2042 struct obd_request_slot_waiter *orsw;
2044 spin_lock(&cli->cl_loi_list_lock);
2045 cli->cl_rpcs_in_flight--;
2047 /* If there is free slot, wakeup the first waiter. */
2048 if (!list_empty(&cli->cl_flight_waiters) &&
2049 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2050 orsw = list_entry(cli->cl_flight_waiters.next,
2051 struct obd_request_slot_waiter, orsw_entry);
2052 list_del_init(&orsw->orsw_entry);
2053 cli->cl_rpcs_in_flight++;
2054 wake_up(&orsw->orsw_waitq);
2056 spin_unlock(&cli->cl_loi_list_lock);
2058 EXPORT_SYMBOL(obd_put_request_slot);
2060 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2062 return cli->cl_max_rpcs_in_flight;
2064 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2066 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2068 struct obd_request_slot_waiter *orsw;
2072 const char *type_name;
2075 if (max > OBD_MAX_RIF_MAX || max < 1)
2078 type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2079 if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2080 /* adjust max_mod_rpcs_in_flight to ensure it is always
2081 * strictly lower that max_rpcs_in_flight */
2083 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2084 "because it must be higher than "
2085 "max_mod_rpcs_in_flight value",
2086 cli->cl_import->imp_obd->obd_name);
2089 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2090 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2096 spin_lock(&cli->cl_loi_list_lock);
2097 old = cli->cl_max_rpcs_in_flight;
2098 cli->cl_max_rpcs_in_flight = max;
2099 client_adjust_max_dirty(cli);
2103 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2104 for (i = 0; i < diff; i++) {
2105 if (list_empty(&cli->cl_flight_waiters))
2108 orsw = list_entry(cli->cl_flight_waiters.next,
2109 struct obd_request_slot_waiter, orsw_entry);
2110 list_del_init(&orsw->orsw_entry);
2111 cli->cl_rpcs_in_flight++;
2112 wake_up(&orsw->orsw_waitq);
2114 spin_unlock(&cli->cl_loi_list_lock);
2118 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2120 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2122 return cli->cl_max_mod_rpcs_in_flight;
2124 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2126 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2128 struct obd_connect_data *ocd;
2132 if (max > OBD_MAX_RIF_MAX || max < 1)
2135 /* cannot exceed or equal max_rpcs_in_flight */
2136 if (max >= cli->cl_max_rpcs_in_flight) {
2137 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2138 "higher or equal to max_rpcs_in_flight value (%u)\n",
2139 cli->cl_import->imp_obd->obd_name,
2140 max, cli->cl_max_rpcs_in_flight);
2144 /* cannot exceed max modify RPCs in flight supported by the server */
2145 ocd = &cli->cl_import->imp_connect_data;
2146 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2147 maxmodrpcs = ocd->ocd_maxmodrpcs;
2150 if (max > maxmodrpcs) {
2151 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2152 "higher than max_mod_rpcs_per_client value (%hu) "
2153 "returned by the server at connection\n",
2154 cli->cl_import->imp_obd->obd_name,
2159 spin_lock(&cli->cl_mod_rpcs_lock);
2161 prev = cli->cl_max_mod_rpcs_in_flight;
2162 cli->cl_max_mod_rpcs_in_flight = max;
2164 /* wakeup waiters if limit has been increased */
2165 if (cli->cl_max_mod_rpcs_in_flight > prev)
2166 wake_up(&cli->cl_mod_rpcs_waitq);
2168 spin_unlock(&cli->cl_mod_rpcs_lock);
2172 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2174 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2175 struct seq_file *seq)
2177 unsigned long mod_tot = 0, mod_cum;
2178 struct timespec64 now;
2181 ktime_get_real_ts64(&now);
2183 spin_lock(&cli->cl_mod_rpcs_lock);
2185 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2186 (s64)now.tv_sec, now.tv_nsec);
2187 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2188 cli->cl_mod_rpcs_in_flight);
2190 seq_printf(seq, "\n\t\t\tmodify\n");
2191 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2193 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2196 for (i = 0; i < OBD_HIST_MAX; i++) {
2197 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2199 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2200 i, mod, pct(mod, mod_tot),
2201 pct(mod_cum, mod_tot));
2202 if (mod_cum == mod_tot)
2206 spin_unlock(&cli->cl_mod_rpcs_lock);
2210 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2212 /* The number of modify RPCs sent in parallel is limited
2213 * because the server has a finite number of slots per client to
2214 * store request result and ensure reply reconstruction when needed.
2215 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2216 * that takes into account server limit and cl_max_rpcs_in_flight
2218 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2219 * one close request is allowed above the maximum.
2221 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2226 /* A slot is available if
2227 * - number of modify RPCs in flight is less than the max
2228 * - it's a close RPC and no other close request is in flight
2230 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2231 (close_req && cli->cl_close_rpcs_in_flight == 0);
2236 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2241 spin_lock(&cli->cl_mod_rpcs_lock);
2242 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2243 spin_unlock(&cli->cl_mod_rpcs_lock);
2248 /* Get a modify RPC slot from the obd client @cli according
2249 * to the kind of operation @opc that is going to be sent
2250 * and the intent @it of the operation if it applies.
2251 * If the maximum number of modify RPCs in flight is reached
2252 * the thread is put to sleep.
2253 * Returns the tag to be set in the request message. Tag 0
2254 * is reserved for non-modifying requests.
2256 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2258 bool close_req = false;
2261 if (opc == MDS_CLOSE)
2265 spin_lock(&cli->cl_mod_rpcs_lock);
2266 max = cli->cl_max_mod_rpcs_in_flight;
2267 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2268 /* there is a slot available */
2269 cli->cl_mod_rpcs_in_flight++;
2271 cli->cl_close_rpcs_in_flight++;
2272 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2273 cli->cl_mod_rpcs_in_flight);
2274 /* find a free tag */
2275 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2277 LASSERT(i < OBD_MAX_RIF_MAX);
2278 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2279 spin_unlock(&cli->cl_mod_rpcs_lock);
2280 /* tag 0 is reserved for non-modify RPCs */
2282 CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2283 "opc %u, max %hu\n",
2284 cli->cl_import->imp_obd->obd_name,
2289 spin_unlock(&cli->cl_mod_rpcs_lock);
2291 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2292 "opc %u, max %hu\n",
2293 cli->cl_import->imp_obd->obd_name, opc, max);
2295 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2296 obd_mod_rpc_slot_avail(cli,
2300 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2302 /* Put a modify RPC slot from the obd client @cli according
2303 * to the kind of operation @opc that has been sent.
2305 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2307 bool close_req = false;
2312 if (opc == MDS_CLOSE)
2315 spin_lock(&cli->cl_mod_rpcs_lock);
2316 cli->cl_mod_rpcs_in_flight--;
2318 cli->cl_close_rpcs_in_flight--;
2319 /* release the tag in the bitmap */
2320 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2321 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2322 spin_unlock(&cli->cl_mod_rpcs_lock);
2323 wake_up(&cli->cl_mod_rpcs_waitq);
2325 EXPORT_SYMBOL(obd_put_mod_rpc_slot);