4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59 const char *status, int locks, int debug_level);
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90 obd, obd->obd_namespace, obd->obd_force);
93 lu_ref_fini(&obd->obd_reference);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 struct obd_type *class_search_type(const char *name)
99 struct kobject *kobj = kset_find_obj(lustre_kset, name);
101 if (kobj && kobj->ktype == &class_ktype)
102 return container_of(kobj, struct obd_type, typ_kobj);
107 EXPORT_SYMBOL(class_search_type);
109 struct obd_type *class_get_type(const char *name)
111 struct obd_type *type;
113 type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
116 const char *modname = name;
118 #ifdef HAVE_SERVER_SUPPORT
119 if (strcmp(modname, "obdfilter") == 0)
122 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 modname = LUSTRE_OSP_NAME;
125 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 if (try_module_get(type->typ_dt_ops->o_owner)) {
140 atomic_inc(&type->typ_refcnt);
141 /* class_search_type() returned a counted reference,
142 * but we don't need that count any more as
143 * we have one through typ_refcnt.
145 kobject_put(&type->typ_kobj);
147 kobject_put(&type->typ_kobj);
154 void class_put_type(struct obd_type *type)
157 module_put(type->typ_dt_ops->o_owner);
158 atomic_dec(&type->typ_refcnt);
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 debugfs_remove_recursive(type->typ_debugfs_entry);
166 type->typ_debugfs_entry = NULL;
169 lu_device_type_fini(type->typ_lu);
171 #ifdef CONFIG_PROC_FS
172 if (type->typ_name && type->typ_procroot)
173 remove_proc_subtree(type->typ_name, proc_lustre_root);
175 OBD_FREE(type, sizeof(*type));
178 static struct kobj_type class_ktype = {
179 .sysfs_ops = &lustre_sysfs_ops,
180 .release = class_sysfs_release,
183 #ifdef HAVE_SERVER_SUPPORT
184 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
186 struct dentry *symlink;
187 struct obd_type *type;
190 type = class_search_type(name);
192 kobject_put(&type->typ_kobj);
193 return ERR_PTR(-EEXIST);
196 OBD_ALLOC(type, sizeof(*type));
198 return ERR_PTR(-ENOMEM);
200 type->typ_kobj.kset = lustre_kset;
201 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
202 &lustre_kset->kobj, "%s", name);
206 symlink = debugfs_create_dir(name, debugfs_lustre_root);
207 type->typ_debugfs_entry = symlink;
208 type->typ_sym_filter = true;
211 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
213 if (IS_ERR(type->typ_procroot)) {
214 CERROR("%s: can't create compat proc entry: %d\n",
215 name, (int)PTR_ERR(type->typ_procroot));
216 type->typ_procroot = NULL;
222 EXPORT_SYMBOL(class_add_symlinks);
223 #endif /* HAVE_SERVER_SUPPORT */
225 #define CLASS_MAX_NAME 1024
227 int class_register_type(const struct obd_ops *dt_ops,
228 const struct md_ops *md_ops,
229 bool enable_proc, struct lprocfs_vars *vars,
230 const char *name, struct lu_device_type *ldt)
232 struct obd_type *type;
237 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
239 type = class_search_type(name);
241 #ifdef HAVE_SERVER_SUPPORT
242 if (type->typ_sym_filter)
244 #endif /* HAVE_SERVER_SUPPORT */
245 kobject_put(&type->typ_kobj);
246 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
250 OBD_ALLOC(type, sizeof(*type));
254 type->typ_kobj.kset = lustre_kset;
255 kobject_init(&type->typ_kobj, &class_ktype);
256 #ifdef HAVE_SERVER_SUPPORT
258 #endif /* HAVE_SERVER_SUPPORT */
260 type->typ_dt_ops = dt_ops;
261 type->typ_md_ops = md_ops;
263 #ifdef HAVE_SERVER_SUPPORT
264 if (type->typ_sym_filter) {
265 type->typ_sym_filter = false;
266 kobject_put(&type->typ_kobj);
270 #ifdef CONFIG_PROC_FS
271 if (enable_proc && !type->typ_procroot) {
272 type->typ_procroot = lprocfs_register(name,
275 if (IS_ERR(type->typ_procroot)) {
276 rc = PTR_ERR(type->typ_procroot);
277 type->typ_procroot = NULL;
282 type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
283 ldebugfs_add_vars(type->typ_debugfs_entry, vars, type);
285 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
288 #ifdef HAVE_SERVER_SUPPORT
293 rc = lu_device_type_init(ldt);
301 kobject_put(&type->typ_kobj);
305 EXPORT_SYMBOL(class_register_type);
307 int class_unregister_type(const char *name)
309 struct obd_type *type = class_search_type(name);
314 CERROR("unknown obd type\n");
318 if (atomic_read(&type->typ_refcnt)) {
319 CERROR("type %s has refcount (%d)\n", name,
320 atomic_read(&type->typ_refcnt));
321 /* This is a bad situation, let's make the best of it */
322 /* Remove ops, but leave the name for debugging */
323 type->typ_dt_ops = NULL;
324 type->typ_md_ops = NULL;
325 GOTO(out_put, rc = -EBUSY);
328 /* Put the final ref */
329 kobject_put(&type->typ_kobj);
331 /* Put the ref returned by class_search_type() */
332 kobject_put(&type->typ_kobj);
335 } /* class_unregister_type */
336 EXPORT_SYMBOL(class_unregister_type);
339 * Create a new obd device.
341 * Allocate the new obd_device and initialize it.
343 * \param[in] type_name obd device type string.
344 * \param[in] name obd device name.
345 * \param[in] uuid obd device UUID
347 * \retval newdev pointer to created obd_device
348 * \retval ERR_PTR(errno) on error
350 struct obd_device *class_newdev(const char *type_name, const char *name,
353 struct obd_device *newdev;
354 struct obd_type *type = NULL;
357 if (strlen(name) >= MAX_OBD_NAME) {
358 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
359 RETURN(ERR_PTR(-EINVAL));
362 type = class_get_type(type_name);
364 CERROR("OBD: unknown type: %s\n", type_name);
365 RETURN(ERR_PTR(-ENODEV));
368 newdev = obd_device_alloc();
369 if (newdev == NULL) {
370 class_put_type(type);
371 RETURN(ERR_PTR(-ENOMEM));
373 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
374 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
375 newdev->obd_type = type;
376 newdev->obd_minor = -1;
378 rwlock_init(&newdev->obd_pool_lock);
379 newdev->obd_pool_limit = 0;
380 newdev->obd_pool_slv = 0;
382 INIT_LIST_HEAD(&newdev->obd_exports);
383 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
384 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
385 INIT_LIST_HEAD(&newdev->obd_exports_timed);
386 INIT_LIST_HEAD(&newdev->obd_nid_stats);
387 spin_lock_init(&newdev->obd_nid_lock);
388 spin_lock_init(&newdev->obd_dev_lock);
389 mutex_init(&newdev->obd_dev_mutex);
390 spin_lock_init(&newdev->obd_osfs_lock);
391 /* newdev->obd_osfs_age must be set to a value in the distant
392 * past to guarantee a fresh statfs is fetched on mount. */
393 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
395 /* XXX belongs in setup not attach */
396 init_rwsem(&newdev->obd_observer_link_sem);
398 spin_lock_init(&newdev->obd_recovery_task_lock);
399 init_waitqueue_head(&newdev->obd_next_transno_waitq);
400 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
401 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
402 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
403 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
404 INIT_LIST_HEAD(&newdev->obd_evict_list);
405 INIT_LIST_HEAD(&newdev->obd_lwp_list);
407 llog_group_init(&newdev->obd_olg);
408 /* Detach drops this */
409 atomic_set(&newdev->obd_refcount, 1);
410 lu_ref_init(&newdev->obd_reference);
411 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
413 newdev->obd_conn_inprogress = 0;
415 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
417 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
418 newdev->obd_name, newdev);
426 * \param[in] obd obd_device to be freed
430 void class_free_dev(struct obd_device *obd)
432 struct obd_type *obd_type = obd->obd_type;
434 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
435 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
436 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
437 "obd %p != obd_devs[%d] %p\n",
438 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
439 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
440 "obd_refcount should be 0, not %d\n",
441 atomic_read(&obd->obd_refcount));
442 LASSERT(obd_type != NULL);
444 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
445 obd->obd_name, obd->obd_type->typ_name);
447 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
448 obd->obd_name, obd->obd_uuid.uuid);
449 if (obd->obd_stopping) {
452 /* If we're not stopping, we were never set up */
453 err = obd_cleanup(obd);
455 CERROR("Cleanup %s returned %d\n",
459 obd_device_free(obd);
461 class_put_type(obd_type);
465 * Unregister obd device.
467 * Free slot in obd_dev[] used by \a obd.
469 * \param[in] new_obd obd_device to be unregistered
473 void class_unregister_device(struct obd_device *obd)
475 write_lock(&obd_dev_lock);
476 if (obd->obd_minor >= 0) {
477 LASSERT(obd_devs[obd->obd_minor] == obd);
478 obd_devs[obd->obd_minor] = NULL;
481 write_unlock(&obd_dev_lock);
485 * Register obd device.
487 * Find free slot in obd_devs[], fills it with \a new_obd.
489 * \param[in] new_obd obd_device to be registered
492 * \retval -EEXIST device with this name is registered
493 * \retval -EOVERFLOW obd_devs[] is full
495 int class_register_device(struct obd_device *new_obd)
499 int new_obd_minor = 0;
500 bool minor_assign = false;
501 bool retried = false;
504 write_lock(&obd_dev_lock);
505 for (i = 0; i < class_devno_max(); i++) {
506 struct obd_device *obd = class_num2obd(i);
509 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
512 write_unlock(&obd_dev_lock);
514 /* the obd_device could be waited to be
515 * destroyed by the "obd_zombie_impexp_thread".
517 obd_zombie_barrier();
522 CERROR("%s: already exists, won't add\n",
524 /* in case we found a free slot before duplicate */
525 minor_assign = false;
529 if (!minor_assign && obd == NULL) {
536 new_obd->obd_minor = new_obd_minor;
537 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
538 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
539 obd_devs[new_obd_minor] = new_obd;
543 CERROR("%s: all %u/%u devices used, increase "
544 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
545 i, class_devno_max(), ret);
548 write_unlock(&obd_dev_lock);
553 static int class_name2dev_nolock(const char *name)
560 for (i = 0; i < class_devno_max(); i++) {
561 struct obd_device *obd = class_num2obd(i);
563 if (obd && strcmp(name, obd->obd_name) == 0) {
564 /* Make sure we finished attaching before we give
565 out any references */
566 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
567 if (obd->obd_attached) {
577 int class_name2dev(const char *name)
584 read_lock(&obd_dev_lock);
585 i = class_name2dev_nolock(name);
586 read_unlock(&obd_dev_lock);
590 EXPORT_SYMBOL(class_name2dev);
592 struct obd_device *class_name2obd(const char *name)
594 int dev = class_name2dev(name);
596 if (dev < 0 || dev > class_devno_max())
598 return class_num2obd(dev);
600 EXPORT_SYMBOL(class_name2obd);
602 int class_uuid2dev_nolock(struct obd_uuid *uuid)
606 for (i = 0; i < class_devno_max(); i++) {
607 struct obd_device *obd = class_num2obd(i);
609 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
610 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
618 int class_uuid2dev(struct obd_uuid *uuid)
622 read_lock(&obd_dev_lock);
623 i = class_uuid2dev_nolock(uuid);
624 read_unlock(&obd_dev_lock);
628 EXPORT_SYMBOL(class_uuid2dev);
630 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
632 int dev = class_uuid2dev(uuid);
635 return class_num2obd(dev);
637 EXPORT_SYMBOL(class_uuid2obd);
640 * Get obd device from ::obd_devs[]
642 * \param num [in] array index
644 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
645 * otherwise return the obd device there.
647 struct obd_device *class_num2obd(int num)
649 struct obd_device *obd = NULL;
651 if (num < class_devno_max()) {
656 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
657 "%p obd_magic %08x != %08x\n",
658 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
659 LASSERTF(obd->obd_minor == num,
660 "%p obd_minor %0d != %0d\n",
661 obd, obd->obd_minor, num);
668 * Find obd in obd_dev[] by name or uuid.
670 * Increment obd's refcount if found.
672 * \param[in] str obd name or uuid
674 * \retval NULL if not found
675 * \retval target pointer to found obd_device
677 struct obd_device *class_dev_by_str(const char *str)
679 struct obd_device *target = NULL;
680 struct obd_uuid tgtuuid;
683 obd_str2uuid(&tgtuuid, str);
685 read_lock(&obd_dev_lock);
686 rc = class_uuid2dev_nolock(&tgtuuid);
688 rc = class_name2dev_nolock(str);
691 target = class_num2obd(rc);
694 class_incref(target, "find", current);
695 read_unlock(&obd_dev_lock);
699 EXPORT_SYMBOL(class_dev_by_str);
702 * Get obd devices count. Device in any
704 * \retval obd device count
706 int get_devices_count(void)
708 int index, max_index = class_devno_max(), dev_count = 0;
710 read_lock(&obd_dev_lock);
711 for (index = 0; index <= max_index; index++) {
712 struct obd_device *obd = class_num2obd(index);
716 read_unlock(&obd_dev_lock);
720 EXPORT_SYMBOL(get_devices_count);
722 void class_obd_list(void)
727 read_lock(&obd_dev_lock);
728 for (i = 0; i < class_devno_max(); i++) {
729 struct obd_device *obd = class_num2obd(i);
733 if (obd->obd_stopping)
735 else if (obd->obd_set_up)
737 else if (obd->obd_attached)
741 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
742 i, status, obd->obd_type->typ_name,
743 obd->obd_name, obd->obd_uuid.uuid,
744 atomic_read(&obd->obd_refcount));
746 read_unlock(&obd_dev_lock);
749 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
750 * specified, then only the client with that uuid is returned,
751 * otherwise any client connected to the tgt is returned.
753 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
754 const char *type_name,
755 struct obd_uuid *grp_uuid)
759 read_lock(&obd_dev_lock);
760 for (i = 0; i < class_devno_max(); i++) {
761 struct obd_device *obd = class_num2obd(i);
765 if ((strncmp(obd->obd_type->typ_name, type_name,
766 strlen(type_name)) == 0)) {
767 if (obd_uuid_equals(tgt_uuid,
768 &obd->u.cli.cl_target_uuid) &&
769 ((grp_uuid)? obd_uuid_equals(grp_uuid,
770 &obd->obd_uuid) : 1)) {
771 read_unlock(&obd_dev_lock);
776 read_unlock(&obd_dev_lock);
780 EXPORT_SYMBOL(class_find_client_obd);
782 /* Iterate the obd_device list looking devices have grp_uuid. Start
783 * searching at *next, and if a device is found, the next index to look
784 * at is saved in *next. If next is NULL, then the first matching device
785 * will always be returned.
787 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
793 else if (*next >= 0 && *next < class_devno_max())
798 read_lock(&obd_dev_lock);
799 for (; i < class_devno_max(); i++) {
800 struct obd_device *obd = class_num2obd(i);
804 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
807 read_unlock(&obd_dev_lock);
811 read_unlock(&obd_dev_lock);
815 EXPORT_SYMBOL(class_devices_in_group);
818 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
819 * adjust sptlrpc settings accordingly.
821 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
823 struct obd_device *obd;
827 LASSERT(namelen > 0);
829 read_lock(&obd_dev_lock);
830 for (i = 0; i < class_devno_max(); i++) {
831 obd = class_num2obd(i);
833 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
836 /* only notify mdc, osc, osp, lwp, mdt, ost
837 * because only these have a -sptlrpc llog */
838 type = obd->obd_type->typ_name;
839 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
840 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
841 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
842 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
843 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
844 strcmp(type, LUSTRE_OST_NAME) != 0)
847 if (strncmp(obd->obd_name, fsname, namelen))
850 class_incref(obd, __FUNCTION__, obd);
851 read_unlock(&obd_dev_lock);
852 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
853 sizeof(KEY_SPTLRPC_CONF),
854 KEY_SPTLRPC_CONF, 0, NULL, NULL);
856 class_decref(obd, __FUNCTION__, obd);
857 read_lock(&obd_dev_lock);
859 read_unlock(&obd_dev_lock);
862 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
864 void obd_cleanup_caches(void)
867 if (obd_device_cachep) {
868 kmem_cache_destroy(obd_device_cachep);
869 obd_device_cachep = NULL;
875 int obd_init_caches(void)
880 LASSERT(obd_device_cachep == NULL);
881 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
882 sizeof(struct obd_device),
883 0, 0, 0, sizeof(struct obd_device), NULL);
884 if (!obd_device_cachep)
885 GOTO(out, rc = -ENOMEM);
889 obd_cleanup_caches();
893 static const char export_handle_owner[] = "export";
895 /* map connection to client */
896 struct obd_export *class_conn2export(struct lustre_handle *conn)
898 struct obd_export *export;
902 CDEBUG(D_CACHE, "looking for null handle\n");
906 if (conn->cookie == -1) { /* this means assign a new connection */
907 CDEBUG(D_CACHE, "want a new connection\n");
911 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
912 export = class_handle2object(conn->cookie, export_handle_owner);
915 EXPORT_SYMBOL(class_conn2export);
917 struct obd_device *class_exp2obd(struct obd_export *exp)
923 EXPORT_SYMBOL(class_exp2obd);
925 struct obd_import *class_exp2cliimp(struct obd_export *exp)
927 struct obd_device *obd = exp->exp_obd;
930 return obd->u.cli.cl_import;
932 EXPORT_SYMBOL(class_exp2cliimp);
934 /* Export management functions */
935 static void class_export_destroy(struct obd_export *exp)
937 struct obd_device *obd = exp->exp_obd;
940 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
941 LASSERT(obd != NULL);
943 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
944 exp->exp_client_uuid.uuid, obd->obd_name);
946 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
947 if (exp->exp_connection)
948 ptlrpc_put_connection_superhack(exp->exp_connection);
950 LASSERT(list_empty(&exp->exp_outstanding_replies));
951 LASSERT(list_empty(&exp->exp_uncommitted_replies));
952 LASSERT(list_empty(&exp->exp_req_replay_queue));
953 LASSERT(list_empty(&exp->exp_hp_rpcs));
954 obd_destroy_export(exp);
955 /* self export doesn't hold a reference to an obd, although it
956 * exists until freeing of the obd */
957 if (exp != obd->obd_self_export)
958 class_decref(obd, "export", exp);
960 OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
961 kfree_rcu(exp, exp_handle.h_rcu);
965 struct obd_export *class_export_get(struct obd_export *exp)
967 refcount_inc(&exp->exp_handle.h_ref);
968 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
969 refcount_read(&exp->exp_handle.h_ref));
972 EXPORT_SYMBOL(class_export_get);
974 void class_export_put(struct obd_export *exp)
976 LASSERT(exp != NULL);
977 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
978 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
979 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
980 refcount_read(&exp->exp_handle.h_ref) - 1);
982 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
983 struct obd_device *obd = exp->exp_obd;
985 CDEBUG(D_IOCTL, "final put %p/%s\n",
986 exp, exp->exp_client_uuid.uuid);
988 /* release nid stat refererence */
989 lprocfs_exp_cleanup(exp);
991 if (exp == obd->obd_self_export) {
992 /* self export should be destroyed without
993 * zombie thread as it doesn't hold a
994 * reference to obd and doesn't hold any
996 class_export_destroy(exp);
997 /* self export is destroyed, no class
998 * references exist and it is safe to free
1000 class_free_dev(obd);
1002 LASSERT(!list_empty(&exp->exp_obd_chain));
1003 obd_zombie_export_add(exp);
1008 EXPORT_SYMBOL(class_export_put);
1010 static void obd_zombie_exp_cull(struct work_struct *ws)
1012 struct obd_export *export;
1014 export = container_of(ws, struct obd_export, exp_zombie_work);
1015 class_export_destroy(export);
1018 /* Creates a new export, adds it to the hash table, and returns a
1019 * pointer to it. The refcount is 2: one for the hash reference, and
1020 * one for the pointer returned by this function. */
1021 struct obd_export *__class_new_export(struct obd_device *obd,
1022 struct obd_uuid *cluuid, bool is_self)
1024 struct obd_export *export;
1028 OBD_ALLOC_PTR(export);
1030 return ERR_PTR(-ENOMEM);
1032 export->exp_conn_cnt = 0;
1033 export->exp_lock_hash = NULL;
1034 export->exp_flock_hash = NULL;
1035 /* 2 = class_handle_hash + last */
1036 refcount_set(&export->exp_handle.h_ref, 2);
1037 atomic_set(&export->exp_rpc_count, 0);
1038 atomic_set(&export->exp_cb_count, 0);
1039 atomic_set(&export->exp_locks_count, 0);
1040 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1041 INIT_LIST_HEAD(&export->exp_locks_list);
1042 spin_lock_init(&export->exp_locks_list_guard);
1044 atomic_set(&export->exp_replay_count, 0);
1045 export->exp_obd = obd;
1046 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1047 spin_lock_init(&export->exp_uncommitted_replies_lock);
1048 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1049 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1050 INIT_HLIST_NODE(&export->exp_handle.h_link);
1051 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1052 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1053 class_handle_hash(&export->exp_handle, export_handle_owner);
1054 export->exp_last_request_time = ktime_get_real_seconds();
1055 spin_lock_init(&export->exp_lock);
1056 spin_lock_init(&export->exp_rpc_lock);
1057 INIT_HLIST_NODE(&export->exp_nid_hash);
1058 INIT_HLIST_NODE(&export->exp_gen_hash);
1059 spin_lock_init(&export->exp_bl_list_lock);
1060 INIT_LIST_HEAD(&export->exp_bl_list);
1061 INIT_LIST_HEAD(&export->exp_stale_list);
1062 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1064 export->exp_sp_peer = LUSTRE_SP_ANY;
1065 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1066 export->exp_client_uuid = *cluuid;
1067 obd_init_export(export);
1069 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1071 spin_lock(&obd->obd_dev_lock);
1072 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1073 /* shouldn't happen, but might race */
1074 if (obd->obd_stopping)
1075 GOTO(exit_unlock, rc = -ENODEV);
1077 rc = obd_uuid_add(obd, export);
1079 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1080 obd->obd_name, cluuid->uuid, rc);
1081 GOTO(exit_unlock, rc = -EALREADY);
1086 class_incref(obd, "export", export);
1087 list_add_tail(&export->exp_obd_chain_timed,
1088 &obd->obd_exports_timed);
1089 list_add(&export->exp_obd_chain, &obd->obd_exports);
1090 obd->obd_num_exports++;
1092 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1093 INIT_LIST_HEAD(&export->exp_obd_chain);
1095 spin_unlock(&obd->obd_dev_lock);
1099 spin_unlock(&obd->obd_dev_lock);
1100 class_handle_unhash(&export->exp_handle);
1101 obd_destroy_export(export);
1102 OBD_FREE_PTR(export);
1106 struct obd_export *class_new_export(struct obd_device *obd,
1107 struct obd_uuid *uuid)
1109 return __class_new_export(obd, uuid, false);
1111 EXPORT_SYMBOL(class_new_export);
1113 struct obd_export *class_new_export_self(struct obd_device *obd,
1114 struct obd_uuid *uuid)
1116 return __class_new_export(obd, uuid, true);
1119 void class_unlink_export(struct obd_export *exp)
1121 class_handle_unhash(&exp->exp_handle);
1123 if (exp->exp_obd->obd_self_export == exp) {
1124 class_export_put(exp);
1128 spin_lock(&exp->exp_obd->obd_dev_lock);
1129 /* delete an uuid-export hashitem from hashtables */
1130 if (exp != exp->exp_obd->obd_self_export)
1131 obd_uuid_del(exp->exp_obd, exp);
1133 #ifdef HAVE_SERVER_SUPPORT
1134 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1135 struct tg_export_data *ted = &exp->exp_target_data;
1136 struct cfs_hash *hash;
1138 /* Because obd_gen_hash will not be released until
1139 * class_cleanup(), so hash should never be NULL here */
1140 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1141 LASSERT(hash != NULL);
1142 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1143 &exp->exp_gen_hash);
1144 cfs_hash_putref(hash);
1146 #endif /* HAVE_SERVER_SUPPORT */
1148 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1149 list_del_init(&exp->exp_obd_chain_timed);
1150 exp->exp_obd->obd_num_exports--;
1151 spin_unlock(&exp->exp_obd->obd_dev_lock);
1152 atomic_inc(&obd_stale_export_num);
1154 /* A reference is kept by obd_stale_exports list */
1155 obd_stale_export_put(exp);
1157 EXPORT_SYMBOL(class_unlink_export);
1159 /* Import management functions */
1160 static void obd_zombie_import_free(struct obd_import *imp)
1164 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1165 imp->imp_obd->obd_name);
1167 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1169 ptlrpc_put_connection_superhack(imp->imp_connection);
1171 while (!list_empty(&imp->imp_conn_list)) {
1172 struct obd_import_conn *imp_conn;
1174 imp_conn = list_first_entry(&imp->imp_conn_list,
1175 struct obd_import_conn, oic_item);
1176 list_del_init(&imp_conn->oic_item);
1177 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1178 OBD_FREE(imp_conn, sizeof(*imp_conn));
1181 LASSERT(imp->imp_sec == NULL);
1182 class_decref(imp->imp_obd, "import", imp);
1187 struct obd_import *class_import_get(struct obd_import *import)
1189 refcount_inc(&import->imp_refcount);
1190 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1191 refcount_read(&import->imp_refcount),
1192 import->imp_obd->obd_name);
1195 EXPORT_SYMBOL(class_import_get);
1197 void class_import_put(struct obd_import *imp)
1201 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1203 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1204 refcount_read(&imp->imp_refcount) - 1,
1205 imp->imp_obd->obd_name);
1207 if (refcount_dec_and_test(&imp->imp_refcount)) {
1208 CDEBUG(D_INFO, "final put import %p\n", imp);
1209 obd_zombie_import_add(imp);
1214 EXPORT_SYMBOL(class_import_put);
1216 static void init_imp_at(struct imp_at *at) {
1218 at_init(&at->iat_net_latency, 0, 0);
1219 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1220 /* max service estimates are tracked on the server side, so
1221 don't use the AT history here, just use the last reported
1222 val. (But keep hist for proc histogram, worst_ever) */
1223 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1228 static void obd_zombie_imp_cull(struct work_struct *ws)
1230 struct obd_import *import;
1232 import = container_of(ws, struct obd_import, imp_zombie_work);
1233 obd_zombie_import_free(import);
1236 struct obd_import *class_new_import(struct obd_device *obd)
1238 struct obd_import *imp;
1239 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1241 OBD_ALLOC(imp, sizeof(*imp));
1245 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1246 INIT_LIST_HEAD(&imp->imp_replay_list);
1247 INIT_LIST_HEAD(&imp->imp_sending_list);
1248 INIT_LIST_HEAD(&imp->imp_delayed_list);
1249 INIT_LIST_HEAD(&imp->imp_committed_list);
1250 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1251 imp->imp_known_replied_xid = 0;
1252 imp->imp_replay_cursor = &imp->imp_committed_list;
1253 spin_lock_init(&imp->imp_lock);
1254 imp->imp_last_success_conn = 0;
1255 imp->imp_state = LUSTRE_IMP_NEW;
1256 imp->imp_obd = class_incref(obd, "import", imp);
1257 rwlock_init(&imp->imp_sec_lock);
1258 init_waitqueue_head(&imp->imp_recovery_waitq);
1259 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1261 if (curr_pid_ns && curr_pid_ns->child_reaper)
1262 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1264 imp->imp_sec_refpid = 1;
1266 refcount_set(&imp->imp_refcount, 2);
1267 atomic_set(&imp->imp_unregistering, 0);
1268 atomic_set(&imp->imp_inflight, 0);
1269 atomic_set(&imp->imp_replay_inflight, 0);
1270 atomic_set(&imp->imp_inval_count, 0);
1271 INIT_LIST_HEAD(&imp->imp_conn_list);
1272 init_imp_at(&imp->imp_at);
1274 /* the default magic is V2, will be used in connect RPC, and
1275 * then adjusted according to the flags in request/reply. */
1276 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1280 EXPORT_SYMBOL(class_new_import);
1282 void class_destroy_import(struct obd_import *import)
1284 LASSERT(import != NULL);
1285 LASSERT(import != LP_POISON);
1287 spin_lock(&import->imp_lock);
1288 import->imp_generation++;
1289 spin_unlock(&import->imp_lock);
1290 class_import_put(import);
1292 EXPORT_SYMBOL(class_destroy_import);
#if LUSTRE_TRACKS_LOCK_EXP_REFS

/**
 * Account one more reference from \a lock to \a exp.
 *
 * On the first reference the lock is linked into exp_locks_list and \a exp
 * is remembered as the lock's target export. A console warning is printed
 * when the lock is already tracked against a different export.
 * All updates are made under exp_locks_list_guard.
 */
void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr >= 0);

	if (lock->l_exp_refs_target != NULL &&
	    lock->l_exp_refs_target != exp) {
		LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
			      exp, lock, lock->l_exp_refs_target);
	}

	/* first reference: start tracking the lock on this export */
	if (lock->l_exp_refs_nr == 0) {
		list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
		lock->l_exp_refs_target = exp;
	}
	lock->l_exp_refs_nr++;

	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);

	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_add_lock_ref);

/**
 * Drop one reference from \a lock to \a exp.
 *
 * When the last reference goes away the lock is unlinked from the export's
 * list and its target export is cleared. A console warning is printed when
 * \a exp is not the export the lock is tracked against.
 * All updates are made under exp_locks_list_guard.
 */
void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr > 0);

	if (lock->l_exp_refs_target != exp) {
		LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
			      lock, lock->l_exp_refs_target, exp);
	}

	lock->l_exp_refs_nr--;
	/* last reference: stop tracking the lock */
	if (lock->l_exp_refs_nr == 0) {
		list_del_init(&lock->l_exp_refs_link);
		lock->l_exp_refs_target = NULL;
	}

	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);

	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_del_lock_ref);
#endif /* LUSTRE_TRACKS_LOCK_EXP_REFS */
1337 /* A connection defines an export context in which preallocation can
1338 be managed. This releases the export pointer reference, and returns
1339 the export handle, so the export refcount is 1 when this function
1341 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1342 struct obd_uuid *cluuid)
1344 struct obd_export *export;
1345 LASSERT(conn != NULL);
1346 LASSERT(obd != NULL);
1347 LASSERT(cluuid != NULL);
1350 export = class_new_export(obd, cluuid);
1352 RETURN(PTR_ERR(export));
1354 conn->cookie = export->exp_handle.h_cookie;
1355 class_export_put(export);
1357 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1358 cluuid->uuid, conn->cookie);
1361 EXPORT_SYMBOL(class_connect);
1363 /* if export is involved in recovery then clean up related things */
1364 static void class_export_recovery_cleanup(struct obd_export *exp)
1366 struct obd_device *obd = exp->exp_obd;
1368 spin_lock(&obd->obd_recovery_task_lock);
1369 if (obd->obd_recovering) {
1370 if (exp->exp_in_recovery) {
1371 spin_lock(&exp->exp_lock);
1372 exp->exp_in_recovery = 0;
1373 spin_unlock(&exp->exp_lock);
1374 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1375 atomic_dec(&obd->obd_connected_clients);
1378 /* if called during recovery then should update
1379 * obd_stale_clients counter,
1380 * lightweight exports are not counted */
1381 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1382 exp->exp_obd->obd_stale_clients++;
1384 spin_unlock(&obd->obd_recovery_task_lock);
1386 spin_lock(&exp->exp_lock);
1387 /** Cleanup req replay fields */
1388 if (exp->exp_req_replay_needed) {
1389 exp->exp_req_replay_needed = 0;
1391 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1392 atomic_dec(&obd->obd_req_replay_clients);
1395 /** Cleanup lock replay data */
1396 if (exp->exp_lock_replay_needed) {
1397 exp->exp_lock_replay_needed = 0;
1399 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1400 atomic_dec(&obd->obd_lock_replay_clients);
1402 spin_unlock(&exp->exp_lock);
1405 /* This function removes 1-3 references from the export:
1406 * 1 - for export pointer passed
1407 * and if disconnect really need
1408 * 2 - removing from hash
1409 * 3 - in client_unlink_export
1410 * The export pointer passed to this function can destroyed */
1411 int class_disconnect(struct obd_export *export)
1413 int already_disconnected;
1416 if (export == NULL) {
1417 CWARN("attempting to free NULL export %p\n", export);
1421 spin_lock(&export->exp_lock);
1422 already_disconnected = export->exp_disconnected;
1423 export->exp_disconnected = 1;
1424 /* We hold references of export for uuid hash
1425 * and nid_hash and export link at least. So
1426 * it is safe to call cfs_hash_del in there. */
1427 if (!hlist_unhashed(&export->exp_nid_hash))
1428 cfs_hash_del(export->exp_obd->obd_nid_hash,
1429 &export->exp_connection->c_peer.nid,
1430 &export->exp_nid_hash);
1431 spin_unlock(&export->exp_lock);
1433 /* class_cleanup(), abort_recovery(), and class_fail_export()
1434 * all end up in here, and if any of them race we shouldn't
1435 * call extra class_export_puts(). */
1436 if (already_disconnected) {
1437 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1438 GOTO(no_disconn, already_disconnected);
1441 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1442 export->exp_handle.h_cookie);
1444 class_export_recovery_cleanup(export);
1445 class_unlink_export(export);
1447 class_export_put(export);
1450 EXPORT_SYMBOL(class_disconnect);
1452 /* Return non-zero for a fully connected export */
1453 int class_connected_export(struct obd_export *exp)
1458 spin_lock(&exp->exp_lock);
1459 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1460 spin_unlock(&exp->exp_lock);
1464 EXPORT_SYMBOL(class_connected_export);
1466 static void class_disconnect_export_list(struct list_head *list,
1467 enum obd_option flags)
1470 struct obd_export *exp;
1473 /* It's possible that an export may disconnect itself, but
1474 * nothing else will be added to this list. */
1475 while (!list_empty(list)) {
1476 exp = list_first_entry(list, struct obd_export,
1478 /* need for safe call CDEBUG after obd_disconnect */
1479 class_export_get(exp);
1481 spin_lock(&exp->exp_lock);
1482 exp->exp_flags = flags;
1483 spin_unlock(&exp->exp_lock);
1485 if (obd_uuid_equals(&exp->exp_client_uuid,
1486 &exp->exp_obd->obd_uuid)) {
1488 "exp %p export uuid == obd uuid, don't discon\n",
1490 /* Need to delete this now so we don't end up pointing
1491 * to work_list later when this export is cleaned up. */
1492 list_del_init(&exp->exp_obd_chain);
1493 class_export_put(exp);
1497 class_export_get(exp);
1498 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1499 "last request at %lld\n",
1500 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1501 exp, exp->exp_last_request_time);
1502 /* release one export reference anyway */
1503 rc = obd_disconnect(exp);
1505 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1506 obd_export_nid2str(exp), exp, rc);
1507 class_export_put(exp);
1512 void class_disconnect_exports(struct obd_device *obd)
1514 LIST_HEAD(work_list);
1517 /* Move all of the exports from obd_exports to a work list, en masse. */
1518 spin_lock(&obd->obd_dev_lock);
1519 list_splice_init(&obd->obd_exports, &work_list);
1520 list_splice_init(&obd->obd_delayed_exports, &work_list);
1521 spin_unlock(&obd->obd_dev_lock);
1523 if (!list_empty(&work_list)) {
1524 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1525 "disconnecting them\n", obd->obd_minor, obd);
1526 class_disconnect_export_list(&work_list,
1527 exp_flags_from_obd(obd));
1529 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1530 obd->obd_minor, obd);
1533 EXPORT_SYMBOL(class_disconnect_exports);
1535 /* Remove exports that have not completed recovery.
1537 void class_disconnect_stale_exports(struct obd_device *obd,
1538 int (*test_export)(struct obd_export *))
1540 LIST_HEAD(work_list);
1541 struct obd_export *exp, *n;
1545 spin_lock(&obd->obd_dev_lock);
1546 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1548 /* don't count self-export as client */
1549 if (obd_uuid_equals(&exp->exp_client_uuid,
1550 &exp->exp_obd->obd_uuid))
1553 /* don't evict clients which have no slot in last_rcvd
1554 * (e.g. lightweight connection) */
1555 if (exp->exp_target_data.ted_lr_idx == -1)
1558 spin_lock(&exp->exp_lock);
1559 if (exp->exp_failed || test_export(exp)) {
1560 spin_unlock(&exp->exp_lock);
1563 exp->exp_failed = 1;
1564 spin_unlock(&exp->exp_lock);
1566 list_move(&exp->exp_obd_chain, &work_list);
1568 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1569 obd->obd_name, exp->exp_client_uuid.uuid,
1570 obd_export_nid2str(exp));
1571 print_export_data(exp, "EVICTING", 0, D_HA);
1573 spin_unlock(&obd->obd_dev_lock);
1576 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1577 obd->obd_name, evicted);
1579 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1580 OBD_OPT_ABORT_RECOV);
1583 EXPORT_SYMBOL(class_disconnect_stale_exports);
1585 void class_fail_export(struct obd_export *exp)
1587 int rc, already_failed;
1589 spin_lock(&exp->exp_lock);
1590 already_failed = exp->exp_failed;
1591 exp->exp_failed = 1;
1592 spin_unlock(&exp->exp_lock);
1594 if (already_failed) {
1595 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1596 exp, exp->exp_client_uuid.uuid);
1600 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1601 exp, exp->exp_client_uuid.uuid);
1603 if (obd_dump_on_timeout)
1604 libcfs_debug_dumplog();
1606 /* need for safe call CDEBUG after obd_disconnect */
1607 class_export_get(exp);
1609 /* Most callers into obd_disconnect are removing their own reference
1610 * (request, for example) in addition to the one from the hash table.
1611 * We don't have such a reference here, so make one. */
1612 class_export_get(exp);
1613 rc = obd_disconnect(exp);
1615 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1617 CDEBUG(D_HA, "disconnected export %p/%s\n",
1618 exp, exp->exp_client_uuid.uuid);
1619 class_export_put(exp);
1621 EXPORT_SYMBOL(class_fail_export);
1623 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1625 struct cfs_hash *nid_hash;
1626 struct obd_export *doomed_exp = NULL;
1627 int exports_evicted = 0;
1629 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1631 spin_lock(&obd->obd_dev_lock);
1632 /* umount has run already, so evict thread should leave
1633 * its task to umount thread now */
1634 if (obd->obd_stopping) {
1635 spin_unlock(&obd->obd_dev_lock);
1636 return exports_evicted;
1638 nid_hash = obd->obd_nid_hash;
1639 cfs_hash_getref(nid_hash);
1640 spin_unlock(&obd->obd_dev_lock);
1643 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1644 if (doomed_exp == NULL)
1647 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1648 "nid %s found, wanted nid %s, requested nid %s\n",
1649 obd_export_nid2str(doomed_exp),
1650 libcfs_nid2str(nid_key), nid);
1651 LASSERTF(doomed_exp != obd->obd_self_export,
1652 "self-export is hashed by NID?\n");
1654 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1655 "request\n", obd->obd_name,
1656 obd_uuid2str(&doomed_exp->exp_client_uuid),
1657 obd_export_nid2str(doomed_exp));
1658 class_fail_export(doomed_exp);
1659 class_export_put(doomed_exp);
1662 cfs_hash_putref(nid_hash);
1664 if (!exports_evicted)
1665 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1666 obd->obd_name, nid);
1667 return exports_evicted;
1669 EXPORT_SYMBOL(obd_export_evict_by_nid);
#ifdef HAVE_SERVER_SUPPORT
/**
 * Evict the export of \a obd whose client UUID is \a uuid.
 *
 * Nothing is done when the device is stopping or when \a uuid names the
 * device itself.
 *
 * \param[in] obd	device to evict the client from
 * \param[in] uuid	client UUID, in string form
 *
 * \retval 1	an export was found and evicted
 * \retval 0	nothing was evicted
 */
int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
{
	struct obd_export *doomed_exp = NULL;
	struct obd_uuid doomed_uuid;
	int exports_evicted = 0;

	spin_lock(&obd->obd_dev_lock);
	if (obd->obd_stopping) {
		spin_unlock(&obd->obd_dev_lock);
		return exports_evicted;
	}
	spin_unlock(&obd->obd_dev_lock);

	obd_str2uuid(&doomed_uuid, uuid);
	if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
		CERROR("%s: can't evict myself\n", obd->obd_name);
		return exports_evicted;
	}

	doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
	if (doomed_exp == NULL) {
		CERROR("%s: can't disconnect %s: no exports found\n",
		       obd->obd_name, uuid);
	} else {
		CWARN("%s: evicting %s at adminstrative request\n",
		       obd->obd_name, doomed_exp->exp_client_uuid.uuid);
		class_fail_export(doomed_exp);
		/* Remove the export from the UUID table while the lookup
		 * reference still pins it; only then drop that reference,
		 * so doomed_exp is never touched after its last put. */
		obd_uuid_del(obd, doomed_exp);
		class_export_put(doomed_exp);
		exports_evicted++;
	}

	return exports_evicted;
}
#endif /* HAVE_SERVER_SUPPORT */
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() on an export when lock
 * dumping is requested; unset (NULL) by default. */
void (*class_export_dump_hook)(struct obd_export *exp) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif /* LUSTRE_TRACKS_LOCK_EXP_REFS */
1713 static void print_export_data(struct obd_export *exp, const char *status,
1714 int locks, int debug_level)
1716 struct ptlrpc_reply_state *rs;
1717 struct ptlrpc_reply_state *first_reply = NULL;
1720 spin_lock(&exp->exp_lock);
1721 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1727 spin_unlock(&exp->exp_lock);
1729 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1730 "%p %s %llu stale:%d\n",
1731 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1732 obd_export_nid2str(exp),
1733 refcount_read(&exp->exp_handle.h_ref),
1734 atomic_read(&exp->exp_rpc_count),
1735 atomic_read(&exp->exp_cb_count),
1736 atomic_read(&exp->exp_locks_count),
1737 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1738 nreplies, first_reply, nreplies > 3 ? "..." : "",
1739 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1740 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1741 if (locks && class_export_dump_hook != NULL)
1742 class_export_dump_hook(exp);
1746 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1748 struct obd_export *exp;
1750 spin_lock(&obd->obd_dev_lock);
1751 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1752 print_export_data(exp, "ACTIVE", locks, debug_level);
1753 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1754 print_export_data(exp, "UNLINKED", locks, debug_level);
1755 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1756 print_export_data(exp, "DELAYED", locks, debug_level);
1757 spin_unlock(&obd->obd_dev_lock);
1760 void obd_exports_barrier(struct obd_device *obd)
1763 LASSERT(list_empty(&obd->obd_exports));
1764 spin_lock(&obd->obd_dev_lock);
1765 while (!list_empty(&obd->obd_unlinked_exports)) {
1766 spin_unlock(&obd->obd_dev_lock);
1767 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1768 if (waited > 5 && is_power_of_2(waited)) {
1769 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1770 "more than %d seconds. "
1771 "The obd refcount = %d. Is it stuck?\n",
1772 obd->obd_name, waited,
1773 atomic_read(&obd->obd_refcount));
1774 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1777 spin_lock(&obd->obd_dev_lock);
1779 spin_unlock(&obd->obd_dev_lock);
1781 EXPORT_SYMBOL(obd_exports_barrier);
1784 * Add export to the obd_zombe thread and notify it.
1786 static void obd_zombie_export_add(struct obd_export *exp) {
1787 atomic_dec(&obd_stale_export_num);
1788 spin_lock(&exp->exp_obd->obd_dev_lock);
1789 LASSERT(!list_empty(&exp->exp_obd_chain));
1790 list_del_init(&exp->exp_obd_chain);
1791 spin_unlock(&exp->exp_obd->obd_dev_lock);
1793 queue_work(zombie_wq, &exp->exp_zombie_work);
1797 * Add import to the obd_zombe thread and notify it.
1799 static void obd_zombie_import_add(struct obd_import *imp) {
1800 LASSERT(imp->imp_sec == NULL);
1802 queue_work(zombie_wq, &imp->imp_zombie_work);
1806 * wait when obd_zombie import/export queues become empty
1808 void obd_zombie_barrier(void)
1810 flush_workqueue(zombie_wq);
1812 EXPORT_SYMBOL(obd_zombie_barrier);
1815 struct obd_export *obd_stale_export_get(void)
1817 struct obd_export *exp = NULL;
1820 spin_lock(&obd_stale_export_lock);
1821 if (!list_empty(&obd_stale_exports)) {
1822 exp = list_first_entry(&obd_stale_exports,
1823 struct obd_export, exp_stale_list);
1824 list_del_init(&exp->exp_stale_list);
1826 spin_unlock(&obd_stale_export_lock);
1829 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1830 atomic_read(&obd_stale_export_num));
1834 EXPORT_SYMBOL(obd_stale_export_get);
1836 void obd_stale_export_put(struct obd_export *exp)
1840 LASSERT(list_empty(&exp->exp_stale_list));
1841 if (exp->exp_lock_hash &&
1842 atomic_read(&exp->exp_lock_hash->hs_count)) {
1843 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1844 atomic_read(&obd_stale_export_num));
1846 spin_lock_bh(&exp->exp_bl_list_lock);
1847 spin_lock(&obd_stale_export_lock);
1848 /* Add to the tail if there is no blocked locks,
1849 * to the head otherwise. */
1850 if (list_empty(&exp->exp_bl_list))
1851 list_add_tail(&exp->exp_stale_list,
1852 &obd_stale_exports);
1854 list_add(&exp->exp_stale_list,
1855 &obd_stale_exports);
1857 spin_unlock(&obd_stale_export_lock);
1858 spin_unlock_bh(&exp->exp_bl_list_lock);
1860 class_export_put(exp);
1864 EXPORT_SYMBOL(obd_stale_export_put);
1867 * Adjust the position of the export in the stale list,
1868 * i.e. move to the head of the list if is needed.
1870 void obd_stale_export_adjust(struct obd_export *exp)
1872 LASSERT(exp != NULL);
1873 spin_lock_bh(&exp->exp_bl_list_lock);
1874 spin_lock(&obd_stale_export_lock);
1876 if (!list_empty(&exp->exp_stale_list) &&
1877 !list_empty(&exp->exp_bl_list))
1878 list_move(&exp->exp_stale_list, &obd_stale_exports);
1880 spin_unlock(&obd_stale_export_lock);
1881 spin_unlock_bh(&exp->exp_bl_list_lock);
1883 EXPORT_SYMBOL(obd_stale_export_adjust);
1886 * start destroy zombie import/export thread
1888 int obd_zombie_impexp_init(void)
1890 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1898 * stop destroy zombie import/export thread
1900 void obd_zombie_impexp_stop(void)
1902 destroy_workqueue(zombie_wq);
1903 LASSERT(list_empty(&obd_stale_exports));
1906 /***** Kernel-userspace comm helpers *******/
1908 /* Get length of entire message, including header */
1909 int kuc_len(int payload_len)
1911 return sizeof(struct kuc_hdr) + payload_len;
1913 EXPORT_SYMBOL(kuc_len);
1915 /* Get a pointer to kuc header, given a ptr to the payload
1916 * @param p Pointer to payload area
1917 * @returns Pointer to kuc header
1919 struct kuc_hdr * kuc_ptr(void *p)
1921 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1922 LASSERT(lh->kuc_magic == KUC_MAGIC);
1925 EXPORT_SYMBOL(kuc_ptr);
1927 /* Alloc space for a message, and fill in header
1928 * @return Pointer to payload area
1930 void *kuc_alloc(int payload_len, int transport, int type)
1933 int len = kuc_len(payload_len);
1937 return ERR_PTR(-ENOMEM);
1939 lh->kuc_magic = KUC_MAGIC;
1940 lh->kuc_transport = transport;
1941 lh->kuc_msgtype = type;
1942 lh->kuc_msglen = len;
1944 return (void *)(lh + 1);
1946 EXPORT_SYMBOL(kuc_alloc);
/**
 * Free a message obtained from kuc_alloc().
 *
 * \param[in] p			pointer to the payload area
 * \param[in] payload_len	payload size the message was allocated with
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh;

	/* the allocation starts at the header, not at the payload */
	lh = kuc_ptr(p);
	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1956 struct obd_request_slot_waiter {
1957 struct list_head orsw_entry;
1958 wait_queue_head_t orsw_waitq;
1962 static bool obd_request_slot_avail(struct client_obd *cli,
1963 struct obd_request_slot_waiter *orsw)
1967 spin_lock(&cli->cl_loi_list_lock);
1968 avail = !!list_empty(&orsw->orsw_entry);
1969 spin_unlock(&cli->cl_loi_list_lock);
1975 * For network flow control, the RPC sponsor needs to acquire a credit
1976 * before sending the RPC. The credits count for a connection is defined
1977 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1978 * the subsequent RPC sponsors need to wait until others released their
1979 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1981 int obd_get_request_slot(struct client_obd *cli)
1983 struct obd_request_slot_waiter orsw;
1986 spin_lock(&cli->cl_loi_list_lock);
1987 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1988 cli->cl_rpcs_in_flight++;
1989 spin_unlock(&cli->cl_loi_list_lock);
1993 init_waitqueue_head(&orsw.orsw_waitq);
1994 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
1995 orsw.orsw_signaled = false;
1996 spin_unlock(&cli->cl_loi_list_lock);
1998 rc = l_wait_event_abortable(orsw.orsw_waitq,
1999 obd_request_slot_avail(cli, &orsw) ||
2000 orsw.orsw_signaled);
2002 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2003 * freed but other (such as obd_put_request_slot) is using it. */
2004 spin_lock(&cli->cl_loi_list_lock);
2006 if (!orsw.orsw_signaled) {
2007 if (list_empty(&orsw.orsw_entry))
2008 cli->cl_rpcs_in_flight--;
2010 list_del(&orsw.orsw_entry);
2015 if (orsw.orsw_signaled) {
2016 LASSERT(list_empty(&orsw.orsw_entry));
2020 spin_unlock(&cli->cl_loi_list_lock);
2024 EXPORT_SYMBOL(obd_get_request_slot);
2026 void obd_put_request_slot(struct client_obd *cli)
2028 struct obd_request_slot_waiter *orsw;
2030 spin_lock(&cli->cl_loi_list_lock);
2031 cli->cl_rpcs_in_flight--;
2033 /* If there is free slot, wakeup the first waiter. */
2034 if (!list_empty(&cli->cl_flight_waiters) &&
2035 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2036 orsw = list_first_entry(&cli->cl_flight_waiters,
2037 struct obd_request_slot_waiter,
2039 list_del_init(&orsw->orsw_entry);
2040 cli->cl_rpcs_in_flight++;
2041 wake_up(&orsw->orsw_waitq);
2043 spin_unlock(&cli->cl_loi_list_lock);
2045 EXPORT_SYMBOL(obd_put_request_slot);
2047 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2049 return cli->cl_max_rpcs_in_flight;
2051 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2053 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2055 struct obd_request_slot_waiter *orsw;
2059 const char *type_name;
2062 if (max > OBD_MAX_RIF_MAX || max < 1)
2065 type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2066 if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2067 /* adjust max_mod_rpcs_in_flight to ensure it is always
2068 * strictly lower that max_rpcs_in_flight */
2070 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2071 "because it must be higher than "
2072 "max_mod_rpcs_in_flight value",
2073 cli->cl_import->imp_obd->obd_name);
2076 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2077 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2083 spin_lock(&cli->cl_loi_list_lock);
2084 old = cli->cl_max_rpcs_in_flight;
2085 cli->cl_max_rpcs_in_flight = max;
2086 client_adjust_max_dirty(cli);
2090 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2091 for (i = 0; i < diff; i++) {
2092 if (list_empty(&cli->cl_flight_waiters))
2095 orsw = list_first_entry(&cli->cl_flight_waiters,
2096 struct obd_request_slot_waiter,
2098 list_del_init(&orsw->orsw_entry);
2099 cli->cl_rpcs_in_flight++;
2100 wake_up(&orsw->orsw_waitq);
2102 spin_unlock(&cli->cl_loi_list_lock);
2106 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2108 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2110 return cli->cl_max_mod_rpcs_in_flight;
2112 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2114 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2116 struct obd_connect_data *ocd;
2120 if (max > OBD_MAX_RIF_MAX || max < 1)
2123 /* cannot exceed or equal max_rpcs_in_flight */
2124 if (max >= cli->cl_max_rpcs_in_flight) {
2125 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2126 "higher or equal to max_rpcs_in_flight value (%u)\n",
2127 cli->cl_import->imp_obd->obd_name,
2128 max, cli->cl_max_rpcs_in_flight);
2132 /* cannot exceed max modify RPCs in flight supported by the server */
2133 ocd = &cli->cl_import->imp_connect_data;
2134 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2135 maxmodrpcs = ocd->ocd_maxmodrpcs;
2138 if (max > maxmodrpcs) {
2139 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2140 "higher than max_mod_rpcs_per_client value (%hu) "
2141 "returned by the server at connection\n",
2142 cli->cl_import->imp_obd->obd_name,
2147 spin_lock(&cli->cl_mod_rpcs_lock);
2149 prev = cli->cl_max_mod_rpcs_in_flight;
2150 cli->cl_max_mod_rpcs_in_flight = max;
2152 /* wakeup waiters if limit has been increased */
2153 if (cli->cl_max_mod_rpcs_in_flight > prev)
2154 wake_up(&cli->cl_mod_rpcs_waitq);
2156 spin_unlock(&cli->cl_mod_rpcs_lock);
2160 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2162 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2163 struct seq_file *seq)
2165 unsigned long mod_tot = 0, mod_cum;
2166 struct timespec64 now;
2169 ktime_get_real_ts64(&now);
2171 spin_lock(&cli->cl_mod_rpcs_lock);
2173 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2174 (s64)now.tv_sec, now.tv_nsec);
2175 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2176 cli->cl_mod_rpcs_in_flight);
2178 seq_printf(seq, "\n\t\t\tmodify\n");
2179 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2181 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2184 for (i = 0; i < OBD_HIST_MAX; i++) {
2185 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2187 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2188 i, mod, pct(mod, mod_tot),
2189 pct(mod_cum, mod_tot));
2190 if (mod_cum == mod_tot)
2194 spin_unlock(&cli->cl_mod_rpcs_lock);
2198 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2200 /* The number of modify RPCs sent in parallel is limited
2201 * because the server has a finite number of slots per client to
2202 * store request result and ensure reply reconstruction when needed.
2203 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2204 * that takes into account server limit and cl_max_rpcs_in_flight
2206 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2207 * one close request is allowed above the maximum.
2209 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2214 /* A slot is available if
2215 * - number of modify RPCs in flight is less than the max
2216 * - it's a close RPC and no other close request is in flight
2218 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2219 (close_req && cli->cl_close_rpcs_in_flight == 0);
2224 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2229 spin_lock(&cli->cl_mod_rpcs_lock);
2230 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2231 spin_unlock(&cli->cl_mod_rpcs_lock);
2236 /* Get a modify RPC slot from the obd client @cli according
2237 * to the kind of operation @opc that is going to be sent
2238 * and the intent @it of the operation if it applies.
2239 * If the maximum number of modify RPCs in flight is reached
2240 * the thread is put to sleep.
2241 * Returns the tag to be set in the request message. Tag 0
2242 * is reserved for non-modifying requests.
2244 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2246 bool close_req = false;
2249 if (opc == MDS_CLOSE)
2253 spin_lock(&cli->cl_mod_rpcs_lock);
2254 max = cli->cl_max_mod_rpcs_in_flight;
2255 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2256 /* there is a slot available */
2257 cli->cl_mod_rpcs_in_flight++;
2259 cli->cl_close_rpcs_in_flight++;
2260 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2261 cli->cl_mod_rpcs_in_flight);
2262 /* find a free tag */
2263 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2265 LASSERT(i < OBD_MAX_RIF_MAX);
2266 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2267 spin_unlock(&cli->cl_mod_rpcs_lock);
2268 /* tag 0 is reserved for non-modify RPCs */
2271 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2272 cli->cl_import->imp_obd->obd_name,
2277 spin_unlock(&cli->cl_mod_rpcs_lock);
2279 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2280 "opc %u, max %hu\n",
2281 cli->cl_import->imp_obd->obd_name, opc, max);
2283 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2284 obd_mod_rpc_slot_avail(cli,
2288 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2290 /* Put a modify RPC slot from the obd client @cli according
2291 * to the kind of operation @opc that has been sent.
2293 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2295 bool close_req = false;
2300 if (opc == MDS_CLOSE)
2303 spin_lock(&cli->cl_mod_rpcs_lock);
2304 cli->cl_mod_rpcs_in_flight--;
2306 cli->cl_close_rpcs_in_flight--;
2307 /* release the tag in the bitmap */
2308 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2309 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2310 spin_unlock(&cli->cl_mod_rpcs_lock);
2311 wake_up(&cli->cl_mod_rpcs_waitq);
2313 EXPORT_SYMBOL(obd_put_mod_rpc_slot);