4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59 const char *status, int locks, int debug_level);
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90 obd, obd->obd_namespace, obd->obd_force);
93 lu_ref_fini(&obd->obd_reference);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 struct obd_type *class_search_type(const char *name)
99 struct kobject *kobj = kset_find_obj(lustre_kset, name);
101 if (kobj && kobj->ktype == &class_ktype)
102 return container_of(kobj, struct obd_type, typ_kobj);
107 EXPORT_SYMBOL(class_search_type);
109 struct obd_type *class_get_type(const char *name)
111 struct obd_type *type;
113 type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
116 const char *modname = name;
118 #ifdef HAVE_SERVER_SUPPORT
119 if (strcmp(modname, "obdfilter") == 0)
122 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 modname = LUSTRE_OSP_NAME;
125 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 if (try_module_get(type->typ_dt_ops->o_owner)) {
140 atomic_inc(&type->typ_refcnt);
141 /* class_search_type() returned a counted reference,
142 * but we don't need that count any more as
143 * we have one through typ_refcnt.
145 kobject_put(&type->typ_kobj);
147 kobject_put(&type->typ_kobj);
154 void class_put_type(struct obd_type *type)
157 module_put(type->typ_dt_ops->o_owner);
158 atomic_dec(&type->typ_refcnt);
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 debugfs_remove_recursive(type->typ_debugfs_entry);
166 type->typ_debugfs_entry = NULL;
169 lu_device_type_fini(type->typ_lu);
171 #ifdef CONFIG_PROC_FS
172 if (type->typ_name && type->typ_procroot)
173 remove_proc_subtree(type->typ_name, proc_lustre_root);
175 OBD_FREE(type, sizeof(*type));
178 static struct kobj_type class_ktype = {
179 .sysfs_ops = &lustre_sysfs_ops,
180 .release = class_sysfs_release,
183 #ifdef HAVE_SERVER_SUPPORT
184 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
186 struct dentry *symlink;
187 struct obd_type *type;
190 type = class_search_type(name);
192 kobject_put(&type->typ_kobj);
193 return ERR_PTR(-EEXIST);
196 OBD_ALLOC(type, sizeof(*type));
198 return ERR_PTR(-ENOMEM);
200 type->typ_kobj.kset = lustre_kset;
201 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
202 &lustre_kset->kobj, "%s", name);
206 symlink = debugfs_create_dir(name, debugfs_lustre_root);
207 if (IS_ERR_OR_NULL(symlink)) {
208 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
209 kobject_put(&type->typ_kobj);
212 type->typ_debugfs_entry = symlink;
213 type->typ_sym_filter = true;
216 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
218 if (IS_ERR(type->typ_procroot)) {
219 CERROR("%s: can't create compat proc entry: %d\n",
220 name, (int)PTR_ERR(type->typ_procroot));
221 type->typ_procroot = NULL;
227 EXPORT_SYMBOL(class_add_symlinks);
228 #endif /* HAVE_SERVER_SUPPORT */
230 #define CLASS_MAX_NAME 1024
232 int class_register_type(const struct obd_ops *dt_ops,
233 const struct md_ops *md_ops,
234 bool enable_proc, struct lprocfs_vars *vars,
235 const char *name, struct lu_device_type *ldt)
237 struct obd_type *type;
242 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
244 type = class_search_type(name);
246 #ifdef HAVE_SERVER_SUPPORT
247 if (type->typ_sym_filter)
249 #endif /* HAVE_SERVER_SUPPORT */
250 kobject_put(&type->typ_kobj);
251 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
255 OBD_ALLOC(type, sizeof(*type));
259 type->typ_kobj.kset = lustre_kset;
260 kobject_init(&type->typ_kobj, &class_ktype);
261 #ifdef HAVE_SERVER_SUPPORT
263 #endif /* HAVE_SERVER_SUPPORT */
265 type->typ_dt_ops = dt_ops;
266 type->typ_md_ops = md_ops;
268 #ifdef HAVE_SERVER_SUPPORT
269 if (type->typ_sym_filter) {
270 type->typ_sym_filter = false;
271 kobject_put(&type->typ_kobj);
275 #ifdef CONFIG_PROC_FS
276 if (enable_proc && !type->typ_procroot) {
277 type->typ_procroot = lprocfs_register(name,
280 if (IS_ERR(type->typ_procroot)) {
281 rc = PTR_ERR(type->typ_procroot);
282 type->typ_procroot = NULL;
287 type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
288 ldebugfs_add_vars(type->typ_debugfs_entry, vars, type);
290 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
293 #ifdef HAVE_SERVER_SUPPORT
298 rc = lu_device_type_init(ldt);
306 kobject_put(&type->typ_kobj);
310 EXPORT_SYMBOL(class_register_type);
312 int class_unregister_type(const char *name)
314 struct obd_type *type = class_search_type(name);
319 CERROR("unknown obd type\n");
323 if (atomic_read(&type->typ_refcnt)) {
324 CERROR("type %s has refcount (%d)\n", name,
325 atomic_read(&type->typ_refcnt));
326 /* This is a bad situation, let's make the best of it */
327 /* Remove ops, but leave the name for debugging */
328 type->typ_dt_ops = NULL;
329 type->typ_md_ops = NULL;
330 GOTO(out_put, rc = -EBUSY);
333 /* Put the final ref */
334 kobject_put(&type->typ_kobj);
336 /* Put the ref returned by class_search_type() */
337 kobject_put(&type->typ_kobj);
340 } /* class_unregister_type */
341 EXPORT_SYMBOL(class_unregister_type);
344 * Create a new obd device.
346 * Allocate the new obd_device and initialize it.
348 * \param[in] type_name obd device type string.
349 * \param[in] name obd device name.
350 * \param[in] uuid obd device UUID
352 * \retval newdev pointer to created obd_device
353 * \retval ERR_PTR(errno) on error
355 struct obd_device *class_newdev(const char *type_name, const char *name,
358 struct obd_device *newdev;
359 struct obd_type *type = NULL;
362 if (strlen(name) >= MAX_OBD_NAME) {
363 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
364 RETURN(ERR_PTR(-EINVAL));
367 type = class_get_type(type_name);
369 CERROR("OBD: unknown type: %s\n", type_name);
370 RETURN(ERR_PTR(-ENODEV));
373 newdev = obd_device_alloc();
374 if (newdev == NULL) {
375 class_put_type(type);
376 RETURN(ERR_PTR(-ENOMEM));
378 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
379 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
380 newdev->obd_type = type;
381 newdev->obd_minor = -1;
383 rwlock_init(&newdev->obd_pool_lock);
384 newdev->obd_pool_limit = 0;
385 newdev->obd_pool_slv = 0;
387 INIT_LIST_HEAD(&newdev->obd_exports);
388 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
389 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
390 INIT_LIST_HEAD(&newdev->obd_exports_timed);
391 INIT_LIST_HEAD(&newdev->obd_nid_stats);
392 spin_lock_init(&newdev->obd_nid_lock);
393 spin_lock_init(&newdev->obd_dev_lock);
394 mutex_init(&newdev->obd_dev_mutex);
395 spin_lock_init(&newdev->obd_osfs_lock);
396 /* newdev->obd_osfs_age must be set to a value in the distant
397 * past to guarantee a fresh statfs is fetched on mount. */
398 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
400 /* XXX belongs in setup not attach */
401 init_rwsem(&newdev->obd_observer_link_sem);
403 spin_lock_init(&newdev->obd_recovery_task_lock);
404 init_waitqueue_head(&newdev->obd_next_transno_waitq);
405 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
406 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
407 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
408 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
409 INIT_LIST_HEAD(&newdev->obd_evict_list);
410 INIT_LIST_HEAD(&newdev->obd_lwp_list);
412 llog_group_init(&newdev->obd_olg);
413 /* Detach drops this */
414 atomic_set(&newdev->obd_refcount, 1);
415 lu_ref_init(&newdev->obd_reference);
416 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
418 newdev->obd_conn_inprogress = 0;
420 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
422 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
423 newdev->obd_name, newdev);
431 * \param[in] obd obd_device to be freed
435 void class_free_dev(struct obd_device *obd)
437 struct obd_type *obd_type = obd->obd_type;
439 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
440 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
441 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
442 "obd %p != obd_devs[%d] %p\n",
443 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
444 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
445 "obd_refcount should be 0, not %d\n",
446 atomic_read(&obd->obd_refcount));
447 LASSERT(obd_type != NULL);
449 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
450 obd->obd_name, obd->obd_type->typ_name);
452 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
453 obd->obd_name, obd->obd_uuid.uuid);
454 if (obd->obd_stopping) {
457 /* If we're not stopping, we were never set up */
458 err = obd_cleanup(obd);
460 CERROR("Cleanup %s returned %d\n",
464 obd_device_free(obd);
466 class_put_type(obd_type);
470 * Unregister obd device.
472 * Free slot in obd_devs[] used by \a obd.
474 * \param[in] obd obd_device to be unregistered
478 void class_unregister_device(struct obd_device *obd)
480 write_lock(&obd_dev_lock);
481 if (obd->obd_minor >= 0) {
482 LASSERT(obd_devs[obd->obd_minor] == obd);
483 obd_devs[obd->obd_minor] = NULL;
486 write_unlock(&obd_dev_lock);
490 * Register obd device.
492 * Find a free slot in obd_devs[] and fill it with \a new_obd.
494 * \param[in] new_obd obd_device to be registered
497 * \retval -EEXIST device with this name is registered
498 * \retval -EOVERFLOW obd_devs[] is full
500 int class_register_device(struct obd_device *new_obd)
504 int new_obd_minor = 0;
505 bool minor_assign = false;
506 bool retried = false;
509 write_lock(&obd_dev_lock);
510 for (i = 0; i < class_devno_max(); i++) {
511 struct obd_device *obd = class_num2obd(i);
514 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
517 write_unlock(&obd_dev_lock);
519 /* the obd_device may still be waiting to be
520 * destroyed by the "obd_zombie_impexp_thread".
522 obd_zombie_barrier();
527 CERROR("%s: already exists, won't add\n",
529 /* in case we found a free slot before duplicate */
530 minor_assign = false;
534 if (!minor_assign && obd == NULL) {
541 new_obd->obd_minor = new_obd_minor;
542 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
543 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
544 obd_devs[new_obd_minor] = new_obd;
548 CERROR("%s: all %u/%u devices used, increase "
549 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
550 i, class_devno_max(), ret);
553 write_unlock(&obd_dev_lock);
558 static int class_name2dev_nolock(const char *name)
565 for (i = 0; i < class_devno_max(); i++) {
566 struct obd_device *obd = class_num2obd(i);
568 if (obd && strcmp(name, obd->obd_name) == 0) {
569 /* Make sure we finished attaching before we give
570 out any references */
571 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
572 if (obd->obd_attached) {
582 int class_name2dev(const char *name)
589 read_lock(&obd_dev_lock);
590 i = class_name2dev_nolock(name);
591 read_unlock(&obd_dev_lock);
595 EXPORT_SYMBOL(class_name2dev);
597 struct obd_device *class_name2obd(const char *name)
599 int dev = class_name2dev(name);
601 if (dev < 0 || dev > class_devno_max())
603 return class_num2obd(dev);
605 EXPORT_SYMBOL(class_name2obd);
607 int class_uuid2dev_nolock(struct obd_uuid *uuid)
611 for (i = 0; i < class_devno_max(); i++) {
612 struct obd_device *obd = class_num2obd(i);
614 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
615 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
623 int class_uuid2dev(struct obd_uuid *uuid)
627 read_lock(&obd_dev_lock);
628 i = class_uuid2dev_nolock(uuid);
629 read_unlock(&obd_dev_lock);
633 EXPORT_SYMBOL(class_uuid2dev);
635 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
637 int dev = class_uuid2dev(uuid);
640 return class_num2obd(dev);
642 EXPORT_SYMBOL(class_uuid2obd);
645 * Get obd device from ::obd_devs[]
647 * \param num [in] array index
649 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
650 * otherwise return the obd device there.
652 struct obd_device *class_num2obd(int num)
654 struct obd_device *obd = NULL;
656 if (num < class_devno_max()) {
661 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
662 "%p obd_magic %08x != %08x\n",
663 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
664 LASSERTF(obd->obd_minor == num,
665 "%p obd_minor %0d != %0d\n",
666 obd, obd->obd_minor, num);
673 * Find obd in obd_devs[] by name or uuid.
675 * Increment obd's refcount if found.
677 * \param[in] str obd name or uuid
679 * \retval NULL if not found
680 * \retval target pointer to found obd_device
682 struct obd_device *class_dev_by_str(const char *str)
684 struct obd_device *target = NULL;
685 struct obd_uuid tgtuuid;
688 obd_str2uuid(&tgtuuid, str);
690 read_lock(&obd_dev_lock);
691 rc = class_uuid2dev_nolock(&tgtuuid);
693 rc = class_name2dev_nolock(str);
696 target = class_num2obd(rc);
699 class_incref(target, "find", current);
700 read_unlock(&obd_dev_lock);
704 EXPORT_SYMBOL(class_dev_by_str);
707 * Get obd devices count. Device in any
709 * \retval obd device count
711 int get_devices_count(void)
713 int index, max_index = class_devno_max(), dev_count = 0;
715 read_lock(&obd_dev_lock);
716 for (index = 0; index <= max_index; index++) {
717 struct obd_device *obd = class_num2obd(index);
721 read_unlock(&obd_dev_lock);
725 EXPORT_SYMBOL(get_devices_count);
727 void class_obd_list(void)
732 read_lock(&obd_dev_lock);
733 for (i = 0; i < class_devno_max(); i++) {
734 struct obd_device *obd = class_num2obd(i);
738 if (obd->obd_stopping)
740 else if (obd->obd_set_up)
742 else if (obd->obd_attached)
746 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
747 i, status, obd->obd_type->typ_name,
748 obd->obd_name, obd->obd_uuid.uuid,
749 atomic_read(&obd->obd_refcount));
751 read_unlock(&obd_dev_lock);
754 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
755 * specified, then only the client with that uuid is returned,
756 * otherwise any client connected to the tgt is returned.
758 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
759 const char *type_name,
760 struct obd_uuid *grp_uuid)
764 read_lock(&obd_dev_lock);
765 for (i = 0; i < class_devno_max(); i++) {
766 struct obd_device *obd = class_num2obd(i);
770 if ((strncmp(obd->obd_type->typ_name, type_name,
771 strlen(type_name)) == 0)) {
772 if (obd_uuid_equals(tgt_uuid,
773 &obd->u.cli.cl_target_uuid) &&
774 ((grp_uuid)? obd_uuid_equals(grp_uuid,
775 &obd->obd_uuid) : 1)) {
776 read_unlock(&obd_dev_lock);
781 read_unlock(&obd_dev_lock);
785 EXPORT_SYMBOL(class_find_client_obd);
787 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
788 * searching at *next, and if a device is found, the next index to look
789 * at is saved in *next. If next is NULL, then the first matching device
790 * will always be returned.
792 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
798 else if (*next >= 0 && *next < class_devno_max())
803 read_lock(&obd_dev_lock);
804 for (; i < class_devno_max(); i++) {
805 struct obd_device *obd = class_num2obd(i);
809 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
812 read_unlock(&obd_dev_lock);
816 read_unlock(&obd_dev_lock);
820 EXPORT_SYMBOL(class_devices_in_group);
823 * Notify every relevant OBD that the sptlrpc log for \a fsname has changed,
824 * so that each can adjust its sptlrpc settings accordingly.
826 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
828 struct obd_device *obd;
832 LASSERT(namelen > 0);
834 read_lock(&obd_dev_lock);
835 for (i = 0; i < class_devno_max(); i++) {
836 obd = class_num2obd(i);
838 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
841 /* only notify mdc, osc, osp, lwp, mdt, ost
842 * because only these have a -sptlrpc llog */
843 type = obd->obd_type->typ_name;
844 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
845 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
846 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
847 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
848 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
849 strcmp(type, LUSTRE_OST_NAME) != 0)
852 if (strncmp(obd->obd_name, fsname, namelen))
855 class_incref(obd, __FUNCTION__, obd);
856 read_unlock(&obd_dev_lock);
857 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
858 sizeof(KEY_SPTLRPC_CONF),
859 KEY_SPTLRPC_CONF, 0, NULL, NULL);
861 class_decref(obd, __FUNCTION__, obd);
862 read_lock(&obd_dev_lock);
864 read_unlock(&obd_dev_lock);
867 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
869 void obd_cleanup_caches(void)
872 if (obd_device_cachep) {
873 kmem_cache_destroy(obd_device_cachep);
874 obd_device_cachep = NULL;
880 int obd_init_caches(void)
885 LASSERT(obd_device_cachep == NULL);
886 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
887 sizeof(struct obd_device),
888 0, 0, 0, sizeof(struct obd_device), NULL);
889 if (!obd_device_cachep)
890 GOTO(out, rc = -ENOMEM);
894 obd_cleanup_caches();
898 static const char export_handle_owner[] = "export";
900 /* map connection to client */
901 struct obd_export *class_conn2export(struct lustre_handle *conn)
903 struct obd_export *export;
907 CDEBUG(D_CACHE, "looking for null handle\n");
911 if (conn->cookie == -1) { /* this means assign a new connection */
912 CDEBUG(D_CACHE, "want a new connection\n");
916 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
917 export = class_handle2object(conn->cookie, export_handle_owner);
920 EXPORT_SYMBOL(class_conn2export);
922 struct obd_device *class_exp2obd(struct obd_export *exp)
928 EXPORT_SYMBOL(class_exp2obd);
930 struct obd_import *class_exp2cliimp(struct obd_export *exp)
932 struct obd_device *obd = exp->exp_obd;
935 return obd->u.cli.cl_import;
937 EXPORT_SYMBOL(class_exp2cliimp);
939 /* Export management functions */
940 static void class_export_destroy(struct obd_export *exp)
942 struct obd_device *obd = exp->exp_obd;
945 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
946 LASSERT(obd != NULL);
948 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
949 exp->exp_client_uuid.uuid, obd->obd_name);
951 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
952 if (exp->exp_connection)
953 ptlrpc_put_connection_superhack(exp->exp_connection);
955 LASSERT(list_empty(&exp->exp_outstanding_replies));
956 LASSERT(list_empty(&exp->exp_uncommitted_replies));
957 LASSERT(list_empty(&exp->exp_req_replay_queue));
958 LASSERT(list_empty(&exp->exp_hp_rpcs));
959 obd_destroy_export(exp);
960 /* self export doesn't hold a reference to an obd, although it
961 * exists until freeing of the obd */
962 if (exp != obd->obd_self_export)
963 class_decref(obd, "export", exp);
965 OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
966 kfree_rcu(exp, exp_handle.h_rcu);
970 struct obd_export *class_export_get(struct obd_export *exp)
972 refcount_inc(&exp->exp_handle.h_ref);
973 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
974 refcount_read(&exp->exp_handle.h_ref));
977 EXPORT_SYMBOL(class_export_get);
979 void class_export_put(struct obd_export *exp)
981 LASSERT(exp != NULL);
982 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
983 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
984 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
985 refcount_read(&exp->exp_handle.h_ref) - 1);
987 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
988 struct obd_device *obd = exp->exp_obd;
990 CDEBUG(D_IOCTL, "final put %p/%s\n",
991 exp, exp->exp_client_uuid.uuid);
993 /* release nid stat reference */
994 lprocfs_exp_cleanup(exp);
996 if (exp == obd->obd_self_export) {
997 /* self export should be destroyed without
998 * zombie thread as it doesn't hold a
999 * reference to obd and doesn't hold any
1001 class_export_destroy(exp);
1002 /* self export is destroyed, no class
1003 * references exist and it is safe to free
1005 class_free_dev(obd);
1007 LASSERT(!list_empty(&exp->exp_obd_chain));
1008 obd_zombie_export_add(exp);
1013 EXPORT_SYMBOL(class_export_put);
1015 static void obd_zombie_exp_cull(struct work_struct *ws)
1017 struct obd_export *export;
1019 export = container_of(ws, struct obd_export, exp_zombie_work);
1020 class_export_destroy(export);
1023 /* Creates a new export, adds it to the hash table, and returns a
1024 * pointer to it. The refcount is 2: one for the hash reference, and
1025 * one for the pointer returned by this function. */
1026 struct obd_export *__class_new_export(struct obd_device *obd,
1027 struct obd_uuid *cluuid, bool is_self)
1029 struct obd_export *export;
1033 OBD_ALLOC_PTR(export);
1035 return ERR_PTR(-ENOMEM);
1037 export->exp_conn_cnt = 0;
1038 export->exp_lock_hash = NULL;
1039 export->exp_flock_hash = NULL;
1040 /* 2 = class_handle_hash + last */
1041 refcount_set(&export->exp_handle.h_ref, 2);
1042 atomic_set(&export->exp_rpc_count, 0);
1043 atomic_set(&export->exp_cb_count, 0);
1044 atomic_set(&export->exp_locks_count, 0);
1045 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1046 INIT_LIST_HEAD(&export->exp_locks_list);
1047 spin_lock_init(&export->exp_locks_list_guard);
1049 atomic_set(&export->exp_replay_count, 0);
1050 export->exp_obd = obd;
1051 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1052 spin_lock_init(&export->exp_uncommitted_replies_lock);
1053 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1054 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1055 INIT_HLIST_NODE(&export->exp_handle.h_link);
1056 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1057 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1058 class_handle_hash(&export->exp_handle, export_handle_owner);
1059 export->exp_last_request_time = ktime_get_real_seconds();
1060 spin_lock_init(&export->exp_lock);
1061 spin_lock_init(&export->exp_rpc_lock);
1062 INIT_HLIST_NODE(&export->exp_nid_hash);
1063 INIT_HLIST_NODE(&export->exp_gen_hash);
1064 spin_lock_init(&export->exp_bl_list_lock);
1065 INIT_LIST_HEAD(&export->exp_bl_list);
1066 INIT_LIST_HEAD(&export->exp_stale_list);
1067 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1069 export->exp_sp_peer = LUSTRE_SP_ANY;
1070 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1071 export->exp_client_uuid = *cluuid;
1072 obd_init_export(export);
1074 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1076 spin_lock(&obd->obd_dev_lock);
1077 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1078 /* shouldn't happen, but might race */
1079 if (obd->obd_stopping)
1080 GOTO(exit_unlock, rc = -ENODEV);
1082 rc = obd_uuid_add(obd, export);
1084 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1085 obd->obd_name, cluuid->uuid, rc);
1086 GOTO(exit_unlock, rc = -EALREADY);
1091 class_incref(obd, "export", export);
1092 list_add_tail(&export->exp_obd_chain_timed,
1093 &obd->obd_exports_timed);
1094 list_add(&export->exp_obd_chain, &obd->obd_exports);
1095 obd->obd_num_exports++;
1097 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1098 INIT_LIST_HEAD(&export->exp_obd_chain);
1100 spin_unlock(&obd->obd_dev_lock);
1104 spin_unlock(&obd->obd_dev_lock);
1105 class_handle_unhash(&export->exp_handle);
1106 obd_destroy_export(export);
1107 OBD_FREE_PTR(export);
1111 struct obd_export *class_new_export(struct obd_device *obd,
1112 struct obd_uuid *uuid)
1114 return __class_new_export(obd, uuid, false);
1116 EXPORT_SYMBOL(class_new_export);
1118 struct obd_export *class_new_export_self(struct obd_device *obd,
1119 struct obd_uuid *uuid)
1121 return __class_new_export(obd, uuid, true);
1124 void class_unlink_export(struct obd_export *exp)
1126 class_handle_unhash(&exp->exp_handle);
1128 if (exp->exp_obd->obd_self_export == exp) {
1129 class_export_put(exp);
1133 spin_lock(&exp->exp_obd->obd_dev_lock);
1134 /* delete a uuid-export hash item from the hash tables */
1135 if (exp != exp->exp_obd->obd_self_export)
1136 obd_uuid_del(exp->exp_obd, exp);
1138 #ifdef HAVE_SERVER_SUPPORT
1139 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1140 struct tg_export_data *ted = &exp->exp_target_data;
1141 struct cfs_hash *hash;
1143 /* obd_gen_hash will not be released until
1144 * class_cleanup(), so hash should never be NULL here */
1145 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1146 LASSERT(hash != NULL);
1147 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1148 &exp->exp_gen_hash);
1149 cfs_hash_putref(hash);
1151 #endif /* HAVE_SERVER_SUPPORT */
1153 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1154 list_del_init(&exp->exp_obd_chain_timed);
1155 exp->exp_obd->obd_num_exports--;
1156 spin_unlock(&exp->exp_obd->obd_dev_lock);
1157 atomic_inc(&obd_stale_export_num);
1159 /* A reference is kept by obd_stale_exports list */
1160 obd_stale_export_put(exp);
1162 EXPORT_SYMBOL(class_unlink_export);
1164 /* Import management functions */
1165 static void obd_zombie_import_free(struct obd_import *imp)
1169 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1170 imp->imp_obd->obd_name);
1172 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1174 ptlrpc_put_connection_superhack(imp->imp_connection);
1176 while (!list_empty(&imp->imp_conn_list)) {
1177 struct obd_import_conn *imp_conn;
1179 imp_conn = list_first_entry(&imp->imp_conn_list,
1180 struct obd_import_conn, oic_item);
1181 list_del_init(&imp_conn->oic_item);
1182 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1183 OBD_FREE(imp_conn, sizeof(*imp_conn));
1186 LASSERT(imp->imp_sec == NULL);
1187 class_decref(imp->imp_obd, "import", imp);
1192 struct obd_import *class_import_get(struct obd_import *import)
1194 refcount_inc(&import->imp_refcount);
1195 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1196 refcount_read(&import->imp_refcount),
1197 import->imp_obd->obd_name);
1200 EXPORT_SYMBOL(class_import_get);
1202 void class_import_put(struct obd_import *imp)
1206 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1208 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1209 refcount_read(&imp->imp_refcount) - 1,
1210 imp->imp_obd->obd_name);
1212 if (refcount_dec_and_test(&imp->imp_refcount)) {
1213 CDEBUG(D_INFO, "final put import %p\n", imp);
1214 obd_zombie_import_add(imp);
1219 EXPORT_SYMBOL(class_import_put);
1221 static void init_imp_at(struct imp_at *at) {
1223 at_init(&at->iat_net_latency, 0, 0);
1224 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1225 /* max service estimates are tracked on the server side, so
1226 don't use the AT history here, just use the last reported
1227 val. (But keep hist for proc histogram, worst_ever) */
1228 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1233 static void obd_zombie_imp_cull(struct work_struct *ws)
1235 struct obd_import *import;
1237 import = container_of(ws, struct obd_import, imp_zombie_work);
1238 obd_zombie_import_free(import);
1241 struct obd_import *class_new_import(struct obd_device *obd)
1243 struct obd_import *imp;
1244 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1246 OBD_ALLOC(imp, sizeof(*imp));
1250 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1251 INIT_LIST_HEAD(&imp->imp_replay_list);
1252 INIT_LIST_HEAD(&imp->imp_sending_list);
1253 INIT_LIST_HEAD(&imp->imp_delayed_list);
1254 INIT_LIST_HEAD(&imp->imp_committed_list);
1255 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1256 imp->imp_known_replied_xid = 0;
1257 imp->imp_replay_cursor = &imp->imp_committed_list;
1258 spin_lock_init(&imp->imp_lock);
1259 imp->imp_last_success_conn = 0;
1260 imp->imp_state = LUSTRE_IMP_NEW;
1261 imp->imp_obd = class_incref(obd, "import", imp);
1262 rwlock_init(&imp->imp_sec_lock);
1263 init_waitqueue_head(&imp->imp_recovery_waitq);
1264 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1266 if (curr_pid_ns && curr_pid_ns->child_reaper)
1267 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1269 imp->imp_sec_refpid = 1;
1271 refcount_set(&imp->imp_refcount, 2);
1272 atomic_set(&imp->imp_unregistering, 0);
1273 atomic_set(&imp->imp_inflight, 0);
1274 atomic_set(&imp->imp_replay_inflight, 0);
1275 atomic_set(&imp->imp_inval_count, 0);
1276 INIT_LIST_HEAD(&imp->imp_conn_list);
1277 init_imp_at(&imp->imp_at);
1279 /* the default magic is V2, will be used in connect RPC, and
1280 * then adjusted according to the flags in request/reply. */
1281 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1285 EXPORT_SYMBOL(class_new_import);
1287 void class_destroy_import(struct obd_import *import)
1289 LASSERT(import != NULL);
1290 LASSERT(import != LP_POISON);
1292 spin_lock(&import->imp_lock);
1293 import->imp_generation++;
1294 spin_unlock(&import->imp_lock);
1295 class_import_put(import);
1297 EXPORT_SYMBOL(class_destroy_import);
1299 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1301 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1303 spin_lock(&exp->exp_locks_list_guard);
1305 LASSERT(lock->l_exp_refs_nr >= 0);
1307 if (lock->l_exp_refs_target != NULL &&
1308 lock->l_exp_refs_target != exp) {
1309 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1310 exp, lock, lock->l_exp_refs_target);
1312 if ((lock->l_exp_refs_nr ++) == 0) {
1313 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1314 lock->l_exp_refs_target = exp;
1316 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1317 lock, exp, lock->l_exp_refs_nr);
1318 spin_unlock(&exp->exp_locks_list_guard);
1320 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Drop one reference from \a lock to \a exp; counterpart of
 * __class_export_add_lock_ref().
 *
 * Runs under exp->exp_locks_list_guard and asserts that the counter is
 * positive. A console warning is printed when the lock's recorded export
 * differs from \a exp. When the counter reaches 0 the lock is unlinked from
 * the export's list and l_exp_refs_target is reset to NULL.
 *
 * \param exp   export losing the reference
 * \param lock  lock whose l_exp_refs_nr counter is decremented
 */
1322 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1324 spin_lock(&exp->exp_locks_list_guard);
1325 LASSERT(lock->l_exp_refs_nr > 0);
1326 if (lock->l_exp_refs_target != exp) {
1327 LCONSOLE_WARN("lock %p, "
1328 "mismatching export pointers: %p, %p\n",
1329 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the branch is taken when the last reference goes away */
1331 if (-- lock->l_exp_refs_nr == 0) {
1332 list_del_init(&lock->l_exp_refs_link);
1333 lock->l_exp_refs_target = NULL;
1335 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1336 lock, exp, lock->l_exp_refs_nr);
1337 spin_unlock(&exp->exp_locks_list_guard);
1339 EXPORT_SYMBOL(__class_export_del_lock_ref);
1342 /* A connection defines an export context in which preallocation can
1343 be managed. This releases the export pointer reference, and returns
1344 the export handle, so the export refcount is 1 when this function
/*
 * Create a new export on \a obd for client \a cluuid and hand its handle
 * cookie back through \a conn.
 *
 * The export is created with class_new_export(); the visible error path
 * returns PTR_ERR() of that result. On success the export's handle cookie
 * is copied into conn->cookie and the local export reference is released
 * with class_export_put().
 *
 * \param conn    handle filled in with the new export's cookie (non-NULL)
 * \param obd     device to connect to (non-NULL)
 * \param cluuid  UUID of the connecting client (non-NULL)
 */
1346 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1347 struct obd_uuid *cluuid)
1349 struct obd_export *export;
1350 LASSERT(conn != NULL);
1351 LASSERT(obd != NULL);
1352 LASSERT(cluuid != NULL);
1355 export = class_new_export(obd, cluuid);
1357 RETURN(PTR_ERR(export));
/* publish the cookie, then drop the reference held by this function */
1359 conn->cookie = export->exp_handle.h_cookie;
1360 class_export_put(export);
1362 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1363 cluuid->uuid, conn->cookie);
1366 EXPORT_SYMBOL(class_connect);
1368 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery accounting that \a exp contributes to its obd device.
 *
 * First part, under obd_recovery_task_lock and only while the device is
 * recovering: clear exp_in_recovery (under exp_lock) and decrement
 * obd_connected_clients, and count the export in obd_stale_clients unless
 * it is a lightweight connection.
 * Second part, under exp_lock: clear the request-replay and lock-replay
 * flags and decrement the matching per-device client counters.
 *
 * \param exp  export being disconnected
 */
1369 static void class_export_recovery_cleanup(struct obd_export *exp)
1371 struct obd_device *obd = exp->exp_obd;
1373 spin_lock(&obd->obd_recovery_task_lock);
1374 if (obd->obd_recovering) {
1375 if (exp->exp_in_recovery) {
1376 spin_lock(&exp->exp_lock);
1377 exp->exp_in_recovery = 0;
1378 spin_unlock(&exp->exp_lock);
1379 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1380 atomic_dec(&obd->obd_connected_clients);
1383 /* if called during recovery then should update
1384 * obd_stale_clients counter,
1385 * lightweight exports are not counted */
1386 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1387 exp->exp_obd->obd_stale_clients++;
1389 spin_unlock(&obd->obd_recovery_task_lock);
1391 spin_lock(&exp->exp_lock);
1392 /** Cleanup req replay fields */
1393 if (exp->exp_req_replay_needed) {
1394 exp->exp_req_replay_needed = 0;
/* the counter must be non-zero before it is decremented */
1396 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1397 atomic_dec(&obd->obd_req_replay_clients);
1400 /** Cleanup lock replay data */
1401 if (exp->exp_lock_replay_needed) {
1402 exp->exp_lock_replay_needed = 0;
1404 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1405 atomic_dec(&obd->obd_lock_replay_clients);
1407 spin_unlock(&exp->exp_lock);
1410 /* This function removes 1-3 references from the export:
1411 * 1 - for export pointer passed
1412 * and if disconnect really need
1413 * 2 - removing from hash
1414 * 3 - in client_unlink_export
1415 * The export pointer passed to this function can be destroyed */
/*
 * Disconnect \a export.
 *
 * Under exp_lock the export is marked exp_disconnected (remembering the
 * previous value) and, if still hashed, removed from the device's NID hash.
 * If it was already disconnected the function jumps to the no_disconn
 * label; otherwise it runs class_export_recovery_cleanup() and
 * class_unlink_export(). class_export_put() is then called on the export.
 *
 * \param export  export to disconnect; a NULL pointer only triggers a warning
 */
1416 int class_disconnect(struct obd_export *export)
1418 int already_disconnected;
1421 if (export == NULL) {
1422 CWARN("attempting to free NULL export %p\n", export);
1426 spin_lock(&export->exp_lock);
/* sample and set the flag in one critical section so racing callers see it */
1427 already_disconnected = export->exp_disconnected;
1428 export->exp_disconnected = 1;
1429 /* We hold references of export for uuid hash
1430 * and nid_hash and export link at least. So
1431 * it is safe to call cfs_hash_del in there. */
1432 if (!hlist_unhashed(&export->exp_nid_hash))
1433 cfs_hash_del(export->exp_obd->obd_nid_hash,
1434 &export->exp_connection->c_peer.nid,
1435 &export->exp_nid_hash);
1436 spin_unlock(&export->exp_lock);
1438 /* class_cleanup(), abort_recovery(), and class_fail_export()
1439 * all end up in here, and if any of them race we shouldn't
1440 * call extra class_export_puts(). */
1441 if (already_disconnected) {
1442 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1443 GOTO(no_disconn, already_disconnected);
1446 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1447 export->exp_handle.h_cookie);
1449 class_export_recovery_cleanup(export);
1450 class_unlink_export(export);
1452 class_export_put(export);
1455 EXPORT_SYMBOL(class_disconnect);
1457 /* Return non-zero for a fully connected export */
/*
 * Report whether \a exp is connected: under exp_lock, the export counts as
 * connected when exp_conn_cnt is positive and exp_failed is not set.
 *
 * \param exp  export to test
 */
1458 int class_connected_export(struct obd_export *exp)
1463 spin_lock(&exp->exp_lock);
1464 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1465 spin_unlock(&exp->exp_lock);
1469 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export found on \a list.
 *
 * The loop keeps taking the first entry until the list is empty. For each
 * export it takes a reference, stores \a flags in exp_flags under exp_lock,
 * and then either
 *  - skips the disconnect when the export's client UUID equals the device's
 *    own UUID: the export is unlinked from the list and the reference is
 *    dropped, or
 *  - takes a second reference, calls obd_disconnect() and afterwards drops
 *    the first reference.
 *
 * \param list   list of exports to disconnect
 * \param flags  value written to exp_flags of each export
 */
1471 static void class_disconnect_export_list(struct list_head *list,
1472 enum obd_option flags)
1475 struct obd_export *exp;
1478 /* It's possible that an export may disconnect itself, but
1479 * nothing else will be added to this list. */
1480 while (!list_empty(list)) {
1481 exp = list_first_entry(list, struct obd_export,
1483 /* need for safe call CDEBUG after obd_disconnect */
1484 class_export_get(exp);
1486 spin_lock(&exp->exp_lock);
1487 exp->exp_flags = flags;
1488 spin_unlock(&exp->exp_lock);
1490 if (obd_uuid_equals(&exp->exp_client_uuid,
1491 &exp->exp_obd->obd_uuid)) {
1493 "exp %p export uuid == obd uuid, don't discon\n",
1495 /* Need to delete this now so we don't end up pointing
1496 * to work_list later when this export is cleaned up. */
1497 list_del_init(&exp->exp_obd_chain);
1498 class_export_put(exp);
/* extra reference taken before the export is passed to obd_disconnect() */
1502 class_export_get(exp);
1503 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1504 "last request at %lld\n",
1505 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1506 exp, exp->exp_last_request_time);
1507 /* release one export reference anyway */
1508 rc = obd_disconnect(exp);
1510 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1511 obd_export_nid2str(exp), exp, rc);
1512 class_export_put(exp);
/*
 * Disconnect all exports of \a obd.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a private work
 * list under obd_dev_lock, so the actual disconnects happen without that
 * lock held. A non-empty work list is passed to
 * class_disconnect_export_list() with the flags from exp_flags_from_obd().
 *
 * \param obd  device whose exports are disconnected
 */
1517 void class_disconnect_exports(struct obd_device *obd)
1519 LIST_HEAD(work_list);
1522 /* Move all of the exports from obd_exports to a work list, en masse. */
1523 spin_lock(&obd->obd_dev_lock);
1524 list_splice_init(&obd->obd_exports, &work_list);
1525 list_splice_init(&obd->obd_delayed_exports, &work_list);
1526 spin_unlock(&obd->obd_dev_lock);
1528 if (!list_empty(&work_list)) {
1529 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1530 "disconnecting them\n", obd->obd_minor, obd);
1531 class_disconnect_export_list(&work_list,
1532 exp_flags_from_obd(obd));
1534 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1535 obd->obd_minor, obd);
1538 EXPORT_SYMBOL(class_disconnect_exports);
1540 /* Remove exports that have not completed recovery.
/*
 * Select exports of \a obd for eviction and disconnect them.
 *
 * Walks obd_exports under obd_dev_lock. The self-export and exports
 * without a last_rcvd slot (ted_lr_idx == -1) are filtered out by the
 * visible checks. For the rest, exp_failed and test_export() are evaluated
 * under exp_lock; exports that get past that check are marked exp_failed
 * and moved to a private work list. The work list is then handed to
 * class_disconnect_export_list() with OBD_OPT_ABORT_RECOV added to the
 * device flags.
 *
 * \param obd          device to scan
 * \param test_export  per-export predicate, called with exp_lock held
 */
1542 void class_disconnect_stale_exports(struct obd_device *obd,
1543 int (*test_export)(struct obd_export *))
1545 LIST_HEAD(work_list);
1546 struct obd_export *exp, *n;
1550 spin_lock(&obd->obd_dev_lock);
/* the _safe iterator is needed because entries are moved off the list */
1551 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1553 /* don't count self-export as client */
1554 if (obd_uuid_equals(&exp->exp_client_uuid,
1555 &exp->exp_obd->obd_uuid))
1558 /* don't evict clients which have no slot in last_rcvd
1559 * (e.g. lightweight connection) */
1560 if (exp->exp_target_data.ted_lr_idx == -1)
1563 spin_lock(&exp->exp_lock);
1564 if (exp->exp_failed || test_export(exp)) {
1565 spin_unlock(&exp->exp_lock);
1568 exp->exp_failed = 1;
1569 spin_unlock(&exp->exp_lock);
1571 list_move(&exp->exp_obd_chain, &work_list);
1573 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1574 obd->obd_name, exp->exp_client_uuid.uuid,
1575 obd_export_nid2str(exp));
1576 print_export_data(exp, "EVICTING", 0, D_HA);
1578 spin_unlock(&obd->obd_dev_lock);
1581 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1582 obd->obd_name, evicted);
1584 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1585 OBD_OPT_ABORT_RECOV);
1588 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark \a exp as failed and disconnect it.
 *
 * exp_failed is sampled and set under exp_lock; if it was already set the
 * export is treated as dead and only a debug message is emitted. Otherwise
 * the debug log is dumped when obd_dump_on_timeout is set, two references
 * are taken (one to keep the export valid for the trailing messages, one
 * for obd_disconnect()), obd_disconnect() is called and the first
 * reference is dropped at the end.
 *
 * \param exp  export to fail
 */
1590 void class_fail_export(struct obd_export *exp)
1592 int rc, already_failed;
1594 spin_lock(&exp->exp_lock);
1595 already_failed = exp->exp_failed;
1596 exp->exp_failed = 1;
1597 spin_unlock(&exp->exp_lock);
1599 if (already_failed) {
1600 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1601 exp, exp->exp_client_uuid.uuid);
1605 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1606 exp, exp->exp_client_uuid.uuid);
1608 if (obd_dump_on_timeout)
1609 libcfs_debug_dumplog();
1611 /* need for safe call CDEBUG after obd_disconnect */
1612 class_export_get(exp);
1614 /* Most callers into obd_disconnect are removing their own reference
1615 * (request, for example) in addition to the one from the hash table.
1616 * We don't have such a reference here, so make one. */
1617 class_export_get(exp);
1618 rc = obd_disconnect(exp);
1620 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1622 CDEBUG(D_HA, "disconnected export %p/%s\n",
1623 exp, exp->exp_client_uuid.uuid);
1624 class_export_put(exp);
1626 EXPORT_SYMBOL(class_fail_export);
/*
 * Evict exports of \a obd that are hashed under the NID given as a string.
 *
 * Returns early, with the counter still 0, when the device is stopping.
 * Otherwise a reference on the device's NID hash is taken under
 * obd_dev_lock and released at the end. Each export found by
 * cfs_hash_lookup() is checked against the requested NID and against the
 * self-export, failed with class_fail_export() and then released with
 * class_export_put().
 *
 * \param obd  device whose NID hash is searched
 * \param nid  NID in string form, converted with libcfs_str2nid()
 *
 * \retval exports_evicted, the local eviction counter
 */
1628 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1630 struct cfs_hash *nid_hash;
1631 struct obd_export *doomed_exp = NULL;
1632 int exports_evicted = 0;
/* NOTE(review): the cast below discards the const qualifier of \a nid */
1634 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1636 spin_lock(&obd->obd_dev_lock);
1637 /* umount has run already, so evict thread should leave
1638 * its task to umount thread now */
1639 if (obd->obd_stopping) {
1640 spin_unlock(&obd->obd_dev_lock);
1641 return exports_evicted;
1643 nid_hash = obd->obd_nid_hash;
1644 cfs_hash_getref(nid_hash);
1645 spin_unlock(&obd->obd_dev_lock);
1648 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1649 if (doomed_exp == NULL)
1652 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1653 "nid %s found, wanted nid %s, requested nid %s\n",
1654 obd_export_nid2str(doomed_exp),
1655 libcfs_nid2str(nid_key), nid);
1656 LASSERTF(doomed_exp != obd->obd_self_export,
1657 "self-export is hashed by NID?\n");
1659 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1660 "request\n", obd->obd_name,
1661 obd_uuid2str(&doomed_exp->exp_client_uuid),
1662 obd_export_nid2str(doomed_exp));
1663 class_fail_export(doomed_exp);
1664 class_export_put(doomed_exp);
/* drop the hash reference taken under obd_dev_lock above */
1667 cfs_hash_putref(nid_hash);
1669 if (!exports_evicted)
1670 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1671 obd->obd_name, nid);
1672 return exports_evicted;
1674 EXPORT_SYMBOL(obd_export_evict_by_nid);
1676 #ifdef HAVE_SERVER_SUPPORT
/*
 * Evict the export of \a obd whose client UUID matches \a uuid. Built only
 * with HAVE_SERVER_SUPPORT.
 *
 * Returns early, with the counter still 0, when the device is stopping or
 * when \a uuid is the device's own UUID. Otherwise the export is looked up
 * with obd_uuid_lookup(); a miss is reported with CERROR, a hit is failed
 * with class_fail_export(), released and removed with obd_uuid_del().
 *
 * \param obd   device to search
 * \param uuid  client UUID in string form
 *
 * \retval exports_evicted, the local eviction counter
 */
1677 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1679 struct obd_export *doomed_exp = NULL;
1680 struct obd_uuid doomed_uuid;
1681 int exports_evicted = 0;
1683 spin_lock(&obd->obd_dev_lock);
1684 if (obd->obd_stopping) {
1685 spin_unlock(&obd->obd_dev_lock);
1686 return exports_evicted;
1688 spin_unlock(&obd->obd_dev_lock);
1690 obd_str2uuid(&doomed_uuid, uuid);
1691 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1692 CERROR("%s: can't evict myself\n", obd->obd_name);
1693 return exports_evicted;
1696 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1697 if (doomed_exp == NULL) {
1698 CERROR("%s: can't disconnect %s: no exports found\n",
1699 obd->obd_name, uuid);
/* NOTE(review): "adminstrative" in the message below is misspelled; it is a
 * runtime string and is deliberately left unchanged here. */
1701 CWARN("%s: evicting %s at adminstrative request\n",
1702 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1703 class_fail_export(doomed_exp);
/* NOTE(review): doomed_exp is passed to obd_uuid_del() after the reference
 * from the lookup has been dropped; presumably another reference keeps the
 * export alive -- confirm, or swap the two calls. */
1704 class_export_put(doomed_exp);
1705 obd_uuid_del(obd, doomed_exp);
1709 return exports_evicted;
1711 #endif /* HAVE_SERVER_SUPPORT */
1713 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional hook, NULL by default, taking the export as its only argument.
 * print_export_data() calls it when lock dumping is requested and the hook
 * is set. Only present when LUSTRE_TRACKS_LOCK_EXP_REFS is enabled.
 */
1714 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1715 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Emit one debug line describing \a exp.
 *
 * The outstanding-replies list is walked under exp_lock; the message then
 * prints the device name, \a status, the export pointer, client UUID and
 * NID, the handle refcount, the rpc/callback/lock counters, the
 * disconnected/delayed/failed flags, reply information, the last committed
 * transno and whether the export is on a stale list.
 *
 * \param exp          export to describe
 * \param status       free-form tag printed after the device name
 * \param locks        non-zero to also call class_export_dump_hook, if set
 * \param debug_level  debug mask passed to CDEBUG
 */
1718 static void print_export_data(struct obd_export *exp, const char *status,
1719 int locks, int debug_level)
1721 struct ptlrpc_reply_state *rs;
1722 struct ptlrpc_reply_state *first_reply = NULL;
1725 spin_lock(&exp->exp_lock);
1726 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1732 spin_unlock(&exp->exp_lock);
1734 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1735 "%p %s %llu stale:%d\n",
1736 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1737 obd_export_nid2str(exp),
1738 refcount_read(&exp->exp_handle.h_ref),
1739 atomic_read(&exp->exp_rpc_count),
1740 atomic_read(&exp->exp_cb_count),
1741 atomic_read(&exp->exp_locks_count),
1742 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1743 nreplies, first_reply, nreplies > 3 ? "..." : "",
1744 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1745 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1746 if (locks && class_export_dump_hook != NULL)
1747 class_export_dump_hook(exp);
/*
 * Print every export of \a obd through print_export_data(), tagged by the
 * list it is on: "ACTIVE" for obd_exports, "UNLINKED" for
 * obd_unlinked_exports and "DELAYED" for obd_delayed_exports. All three
 * lists are walked with obd_dev_lock held.
 *
 * \param obd          device whose exports are dumped
 * \param locks        forwarded to print_export_data()
 * \param debug_level  forwarded to print_export_data()
 */
1751 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1753 struct obd_export *exp;
1755 spin_lock(&obd->obd_dev_lock);
1756 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1757 print_export_data(exp, "ACTIVE", locks, debug_level);
1758 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1759 print_export_data(exp, "UNLINKED", locks, debug_level);
1760 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1761 print_export_data(exp, "DELAYED", locks, debug_level);
1762 spin_unlock(&obd->obd_dev_lock);
/*
 * Wait until obd_unlinked_exports of \a obd becomes empty.
 *
 * obd_exports is asserted to be empty on entry. The list is tested with
 * obd_dev_lock held; the lock is dropped around each uninterruptible sleep
 * of "waited" seconds. Once "waited" is above 5 and a power of two, a
 * console warning is printed and all exports are dumped.
 *
 * \param obd  device to wait on
 */
1765 void obd_exports_barrier(struct obd_device *obd)
1768 LASSERT(list_empty(&obd->obd_exports));
1769 spin_lock(&obd->obd_dev_lock);
1770 while (!list_empty(&obd->obd_unlinked_exports)) {
/* never sleep with the spinlock held */
1771 spin_unlock(&obd->obd_dev_lock);
1772 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1773 if (waited > 5 && is_power_of_2(waited)) {
1774 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1775 "more than %d seconds. "
1776 "The obd refcount = %d. Is it stuck?\n",
1777 obd->obd_name, waited,
1778 atomic_read(&obd->obd_refcount));
1779 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1782 spin_lock(&obd->obd_dev_lock);
1784 spin_unlock(&obd->obd_dev_lock);
1786 EXPORT_SYMBOL(obd_exports_barrier);
1789 * Add export to the obd_zombie thread and notify it.
/*
 * Hand \a exp over for deferred processing: decrement obd_stale_export_num,
 * unlink the export from its exp_obd_chain list under the device's
 * obd_dev_lock (it is asserted to be linked), then queue exp_zombie_work on
 * zombie_wq.
 *
 * \param exp  export to queue
 */
1791 static void obd_zombie_export_add(struct obd_export *exp) {
1792 atomic_dec(&obd_stale_export_num);
1793 spin_lock(&exp->exp_obd->obd_dev_lock);
1794 LASSERT(!list_empty(&exp->exp_obd_chain));
1795 list_del_init(&exp->exp_obd_chain);
1796 spin_unlock(&exp->exp_obd->obd_dev_lock);
1798 queue_work(zombie_wq, &exp->exp_zombie_work);
1802 * Add import to the obd_zombie thread and notify it.
/*
 * Queue imp_zombie_work of \a imp on zombie_wq. The import's imp_sec is
 * asserted to be NULL at this point.
 *
 * \param imp  import to queue
 */
1804 static void obd_zombie_import_add(struct obd_import *imp) {
1805 LASSERT(imp->imp_sec == NULL);
1807 queue_work(zombie_wq, &imp->imp_zombie_work);
1811 * wait until the obd_zombie import/export queues become empty
/*
 * Flush zombie_wq, the workqueue that obd_zombie_export_add() and
 * obd_zombie_import_add() queue their work items on.
 */
1813 void obd_zombie_barrier(void)
1815 flush_workqueue(zombie_wq);
1817 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Take the first export off the global obd_stale_exports list.
 *
 * The list is inspected and modified under obd_stale_export_lock; the
 * chosen export is unlinked with list_del_init(), so its exp_stale_list is
 * empty afterwards. "exp" stays NULL when the list is empty.
 */
1820 struct obd_export *obd_stale_export_get(void)
1822 struct obd_export *exp = NULL;
1825 spin_lock(&obd_stale_export_lock);
1826 if (!list_empty(&obd_stale_exports)) {
1827 exp = list_first_entry(&obd_stale_exports,
1828 struct obd_export, exp_stale_list);
1829 list_del_init(&exp->exp_stale_list);
1831 spin_unlock(&obd_stale_export_lock);
1834 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1835 atomic_read(&obd_stale_export_num));
1839 EXPORT_SYMBOL(obd_stale_export_get);
/*
 * Give back an export obtained from obd_stale_export_get().
 *
 * The export must not be on the stale list on entry. If it has a lock hash
 * with a non-zero entry count it is linked back onto obd_stale_exports: at
 * the tail when its exp_bl_list is empty, at the head otherwise. The
 * insertion holds exp_bl_list_lock (BH-disabling) and, nested inside it,
 * obd_stale_export_lock. The other visible path calls class_export_put().
 *
 * \param exp  export to put back
 */
1841 void obd_stale_export_put(struct obd_export *exp)
1845 LASSERT(list_empty(&exp->exp_stale_list));
1846 if (exp->exp_lock_hash &&
1847 atomic_read(&exp->exp_lock_hash->hs_count)) {
1848 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1849 atomic_read(&obd_stale_export_num));
1851 spin_lock_bh(&exp->exp_bl_list_lock);
1852 spin_lock(&obd_stale_export_lock);
1853 /* Add to the tail if there is no blocked locks,
1854 * to the head otherwise. */
1855 if (list_empty(&exp->exp_bl_list))
1856 list_add_tail(&exp->exp_stale_list,
1857 &obd_stale_exports);
1859 list_add(&exp->exp_stale_list,
1860 &obd_stale_exports);
1862 spin_unlock(&obd_stale_export_lock);
1863 spin_unlock_bh(&exp->exp_bl_list_lock);
1865 class_export_put(exp);
1869 EXPORT_SYMBOL(obd_stale_export_put);
1872 * Adjust the position of the export in the stale list,
1873 * i.e. move to the head of the list if is needed.
/*
 * Move \a exp to the head of obd_stale_exports when it is currently on the
 * stale list and its exp_bl_list is not empty. Both conditions are checked
 * with exp_bl_list_lock and obd_stale_export_lock held, taken in that
 * order.
 *
 * \param exp  export to reposition (asserted non-NULL)
 */
1875 void obd_stale_export_adjust(struct obd_export *exp)
1877 LASSERT(exp != NULL);
1878 spin_lock_bh(&exp->exp_bl_list_lock);
1879 spin_lock(&obd_stale_export_lock);
1881 if (!list_empty(&exp->exp_stale_list) &&
1882 !list_empty(&exp->exp_bl_list))
1883 list_move(&exp->exp_stale_list, &obd_stale_exports);
1885 spin_unlock(&obd_stale_export_lock);
1886 spin_unlock_bh(&exp->exp_bl_list_lock);
1888 EXPORT_SYMBOL(obd_stale_export_adjust);
1891 * start destroy zombie import/export thread
/*
 * Create the workqueue, named "obd_zombid", that zombie exports and
 * imports are queued on, and store it in zombie_wq.
 */
1893 int obd_zombie_impexp_init(void)
1895 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1903 * stop destroy zombie import/export thread
/*
 * Destroy zombie_wq and assert that the global obd_stale_exports list is
 * empty afterwards.
 */
1905 void obd_zombie_impexp_stop(void)
1907 destroy_workqueue(zombie_wq);
1908 LASSERT(list_empty(&obd_stale_exports));
1911 /***** Kernel-userspace comm helpers *******/
1913 /* Get length of entire message, including header */
/*
 * \param payload_len  size of the payload in bytes
 * \retval sizeof(struct kuc_hdr) plus \a payload_len
 */
1914 int kuc_len(int payload_len)
1916 return sizeof(struct kuc_hdr) + payload_len;
1918 EXPORT_SYMBOL(kuc_len);
1920 /* Get a pointer to kuc header, given a ptr to the payload
1921 * @param p Pointer to payload area
1922 * @returns Pointer to kuc header
/*
 * The header is taken to sit immediately before the payload, so it is
 * found by stepping back one struct kuc_hdr from \a p. The header's magic
 * is asserted to equal KUC_MAGIC.
 */
1924 struct kuc_hdr * kuc_ptr(void *p)
1926 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1927 LASSERT(lh->kuc_magic == KUC_MAGIC);
1930 EXPORT_SYMBOL(kuc_ptr);
1932 /* Alloc space for a message, and fill in header
1933 * @return Pointer to payload area
/*
 * Prepare a kernel-userspace message buffer of kuc_len(payload_len) bytes.
 * The header is filled with KUC_MAGIC, \a transport, \a type and the total
 * length; the returned pointer is the first byte after the header.
 * The visible failure path returns ERR_PTR(-ENOMEM).
 *
 * \param payload_len  payload size in bytes
 * \param transport    value stored in kuc_transport
 * \param type         value stored in kuc_msgtype
 */
1935 void *kuc_alloc(int payload_len, int transport, int type)
1938 int len = kuc_len(payload_len);
1942 return ERR_PTR(-ENOMEM);
1944 lh->kuc_magic = KUC_MAGIC;
1945 lh->kuc_transport = transport;
1946 lh->kuc_msgtype = type;
1947 lh->kuc_msglen = len;
/* the payload area starts right behind the header */
1949 return (void *)(lh + 1);
1951 EXPORT_SYMBOL(kuc_alloc);
1953 /* Takes pointer to payload area */
/*
 * Free a message given its payload pointer: the header is located with
 * kuc_ptr() and kuc_len(payload_len) bytes are released with OBD_FREE.
 *
 * \param p            payload pointer
 * \param payload_len  payload size; NOTE(review): presumably must match the
 *                     value given at allocation -- confirm against callers
 */
1954 void kuc_free(void *p, int payload_len)
1956 struct kuc_hdr *lh = kuc_ptr(p);
1957 OBD_FREE(lh, kuc_len(payload_len));
1959 EXPORT_SYMBOL(kuc_free);
/*
 * One waiter for a request slot; obd_get_request_slot() keeps an instance
 * on its stack while it sleeps.
 */
1961 struct obd_request_slot_waiter {
/* link on client_obd::cl_flight_waiters; unlinked when a slot is granted */
1962 struct list_head orsw_entry;
/* wait queue the waiting thread sleeps on */
1963 wait_queue_head_t orsw_waitq;
/*
 * Wait condition for obd_get_request_slot(): "avail" is true once the
 * waiter's orsw_entry is no longer linked on a list. The check is made
 * under cl_loi_list_lock.
 *
 * \param cli   client whose cl_loi_list_lock protects the waiter list
 * \param orsw  waiter to test
 */
1967 static bool obd_request_slot_avail(struct client_obd *cli,
1968 struct obd_request_slot_waiter *orsw)
1972 spin_lock(&cli->cl_loi_list_lock);
1973 avail = !!list_empty(&orsw->orsw_entry);
1974 spin_unlock(&cli->cl_loi_list_lock);
1980 * For network flow control, the RPC sponsor needs to acquire a credit
1981 * before sending the RPC. The credits count for a connection is defined
1982 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
1983 * the subsequent RPC sponsors need to wait until others released their
1984 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
/*
 * Acquire one RPC slot on \a cli, sleeping if none is free.
 *
 * Fast path: when cl_rpcs_in_flight is below cl_max_rpcs_in_flight the
 * counter is incremented under cl_loi_list_lock. Otherwise an on-stack
 * waiter is appended to cl_flight_waiters and the thread sleeps in
 * l_wait_event_abortable() until the waiter has been unlinked or flagged
 * as signaled. After the wait, under the lock again, an unsignaled waiter
 * is either unlinked from the queue or, if it had already been unlinked,
 * has its slot given back by decrementing cl_rpcs_in_flight.
 *
 * \param cli  client to take the slot from
 */
1986 int obd_get_request_slot(struct client_obd *cli)
1988 struct obd_request_slot_waiter orsw;
1991 spin_lock(&cli->cl_loi_list_lock);
1992 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1993 cli->cl_rpcs_in_flight++;
1994 spin_unlock(&cli->cl_loi_list_lock);
/* no free slot: enqueue the on-stack waiter */
1998 init_waitqueue_head(&orsw.orsw_waitq);
1999 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2000 orsw.orsw_signaled = false;
2001 spin_unlock(&cli->cl_loi_list_lock);
2003 rc = l_wait_event_abortable(orsw.orsw_waitq,
2004 obd_request_slot_avail(cli, &orsw) ||
2005 orsw.orsw_signaled);
2007 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2008 * freed but other (such as obd_put_request_slot) is using it. */
2009 spin_lock(&cli->cl_loi_list_lock);
2011 if (!orsw.orsw_signaled) {
/* an empty entry means a slot was already accounted to this waiter */
2012 if (list_empty(&orsw.orsw_entry))
2013 cli->cl_rpcs_in_flight--;
2015 list_del(&orsw.orsw_entry);
2020 if (orsw.orsw_signaled) {
2021 LASSERT(list_empty(&orsw.orsw_entry));
2025 spin_unlock(&cli->cl_loi_list_lock);
2029 EXPORT_SYMBOL(obd_get_request_slot);
/*
 * Release one RPC slot on \a cli.
 *
 * Under cl_loi_list_lock the in-flight counter is decremented. If waiters
 * are queued and the counter is below the maximum, the first waiter is
 * unlinked, the slot is accounted to it by incrementing the counter again,
 * and it is woken up.
 *
 * \param cli  client to return the slot to
 */
2031 void obd_put_request_slot(struct client_obd *cli)
2033 struct obd_request_slot_waiter *orsw;
2035 spin_lock(&cli->cl_loi_list_lock);
2036 cli->cl_rpcs_in_flight--;
2038 /* If there is free slot, wakeup the first waiter. */
2039 if (!list_empty(&cli->cl_flight_waiters) &&
2040 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2041 orsw = list_first_entry(&cli->cl_flight_waiters,
2042 struct obd_request_slot_waiter,
2044 list_del_init(&orsw->orsw_entry);
2045 cli->cl_rpcs_in_flight++;
2046 wake_up(&orsw->orsw_waitq);
2048 spin_unlock(&cli->cl_loi_list_lock);
2050 EXPORT_SYMBOL(obd_put_request_slot);
/*
 * \param cli  client to query
 * \retval current cl_max_rpcs_in_flight, read without taking a lock
 */
2052 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2054 return cli->cl_max_rpcs_in_flight;
2056 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * Change the maximum number of RPCs in flight for \a cli.
 *
 * \a max is range-checked against 1 and OBD_MAX_RIF_MAX. For an MDC device
 * (type name LUSTRE_MDC_NAME) max_mod_rpcs_in_flight is lowered to max - 1
 * when \a max does not exceed it. The new limit is stored under
 * cl_loi_list_lock, client_adjust_max_dirty() is called, and up to "diff"
 * queued waiters are unlinked, given a slot and woken up.
 *
 * \param cli  client to update
 * \param max  new limit
 */
2058 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2060 struct obd_request_slot_waiter *orsw;
2064 const char *type_name;
2067 if (max > OBD_MAX_RIF_MAX || max < 1)
2070 type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2071 if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2072 /* adjust max_mod_rpcs_in_flight to ensure it is always
2073 * strictly lower than max_rpcs_in_flight */
/* NOTE(review): the format string below has no trailing "\n", unlike the
 * other CERROR messages in this file. */
2075 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2076 "because it must be higher than "
2077 "max_mod_rpcs_in_flight value",
2078 cli->cl_import->imp_obd->obd_name);
2081 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2082 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2088 spin_lock(&cli->cl_loi_list_lock);
2089 old = cli->cl_max_rpcs_in_flight;
2090 cli->cl_max_rpcs_in_flight = max;
2091 client_adjust_max_dirty(cli);
2095 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2096 for (i = 0; i < diff; i++) {
2097 if (list_empty(&cli->cl_flight_waiters))
2100 orsw = list_first_entry(&cli->cl_flight_waiters,
2101 struct obd_request_slot_waiter,
2103 list_del_init(&orsw->orsw_entry);
2104 cli->cl_rpcs_in_flight++;
2105 wake_up(&orsw->orsw_waitq);
2107 spin_unlock(&cli->cl_loi_list_lock);
2111 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/*
 * \param cli  client to query
 * \retval current cl_max_mod_rpcs_in_flight, read without taking a lock
 */
2113 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2115 return cli->cl_max_mod_rpcs_in_flight;
2117 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/*
 * Change the maximum number of modify RPCs in flight for \a cli.
 *
 * \a max is range-checked against 1 and OBD_MAX_RIF_MAX, must be strictly
 * below cl_max_rpcs_in_flight, and must not exceed "maxmodrpcs". That limit
 * is taken from the import's connect data (ocd_maxmodrpcs) when the server
 * advertised OBD_CONNECT_MULTIMODRPCS. The new limit is stored under
 * cl_mod_rpcs_lock and waiters on cl_mod_rpcs_waitq are woken when the
 * limit grew.
 *
 * \param cli  client to update
 * \param max  new limit
 */
2119 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2121 struct obd_connect_data *ocd;
2125 if (max > OBD_MAX_RIF_MAX || max < 1)
2128 /* cannot exceed or equal max_rpcs_in_flight */
2129 if (max >= cli->cl_max_rpcs_in_flight) {
2130 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2131 "higher or equal to max_rpcs_in_flight value (%u)\n",
2132 cli->cl_import->imp_obd->obd_name,
2133 max, cli->cl_max_rpcs_in_flight);
2137 /* cannot exceed max modify RPCs in flight supported by the server */
2138 ocd = &cli->cl_import->imp_connect_data;
2139 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2140 maxmodrpcs = ocd->ocd_maxmodrpcs;
2143 if (max > maxmodrpcs) {
2144 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2145 "higher than max_mod_rpcs_per_client value (%hu) "
2146 "returned by the server at connection\n",
2147 cli->cl_import->imp_obd->obd_name,
2152 spin_lock(&cli->cl_mod_rpcs_lock);
2154 prev = cli->cl_max_mod_rpcs_in_flight;
2155 cli->cl_max_mod_rpcs_in_flight = max;
2157 /* wakeup waiters if limit has been increased */
2158 if (cli->cl_max_mod_rpcs_in_flight > prev)
2159 wake_up(&cli->cl_mod_rpcs_waitq);
2161 spin_unlock(&cli->cl_mod_rpcs_lock);
2165 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/*
 * Print modify-RPC statistics of \a cli into \a seq.
 *
 * Under cl_mod_rpcs_lock it prints a wall-clock snapshot time, the current
 * number of modify RPCs in flight and one histogram row per bucket of
 * cl_mod_rpcs_hist (count, percentage, cumulative percentage). The loop
 * over buckets has an exit check once the cumulative count equals the
 * total.
 *
 * \param cli  client whose statistics are shown
 * \param seq  seq_file to print into
 */
2167 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2168 struct seq_file *seq)
2170 unsigned long mod_tot = 0, mod_cum;
2171 struct timespec64 now;
2174 ktime_get_real_ts64(&now);
2176 spin_lock(&cli->cl_mod_rpcs_lock);
/* NOTE(review): "%llu" is paired with an s64 argument, and "%9lu" pads the
 * nanoseconds with spaces rather than zeros ("%09lu" would give a proper
 * fractional part). Runtime string left unchanged here. */
2178 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2179 (s64)now.tv_sec, now.tv_nsec);
2180 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2181 cli->cl_mod_rpcs_in_flight);
2183 seq_printf(seq, "\n\t\t\tmodify\n");
2184 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2186 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2189 for (i = 0; i < OBD_HIST_MAX; i++) {
2190 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2192 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2193 i, mod, pct(mod, mod_tot),
2194 pct(mod_cum, mod_tot));
2195 if (mod_cum == mod_tot)
2199 spin_unlock(&cli->cl_mod_rpcs_lock);
2203 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2205 /* The number of modify RPCs sent in parallel is limited
2206 * because the server has a finite number of slots per client to
2207 * store request result and ensure reply reconstruction when needed.
2208 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2209 * that takes into account server limit and cl_max_rpcs_in_flight
2211 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2212 * one close request is allowed above the maximum.
/*
 * Slot-availability test; the "_locked" suffix and the callers visible in
 * this file indicate that cl_mod_rpcs_lock is held by the caller. The
 * function only reads the counters of \a cli.
 */
2214 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2219 /* A slot is available if
2220 * - number of modify RPCs in flight is less than the max
2221 * - it's a close RPC and no other close request is in flight
2223 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2224 (close_req && cli->cl_close_rpcs_in_flight == 0);
/*
 * Locking wrapper around obd_mod_rpc_slot_avail_locked(): takes
 * cl_mod_rpcs_lock for the duration of the check. It is used as the wait
 * condition in obd_get_mod_rpc_slot().
 */
2229 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2234 spin_lock(&cli->cl_mod_rpcs_lock);
2235 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2236 spin_unlock(&cli->cl_mod_rpcs_lock);
2241 /* Get a modify RPC slot from the obd client @cli according
2242 * to the kind of operation @opc that is going to be sent
2243 * and the intent @it of the operation if it applies.
2244 * If the maximum number of modify RPCs in flight is reached
2245 * the thread is put to sleep.
2246 * Returns the tag to be set in the request message. Tag 0
2247 * is reserved for non-modifying requests.
/*
 * Visible flow: with cl_mod_rpcs_lock held, if a slot is available the
 * in-flight counter is incremented and tallied in cl_mod_rpcs_hist, and
 * the first clear bit of cl_mod_tag_bitmap is claimed as the tag index.
 * The lock is then dropped and the allocation is logged. If no slot is
 * available the lock is dropped and the thread sleeps exclusively on
 * cl_mod_rpcs_waitq until obd_mod_rpc_slot_avail() holds.
 *
 * \param cli  client to take the slot from
 * \param opc  operation code; MDS_CLOSE is singled out as a close request
 */
2249 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2251 bool close_req = false;
2254 if (opc == MDS_CLOSE)
2258 spin_lock(&cli->cl_mod_rpcs_lock);
/* snapshot of the limit, used only in the debug messages below */
2259 max = cli->cl_max_mod_rpcs_in_flight;
2260 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2261 /* there is a slot available */
2262 cli->cl_mod_rpcs_in_flight++;
2264 cli->cl_close_rpcs_in_flight++;
2265 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2266 cli->cl_mod_rpcs_in_flight);
2267 /* find a free tag */
2268 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2270 LASSERT(i < OBD_MAX_RIF_MAX);
/* NOTE(review): the bit is set as a side effect of the assertion argument;
 * this relies on LASSERT always evaluating its expression -- confirm. */
2271 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2272 spin_unlock(&cli->cl_mod_rpcs_lock);
2273 /* tag 0 is reserved for non-modify RPCs */
2276 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2277 cli->cl_import->imp_obd->obd_name,
2282 spin_unlock(&cli->cl_mod_rpcs_lock);
2284 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2285 "opc %u, max %hu\n",
2286 cli->cl_import->imp_obd->obd_name, opc, max);
2288 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2289 obd_mod_rpc_slot_avail(cli,
2293 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2295 /* Put a modify RPC slot from the obd client @cli according
2296 * to the kind of operation @opc that has been sent.
/*
 * Visible flow: under cl_mod_rpcs_lock the in-flight counter is
 * decremented and bit (tag - 1) of cl_mod_tag_bitmap is cleared; the bit
 * is asserted to have been set. After the lock is dropped, waiters on
 * cl_mod_rpcs_waitq are woken.
 *
 * \param cli  client to return the slot to
 * \param opc  operation code; MDS_CLOSE is singled out as a close request
 * \param tag  tag of the slot being released; bitmap index is tag - 1
 */
2298 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2300 bool close_req = false;
2305 if (opc == MDS_CLOSE)
2308 spin_lock(&cli->cl_mod_rpcs_lock);
2309 cli->cl_mod_rpcs_in_flight--;
2311 cli->cl_close_rpcs_in_flight--;
2312 /* release the tag in the bitmap */
2313 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
/* NOTE(review): as in obd_get_mod_rpc_slot(), the bitmap update happens
 * inside the assertion argument. */
2314 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2315 spin_unlock(&cli->cl_mod_rpcs_lock);
2316 wake_up(&cli->cl_mod_rpcs_waitq);
2318 EXPORT_SYMBOL(obd_put_mod_rpc_slot);