4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59 const char *status, int locks, int debug_level);
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90 obd, obd->obd_namespace, obd->obd_force);
93 lu_ref_fini(&obd->obd_reference);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 struct obd_type *class_search_type(const char *name)
99 struct kobject *kobj = kset_find_obj(lustre_kset, name);
101 if (kobj && kobj->ktype == &class_ktype)
102 return container_of(kobj, struct obd_type, typ_kobj);
107 EXPORT_SYMBOL(class_search_type);
109 struct obd_type *class_get_type(const char *name)
111 struct obd_type *type;
113 type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
116 const char *modname = name;
118 #ifdef HAVE_SERVER_SUPPORT
119 if (strcmp(modname, "obdfilter") == 0)
122 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 modname = LUSTRE_OSP_NAME;
125 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 if (try_module_get(type->typ_dt_ops->o_owner)) {
140 atomic_inc(&type->typ_refcnt);
141 /* class_search_type() returned a counted reference,
142 * but we don't need that count any more as
143 * we have one through typ_refcnt.
145 kobject_put(&type->typ_kobj);
147 kobject_put(&type->typ_kobj);
154 void class_put_type(struct obd_type *type)
157 module_put(type->typ_dt_ops->o_owner);
158 atomic_dec(&type->typ_refcnt);
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 debugfs_remove_recursive(type->typ_debugfs_entry);
166 type->typ_debugfs_entry = NULL;
169 lu_device_type_fini(type->typ_lu);
171 #ifdef CONFIG_PROC_FS
172 if (type->typ_name && type->typ_procroot)
173 remove_proc_subtree(type->typ_name, proc_lustre_root);
175 OBD_FREE(type, sizeof(*type));
178 static struct kobj_type class_ktype = {
179 .sysfs_ops = &lustre_sysfs_ops,
180 .release = class_sysfs_release,
183 #ifdef HAVE_SERVER_SUPPORT
184 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
186 struct dentry *symlink;
187 struct obd_type *type;
190 type = class_search_type(name);
192 kobject_put(&type->typ_kobj);
193 return ERR_PTR(-EEXIST);
196 OBD_ALLOC(type, sizeof(*type));
198 return ERR_PTR(-ENOMEM);
200 type->typ_kobj.kset = lustre_kset;
201 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
202 &lustre_kset->kobj, "%s", name);
206 symlink = debugfs_create_dir(name, debugfs_lustre_root);
207 type->typ_debugfs_entry = symlink;
208 type->typ_sym_filter = true;
211 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
213 if (IS_ERR(type->typ_procroot)) {
214 CERROR("%s: can't create compat proc entry: %d\n",
215 name, (int)PTR_ERR(type->typ_procroot));
216 type->typ_procroot = NULL;
222 EXPORT_SYMBOL(class_add_symlinks);
223 #endif /* HAVE_SERVER_SUPPORT */
225 #define CLASS_MAX_NAME 1024
227 int class_register_type(const struct obd_ops *dt_ops,
228 const struct md_ops *md_ops,
229 bool enable_proc, struct lprocfs_vars *vars,
230 const char *name, struct lu_device_type *ldt)
232 struct obd_type *type;
237 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
239 type = class_search_type(name);
241 #ifdef HAVE_SERVER_SUPPORT
242 if (type->typ_sym_filter)
244 #endif /* HAVE_SERVER_SUPPORT */
245 kobject_put(&type->typ_kobj);
246 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
250 OBD_ALLOC(type, sizeof(*type));
254 type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
255 type->typ_kobj.kset = lustre_kset;
256 kobject_init(&type->typ_kobj, &class_ktype);
257 #ifdef HAVE_SERVER_SUPPORT
259 #endif /* HAVE_SERVER_SUPPORT */
261 type->typ_dt_ops = dt_ops;
262 type->typ_md_ops = md_ops;
264 #ifdef HAVE_SERVER_SUPPORT
265 if (type->typ_sym_filter) {
266 type->typ_sym_filter = false;
267 kobject_put(&type->typ_kobj);
271 #ifdef CONFIG_PROC_FS
272 if (enable_proc && !type->typ_procroot) {
273 type->typ_procroot = lprocfs_register(name,
276 if (IS_ERR(type->typ_procroot)) {
277 rc = PTR_ERR(type->typ_procroot);
278 type->typ_procroot = NULL;
283 type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
284 ldebugfs_add_vars(type->typ_debugfs_entry, vars, type);
286 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
289 #ifdef HAVE_SERVER_SUPPORT
293 rc = lu_device_type_init(ldt);
294 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
295 wake_up_var(&type->typ_lu);
303 kobject_put(&type->typ_kobj);
307 EXPORT_SYMBOL(class_register_type);
309 int class_unregister_type(const char *name)
311 struct obd_type *type = class_search_type(name);
316 CERROR("unknown obd type\n");
320 if (atomic_read(&type->typ_refcnt)) {
321 CERROR("type %s has refcount (%d)\n", name,
322 atomic_read(&type->typ_refcnt));
323 /* This is a bad situation, let's make the best of it */
324 /* Remove ops, but leave the name for debugging */
325 type->typ_dt_ops = NULL;
326 type->typ_md_ops = NULL;
327 GOTO(out_put, rc = -EBUSY);
330 /* Put the final ref */
331 kobject_put(&type->typ_kobj);
333 /* Put the ref returned by class_search_type() */
334 kobject_put(&type->typ_kobj);
337 } /* class_unregister_type */
338 EXPORT_SYMBOL(class_unregister_type);
341 * Create a new obd device.
343 * Allocate the new obd_device and initialize it.
345 * \param[in] type_name obd device type string.
346 * \param[in] name obd device name.
347 * \param[in] uuid obd device UUID
349 * \retval newdev pointer to created obd_device
350 * \retval ERR_PTR(errno) on error
352 struct obd_device *class_newdev(const char *type_name, const char *name,
355 struct obd_device *newdev;
356 struct obd_type *type = NULL;
359 if (strlen(name) >= MAX_OBD_NAME) {
360 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
361 RETURN(ERR_PTR(-EINVAL));
364 type = class_get_type(type_name);
366 CERROR("OBD: unknown type: %s\n", type_name);
367 RETURN(ERR_PTR(-ENODEV));
370 newdev = obd_device_alloc();
371 if (newdev == NULL) {
372 class_put_type(type);
373 RETURN(ERR_PTR(-ENOMEM));
375 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
376 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
377 newdev->obd_type = type;
378 newdev->obd_minor = -1;
380 rwlock_init(&newdev->obd_pool_lock);
381 newdev->obd_pool_limit = 0;
382 newdev->obd_pool_slv = 0;
384 INIT_LIST_HEAD(&newdev->obd_exports);
385 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
386 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
387 INIT_LIST_HEAD(&newdev->obd_exports_timed);
388 INIT_LIST_HEAD(&newdev->obd_nid_stats);
389 spin_lock_init(&newdev->obd_nid_lock);
390 spin_lock_init(&newdev->obd_dev_lock);
391 mutex_init(&newdev->obd_dev_mutex);
392 spin_lock_init(&newdev->obd_osfs_lock);
393 /* newdev->obd_osfs_age must be set to a value in the distant
394 * past to guarantee a fresh statfs is fetched on mount. */
395 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
397 /* XXX belongs in setup not attach */
398 init_rwsem(&newdev->obd_observer_link_sem);
400 spin_lock_init(&newdev->obd_recovery_task_lock);
401 init_waitqueue_head(&newdev->obd_next_transno_waitq);
402 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
403 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
404 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
405 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
406 INIT_LIST_HEAD(&newdev->obd_evict_list);
407 INIT_LIST_HEAD(&newdev->obd_lwp_list);
409 llog_group_init(&newdev->obd_olg);
410 /* Detach drops this */
411 atomic_set(&newdev->obd_refcount, 1);
412 lu_ref_init(&newdev->obd_reference);
413 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
415 newdev->obd_conn_inprogress = 0;
417 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
419 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
420 newdev->obd_name, newdev);
428 * \param[in] obd obd_device to be freed
432 void class_free_dev(struct obd_device *obd)
434 struct obd_type *obd_type = obd->obd_type;
436 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
437 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
438 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
439 "obd %p != obd_devs[%d] %p\n",
440 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
441 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
442 "obd_refcount should be 0, not %d\n",
443 atomic_read(&obd->obd_refcount));
444 LASSERT(obd_type != NULL);
446 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
447 obd->obd_name, obd->obd_type->typ_name);
449 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
450 obd->obd_name, obd->obd_uuid.uuid);
451 if (obd->obd_stopping) {
454 /* If we're not stopping, we were never set up */
455 err = obd_cleanup(obd);
457 CERROR("Cleanup %s returned %d\n",
461 obd_device_free(obd);
463 class_put_type(obd_type);
467 * Unregister obd device.
469 * Free slot in obd_devs[] used by \a obd.
471 * \param[in] obd obd_device to be unregistered
475 void class_unregister_device(struct obd_device *obd)
477 write_lock(&obd_dev_lock);
478 if (obd->obd_minor >= 0) {
479 LASSERT(obd_devs[obd->obd_minor] == obd);
480 obd_devs[obd->obd_minor] = NULL;
483 write_unlock(&obd_dev_lock);
487 * Register obd device.
489 * Find free slot in obd_devs[], fills it with \a new_obd.
491 * \param[in] new_obd obd_device to be registered
494 * \retval -EEXIST device with this name is registered
495 * \retval -EOVERFLOW obd_devs[] is full
497 int class_register_device(struct obd_device *new_obd)
501 int new_obd_minor = 0;
502 bool minor_assign = false;
503 bool retried = false;
506 write_lock(&obd_dev_lock);
507 for (i = 0; i < class_devno_max(); i++) {
508 struct obd_device *obd = class_num2obd(i);
511 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
514 write_unlock(&obd_dev_lock);
516 /* the obd_device could still be waiting to be
517 * destroyed by the "obd_zombie_impexp_thread".
519 obd_zombie_barrier();
524 CERROR("%s: already exists, won't add\n",
526 /* in case we found a free slot before duplicate */
527 minor_assign = false;
531 if (!minor_assign && obd == NULL) {
538 new_obd->obd_minor = new_obd_minor;
539 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
540 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
541 obd_devs[new_obd_minor] = new_obd;
545 CERROR("%s: all %u/%u devices used, increase "
546 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
547 i, class_devno_max(), ret);
550 write_unlock(&obd_dev_lock);
555 static int class_name2dev_nolock(const char *name)
562 for (i = 0; i < class_devno_max(); i++) {
563 struct obd_device *obd = class_num2obd(i);
565 if (obd && strcmp(name, obd->obd_name) == 0) {
566 /* Make sure we finished attaching before we give
567 out any references */
568 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
569 if (obd->obd_attached) {
579 int class_name2dev(const char *name)
586 read_lock(&obd_dev_lock);
587 i = class_name2dev_nolock(name);
588 read_unlock(&obd_dev_lock);
592 EXPORT_SYMBOL(class_name2dev);
594 struct obd_device *class_name2obd(const char *name)
596 int dev = class_name2dev(name);
598 if (dev < 0 || dev > class_devno_max())
600 return class_num2obd(dev);
602 EXPORT_SYMBOL(class_name2obd);
604 int class_uuid2dev_nolock(struct obd_uuid *uuid)
608 for (i = 0; i < class_devno_max(); i++) {
609 struct obd_device *obd = class_num2obd(i);
611 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
612 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
620 int class_uuid2dev(struct obd_uuid *uuid)
624 read_lock(&obd_dev_lock);
625 i = class_uuid2dev_nolock(uuid);
626 read_unlock(&obd_dev_lock);
630 EXPORT_SYMBOL(class_uuid2dev);
632 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
634 int dev = class_uuid2dev(uuid);
637 return class_num2obd(dev);
639 EXPORT_SYMBOL(class_uuid2obd);
642 * Get obd device from ::obd_devs[]
644 * \param num [in] array index
646 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
647 * otherwise return the obd device there.
649 struct obd_device *class_num2obd(int num)
651 struct obd_device *obd = NULL;
653 if (num < class_devno_max()) {
658 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
659 "%p obd_magic %08x != %08x\n",
660 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
661 LASSERTF(obd->obd_minor == num,
662 "%p obd_minor %0d != %0d\n",
663 obd, obd->obd_minor, num);
670 * Find obd in obd_devs[] by name or uuid.
672 * Increment obd's refcount if found.
674 * \param[in] str obd name or uuid
676 * \retval NULL if not found
677 * \retval target pointer to found obd_device
679 struct obd_device *class_dev_by_str(const char *str)
681 struct obd_device *target = NULL;
682 struct obd_uuid tgtuuid;
685 obd_str2uuid(&tgtuuid, str);
687 read_lock(&obd_dev_lock);
688 rc = class_uuid2dev_nolock(&tgtuuid);
690 rc = class_name2dev_nolock(str);
693 target = class_num2obd(rc);
696 class_incref(target, "find", current);
697 read_unlock(&obd_dev_lock);
701 EXPORT_SYMBOL(class_dev_by_str);
704 * Get obd devices count. Device in any
706 * \retval obd device count
708 int get_devices_count(void)
710 int index, max_index = class_devno_max(), dev_count = 0;
712 read_lock(&obd_dev_lock);
713 for (index = 0; index <= max_index; index++) {
714 struct obd_device *obd = class_num2obd(index);
718 read_unlock(&obd_dev_lock);
722 EXPORT_SYMBOL(get_devices_count);
724 void class_obd_list(void)
729 read_lock(&obd_dev_lock);
730 for (i = 0; i < class_devno_max(); i++) {
731 struct obd_device *obd = class_num2obd(i);
735 if (obd->obd_stopping)
737 else if (obd->obd_set_up)
739 else if (obd->obd_attached)
743 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
744 i, status, obd->obd_type->typ_name,
745 obd->obd_name, obd->obd_uuid.uuid,
746 atomic_read(&obd->obd_refcount));
748 read_unlock(&obd_dev_lock);
751 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
752 * specified, then only the client with that uuid is returned,
753 * otherwise any client connected to the tgt is returned.
755 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
756 const char *type_name,
757 struct obd_uuid *grp_uuid)
761 read_lock(&obd_dev_lock);
762 for (i = 0; i < class_devno_max(); i++) {
763 struct obd_device *obd = class_num2obd(i);
767 if ((strncmp(obd->obd_type->typ_name, type_name,
768 strlen(type_name)) == 0)) {
769 if (obd_uuid_equals(tgt_uuid,
770 &obd->u.cli.cl_target_uuid) &&
771 ((grp_uuid)? obd_uuid_equals(grp_uuid,
772 &obd->obd_uuid) : 1)) {
773 read_unlock(&obd_dev_lock);
778 read_unlock(&obd_dev_lock);
782 EXPORT_SYMBOL(class_find_client_obd);
784 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
785 * searching at *next, and if a device is found, the next index to look
786 * at is saved in *next. If next is NULL, then the first matching device
787 * will always be returned.
789 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
795 else if (*next >= 0 && *next < class_devno_max())
800 read_lock(&obd_dev_lock);
801 for (; i < class_devno_max(); i++) {
802 struct obd_device *obd = class_num2obd(i);
806 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
809 read_unlock(&obd_dev_lock);
813 read_unlock(&obd_dev_lock);
817 EXPORT_SYMBOL(class_devices_in_group);
820 * Notify that the sptlrpc log for \a fsname has changed, so that every relevant OBD
821 * adjusts its sptlrpc settings accordingly.
823 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
825 struct obd_device *obd;
829 LASSERT(namelen > 0);
831 read_lock(&obd_dev_lock);
832 for (i = 0; i < class_devno_max(); i++) {
833 obd = class_num2obd(i);
835 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
838 /* only notify mdc, osc, osp, lwp, mdt, ost
839 * because only these have a -sptlrpc llog */
840 type = obd->obd_type->typ_name;
841 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
842 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
843 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
844 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
845 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
846 strcmp(type, LUSTRE_OST_NAME) != 0)
849 if (strncmp(obd->obd_name, fsname, namelen))
852 class_incref(obd, __FUNCTION__, obd);
853 read_unlock(&obd_dev_lock);
854 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
855 sizeof(KEY_SPTLRPC_CONF),
856 KEY_SPTLRPC_CONF, 0, NULL, NULL);
858 class_decref(obd, __FUNCTION__, obd);
859 read_lock(&obd_dev_lock);
861 read_unlock(&obd_dev_lock);
864 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
866 void obd_cleanup_caches(void)
869 if (obd_device_cachep) {
870 kmem_cache_destroy(obd_device_cachep);
871 obd_device_cachep = NULL;
877 int obd_init_caches(void)
882 LASSERT(obd_device_cachep == NULL);
883 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
884 sizeof(struct obd_device),
885 0, 0, 0, sizeof(struct obd_device), NULL);
886 if (!obd_device_cachep)
887 GOTO(out, rc = -ENOMEM);
891 obd_cleanup_caches();
895 static const char export_handle_owner[] = "export";
897 /* map connection to client */
898 struct obd_export *class_conn2export(struct lustre_handle *conn)
900 struct obd_export *export;
904 CDEBUG(D_CACHE, "looking for null handle\n");
908 if (conn->cookie == -1) { /* this means assign a new connection */
909 CDEBUG(D_CACHE, "want a new connection\n");
913 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
914 export = class_handle2object(conn->cookie, export_handle_owner);
917 EXPORT_SYMBOL(class_conn2export);
919 struct obd_device *class_exp2obd(struct obd_export *exp)
925 EXPORT_SYMBOL(class_exp2obd);
927 struct obd_import *class_exp2cliimp(struct obd_export *exp)
929 struct obd_device *obd = exp->exp_obd;
932 return obd->u.cli.cl_import;
934 EXPORT_SYMBOL(class_exp2cliimp);
936 /* Export management functions */
937 static void class_export_destroy(struct obd_export *exp)
939 struct obd_device *obd = exp->exp_obd;
942 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
943 LASSERT(obd != NULL);
945 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
946 exp->exp_client_uuid.uuid, obd->obd_name);
948 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
949 if (exp->exp_connection)
950 ptlrpc_put_connection_superhack(exp->exp_connection);
952 LASSERT(list_empty(&exp->exp_outstanding_replies));
953 LASSERT(list_empty(&exp->exp_uncommitted_replies));
954 LASSERT(list_empty(&exp->exp_req_replay_queue));
955 LASSERT(list_empty(&exp->exp_hp_rpcs));
956 obd_destroy_export(exp);
957 /* self export doesn't hold a reference to an obd, although it
958 * exists until freeing of the obd */
959 if (exp != obd->obd_self_export)
960 class_decref(obd, "export", exp);
962 OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
963 kfree_rcu(exp, exp_handle.h_rcu);
967 struct obd_export *class_export_get(struct obd_export *exp)
969 refcount_inc(&exp->exp_handle.h_ref);
970 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
971 refcount_read(&exp->exp_handle.h_ref));
974 EXPORT_SYMBOL(class_export_get);
976 void class_export_put(struct obd_export *exp)
978 LASSERT(exp != NULL);
979 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
980 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
981 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
982 refcount_read(&exp->exp_handle.h_ref) - 1);
984 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
985 struct obd_device *obd = exp->exp_obd;
987 CDEBUG(D_IOCTL, "final put %p/%s\n",
988 exp, exp->exp_client_uuid.uuid);
990 /* release nid stat reference */
991 lprocfs_exp_cleanup(exp);
993 if (exp == obd->obd_self_export) {
994 /* self export should be destroyed without
995 * zombie thread as it doesn't hold a
996 * reference to obd and doesn't hold any
998 class_export_destroy(exp);
999 /* self export is destroyed, no class
1000 * references exist and it is safe to free
1002 class_free_dev(obd);
1004 LASSERT(!list_empty(&exp->exp_obd_chain));
1005 obd_zombie_export_add(exp);
1010 EXPORT_SYMBOL(class_export_put);
1012 static void obd_zombie_exp_cull(struct work_struct *ws)
1014 struct obd_export *export;
1016 export = container_of(ws, struct obd_export, exp_zombie_work);
1017 class_export_destroy(export);
1020 /* Creates a new export, adds it to the hash table, and returns a
1021 * pointer to it. The refcount is 2: one for the hash reference, and
1022 * one for the pointer returned by this function. */
1023 struct obd_export *__class_new_export(struct obd_device *obd,
1024 struct obd_uuid *cluuid, bool is_self)
1026 struct obd_export *export;
1030 OBD_ALLOC_PTR(export);
1032 return ERR_PTR(-ENOMEM);
1034 export->exp_conn_cnt = 0;
1035 export->exp_lock_hash = NULL;
1036 export->exp_flock_hash = NULL;
1037 /* 2 = class_handle_hash + last */
1038 refcount_set(&export->exp_handle.h_ref, 2);
1039 atomic_set(&export->exp_rpc_count, 0);
1040 atomic_set(&export->exp_cb_count, 0);
1041 atomic_set(&export->exp_locks_count, 0);
1042 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1043 INIT_LIST_HEAD(&export->exp_locks_list);
1044 spin_lock_init(&export->exp_locks_list_guard);
1046 atomic_set(&export->exp_replay_count, 0);
1047 export->exp_obd = obd;
1048 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1049 spin_lock_init(&export->exp_uncommitted_replies_lock);
1050 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1051 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1052 INIT_HLIST_NODE(&export->exp_handle.h_link);
1053 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1054 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1055 class_handle_hash(&export->exp_handle, export_handle_owner);
1056 export->exp_last_request_time = ktime_get_real_seconds();
1057 spin_lock_init(&export->exp_lock);
1058 spin_lock_init(&export->exp_rpc_lock);
1059 INIT_HLIST_NODE(&export->exp_nid_hash);
1060 INIT_HLIST_NODE(&export->exp_gen_hash);
1061 spin_lock_init(&export->exp_bl_list_lock);
1062 INIT_LIST_HEAD(&export->exp_bl_list);
1063 INIT_LIST_HEAD(&export->exp_stale_list);
1064 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1066 export->exp_sp_peer = LUSTRE_SP_ANY;
1067 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1068 export->exp_client_uuid = *cluuid;
1069 obd_init_export(export);
1071 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1073 spin_lock(&obd->obd_dev_lock);
1074 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1075 /* shouldn't happen, but might race */
1076 if (obd->obd_stopping)
1077 GOTO(exit_unlock, rc = -ENODEV);
1079 rc = obd_uuid_add(obd, export);
1081 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1082 obd->obd_name, cluuid->uuid, rc);
1083 GOTO(exit_unlock, rc = -EALREADY);
1088 class_incref(obd, "export", export);
1089 list_add_tail(&export->exp_obd_chain_timed,
1090 &obd->obd_exports_timed);
1091 list_add(&export->exp_obd_chain, &obd->obd_exports);
1092 obd->obd_num_exports++;
1094 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1095 INIT_LIST_HEAD(&export->exp_obd_chain);
1097 spin_unlock(&obd->obd_dev_lock);
1101 spin_unlock(&obd->obd_dev_lock);
1102 class_handle_unhash(&export->exp_handle);
1103 obd_destroy_export(export);
1104 OBD_FREE_PTR(export);
1108 struct obd_export *class_new_export(struct obd_device *obd,
1109 struct obd_uuid *uuid)
1111 return __class_new_export(obd, uuid, false);
1113 EXPORT_SYMBOL(class_new_export);
1115 struct obd_export *class_new_export_self(struct obd_device *obd,
1116 struct obd_uuid *uuid)
1118 return __class_new_export(obd, uuid, true);
1121 void class_unlink_export(struct obd_export *exp)
1123 class_handle_unhash(&exp->exp_handle);
1125 if (exp->exp_obd->obd_self_export == exp) {
1126 class_export_put(exp);
1130 spin_lock(&exp->exp_obd->obd_dev_lock);
1131 /* delete an uuid-export hashitem from hashtables */
1132 if (exp != exp->exp_obd->obd_self_export)
1133 obd_uuid_del(exp->exp_obd, exp);
1135 #ifdef HAVE_SERVER_SUPPORT
1136 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1137 struct tg_export_data *ted = &exp->exp_target_data;
1138 struct cfs_hash *hash;
1140 /* Because obd_gen_hash will not be released until
1141 * class_cleanup(), so hash should never be NULL here */
1142 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1143 LASSERT(hash != NULL);
1144 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1145 &exp->exp_gen_hash);
1146 cfs_hash_putref(hash);
1148 #endif /* HAVE_SERVER_SUPPORT */
1150 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1151 list_del_init(&exp->exp_obd_chain_timed);
1152 exp->exp_obd->obd_num_exports--;
1153 spin_unlock(&exp->exp_obd->obd_dev_lock);
1154 atomic_inc(&obd_stale_export_num);
1156 /* A reference is kept by obd_stale_exports list */
1157 obd_stale_export_put(exp);
1159 EXPORT_SYMBOL(class_unlink_export);
1161 /* Import management functions */
1162 static void obd_zombie_import_free(struct obd_import *imp)
1166 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1167 imp->imp_obd->obd_name);
1169 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1171 ptlrpc_put_connection_superhack(imp->imp_connection);
1173 while (!list_empty(&imp->imp_conn_list)) {
1174 struct obd_import_conn *imp_conn;
1176 imp_conn = list_first_entry(&imp->imp_conn_list,
1177 struct obd_import_conn, oic_item);
1178 list_del_init(&imp_conn->oic_item);
1179 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1180 OBD_FREE(imp_conn, sizeof(*imp_conn));
1183 LASSERT(imp->imp_sec == NULL);
1184 class_decref(imp->imp_obd, "import", imp);
1189 struct obd_import *class_import_get(struct obd_import *import)
1191 refcount_inc(&import->imp_refcount);
1192 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1193 refcount_read(&import->imp_refcount),
1194 import->imp_obd->obd_name);
1197 EXPORT_SYMBOL(class_import_get);
1199 void class_import_put(struct obd_import *imp)
1203 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1205 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1206 refcount_read(&imp->imp_refcount) - 1,
1207 imp->imp_obd->obd_name);
1209 if (refcount_dec_and_test(&imp->imp_refcount)) {
1210 CDEBUG(D_INFO, "final put import %p\n", imp);
1211 obd_zombie_import_add(imp);
1216 EXPORT_SYMBOL(class_import_put);
1218 static void init_imp_at(struct imp_at *at) {
1220 at_init(&at->iat_net_latency, 0, 0);
1221 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1222 /* max service estimates are tracked on the server side, so
1223 don't use the AT history here, just use the last reported
1224 val. (But keep hist for proc histogram, worst_ever) */
1225 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1230 static void obd_zombie_imp_cull(struct work_struct *ws)
1232 struct obd_import *import;
1234 import = container_of(ws, struct obd_import, imp_zombie_work);
1235 obd_zombie_import_free(import);
1238 struct obd_import *class_new_import(struct obd_device *obd)
1240 struct obd_import *imp;
1241 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1243 OBD_ALLOC(imp, sizeof(*imp));
1247 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1248 INIT_LIST_HEAD(&imp->imp_replay_list);
1249 INIT_LIST_HEAD(&imp->imp_sending_list);
1250 INIT_LIST_HEAD(&imp->imp_delayed_list);
1251 INIT_LIST_HEAD(&imp->imp_committed_list);
1252 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1253 imp->imp_known_replied_xid = 0;
1254 imp->imp_replay_cursor = &imp->imp_committed_list;
1255 spin_lock_init(&imp->imp_lock);
1256 imp->imp_last_success_conn = 0;
1257 imp->imp_state = LUSTRE_IMP_NEW;
1258 imp->imp_obd = class_incref(obd, "import", imp);
1259 rwlock_init(&imp->imp_sec_lock);
1260 init_waitqueue_head(&imp->imp_recovery_waitq);
1261 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1263 if (curr_pid_ns && curr_pid_ns->child_reaper)
1264 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1266 imp->imp_sec_refpid = 1;
1268 refcount_set(&imp->imp_refcount, 2);
1269 atomic_set(&imp->imp_unregistering, 0);
1270 atomic_set(&imp->imp_inflight, 0);
1271 atomic_set(&imp->imp_replay_inflight, 0);
1272 atomic_set(&imp->imp_inval_count, 0);
1273 INIT_LIST_HEAD(&imp->imp_conn_list);
1274 init_imp_at(&imp->imp_at);
1276 /* the default magic is V2, will be used in connect RPC, and
1277 * then adjusted according to the flags in request/reply. */
1278 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1282 EXPORT_SYMBOL(class_new_import);
1284 void class_destroy_import(struct obd_import *import)
1286 LASSERT(import != NULL);
1287 LASSERT(import != LP_POISON);
1289 spin_lock(&import->imp_lock);
1290 import->imp_generation++;
1291 spin_unlock(&import->imp_lock);
1292 class_import_put(import);
1294 EXPORT_SYMBOL(class_destroy_import);
1296 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1298 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1300 spin_lock(&exp->exp_locks_list_guard);
1302 LASSERT(lock->l_exp_refs_nr >= 0);
1304 if (lock->l_exp_refs_target != NULL &&
1305 lock->l_exp_refs_target != exp) {
1306 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1307 exp, lock, lock->l_exp_refs_target);
1309 if ((lock->l_exp_refs_nr ++) == 0) {
1310 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1311 lock->l_exp_refs_target = exp;
1313 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1314 lock, exp, lock->l_exp_refs_nr);
1315 spin_unlock(&exp->exp_locks_list_guard);
1317 EXPORT_SYMBOL(__class_export_add_lock_ref);
1319 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1321 spin_lock(&exp->exp_locks_list_guard);
1322 LASSERT(lock->l_exp_refs_nr > 0);
1323 if (lock->l_exp_refs_target != exp) {
1324 LCONSOLE_WARN("lock %p, "
1325 "mismatching export pointers: %p, %p\n",
1326 lock, lock->l_exp_refs_target, exp);
1328 if (-- lock->l_exp_refs_nr == 0) {
1329 list_del_init(&lock->l_exp_refs_link);
1330 lock->l_exp_refs_target = NULL;
1332 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1333 lock, exp, lock->l_exp_refs_nr);
1334 spin_unlock(&exp->exp_locks_list_guard);
1336 EXPORT_SYMBOL(__class_export_del_lock_ref);
1339 /* A connection defines an export context in which preallocation can
1340 be managed. This releases the export pointer reference, and returns
1341 the export handle, so the export refcount is 1 when this function
1343 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1344 struct obd_uuid *cluuid)
1346 struct obd_export *export;
1347 LASSERT(conn != NULL);
1348 LASSERT(obd != NULL);
1349 LASSERT(cluuid != NULL);
1352 export = class_new_export(obd, cluuid);
1354 RETURN(PTR_ERR(export));
1356 conn->cookie = export->exp_handle.h_cookie;
1357 class_export_put(export);
1359 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1360 cluuid->uuid, conn->cookie);
1363 EXPORT_SYMBOL(class_connect);
1365 /* if export is involved in recovery then clean up related things */
1366 static void class_export_recovery_cleanup(struct obd_export *exp)
1368 struct obd_device *obd = exp->exp_obd;
1370 spin_lock(&obd->obd_recovery_task_lock);
1371 if (obd->obd_recovering) {
1372 if (exp->exp_in_recovery) {
1373 spin_lock(&exp->exp_lock);
1374 exp->exp_in_recovery = 0;
1375 spin_unlock(&exp->exp_lock);
1376 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1377 atomic_dec(&obd->obd_connected_clients);
1380 /* if called during recovery then should update
1381 * obd_stale_clients counter,
1382 * lightweight exports are not counted */
1383 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1384 exp->exp_obd->obd_stale_clients++;
1386 spin_unlock(&obd->obd_recovery_task_lock);
1388 spin_lock(&exp->exp_lock);
1389 /** Cleanup req replay fields */
1390 if (exp->exp_req_replay_needed) {
1391 exp->exp_req_replay_needed = 0;
1393 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1394 atomic_dec(&obd->obd_req_replay_clients);
1397 /** Cleanup lock replay data */
1398 if (exp->exp_lock_replay_needed) {
1399 exp->exp_lock_replay_needed = 0;
1401 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1402 atomic_dec(&obd->obd_lock_replay_clients);
1404 spin_unlock(&exp->exp_lock);
1407 /* This function removes 1-3 references from the export:
1408 * 1 - for export pointer passed
1409 * and if disconnect really need
1410 * 2 - removing from hash
1411 * 3 - in client_unlink_export
1412 * The export pointer passed to this function can destroyed */
1413 int class_disconnect(struct obd_export *export)
1415 int already_disconnected;
1418 if (export == NULL) {
1419 CWARN("attempting to free NULL export %p\n", export);
1423 spin_lock(&export->exp_lock);
1424 already_disconnected = export->exp_disconnected;
1425 export->exp_disconnected = 1;
1426 /* We hold references of export for uuid hash
1427 * and nid_hash and export link at least. So
1428 * it is safe to call cfs_hash_del in there. */
1429 if (!hlist_unhashed(&export->exp_nid_hash))
1430 cfs_hash_del(export->exp_obd->obd_nid_hash,
1431 &export->exp_connection->c_peer.nid,
1432 &export->exp_nid_hash);
1433 spin_unlock(&export->exp_lock);
1435 /* class_cleanup(), abort_recovery(), and class_fail_export()
1436 * all end up in here, and if any of them race we shouldn't
1437 * call extra class_export_puts(). */
1438 if (already_disconnected) {
1439 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1440 GOTO(no_disconn, already_disconnected);
1443 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1444 export->exp_handle.h_cookie);
1446 class_export_recovery_cleanup(export);
1447 class_unlink_export(export);
1449 class_export_put(export);
1452 EXPORT_SYMBOL(class_disconnect);
1454 /* Return non-zero for a fully connected export */
1455 int class_connected_export(struct obd_export *exp)
1460 spin_lock(&exp->exp_lock);
1461 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1462 spin_unlock(&exp->exp_lock);
1466 EXPORT_SYMBOL(class_connected_export);
1468 static void class_disconnect_export_list(struct list_head *list,
1469 enum obd_option flags)
1472 struct obd_export *exp;
1475 /* It's possible that an export may disconnect itself, but
1476 * nothing else will be added to this list. */
1477 while (!list_empty(list)) {
1478 exp = list_first_entry(list, struct obd_export,
1480 /* need for safe call CDEBUG after obd_disconnect */
1481 class_export_get(exp);
1483 spin_lock(&exp->exp_lock);
1484 exp->exp_flags = flags;
1485 spin_unlock(&exp->exp_lock);
1487 if (obd_uuid_equals(&exp->exp_client_uuid,
1488 &exp->exp_obd->obd_uuid)) {
1490 "exp %p export uuid == obd uuid, don't discon\n",
1492 /* Need to delete this now so we don't end up pointing
1493 * to work_list later when this export is cleaned up. */
1494 list_del_init(&exp->exp_obd_chain);
1495 class_export_put(exp);
1499 class_export_get(exp);
1500 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1501 "last request at %lld\n",
1502 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1503 exp, exp->exp_last_request_time);
1504 /* release one export reference anyway */
1505 rc = obd_disconnect(exp);
1507 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1508 obd_export_nid2str(exp), exp, rc);
1509 class_export_put(exp);
1514 void class_disconnect_exports(struct obd_device *obd)
1516 LIST_HEAD(work_list);
1519 /* Move all of the exports from obd_exports to a work list, en masse. */
1520 spin_lock(&obd->obd_dev_lock);
1521 list_splice_init(&obd->obd_exports, &work_list);
1522 list_splice_init(&obd->obd_delayed_exports, &work_list);
1523 spin_unlock(&obd->obd_dev_lock);
1525 if (!list_empty(&work_list)) {
1526 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1527 "disconnecting them\n", obd->obd_minor, obd);
1528 class_disconnect_export_list(&work_list,
1529 exp_flags_from_obd(obd));
1531 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1532 obd->obd_minor, obd);
1535 EXPORT_SYMBOL(class_disconnect_exports);
1537 /* Remove exports that have not completed recovery.
1539 void class_disconnect_stale_exports(struct obd_device *obd,
1540 int (*test_export)(struct obd_export *))
1542 LIST_HEAD(work_list);
1543 struct obd_export *exp, *n;
1547 spin_lock(&obd->obd_dev_lock);
1548 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1550 /* don't count self-export as client */
1551 if (obd_uuid_equals(&exp->exp_client_uuid,
1552 &exp->exp_obd->obd_uuid))
1555 /* don't evict clients which have no slot in last_rcvd
1556 * (e.g. lightweight connection) */
1557 if (exp->exp_target_data.ted_lr_idx == -1)
1560 spin_lock(&exp->exp_lock);
1561 if (exp->exp_failed || test_export(exp)) {
1562 spin_unlock(&exp->exp_lock);
1565 exp->exp_failed = 1;
1566 spin_unlock(&exp->exp_lock);
1568 list_move(&exp->exp_obd_chain, &work_list);
1570 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1571 obd->obd_name, exp->exp_client_uuid.uuid,
1572 obd_export_nid2str(exp));
1573 print_export_data(exp, "EVICTING", 0, D_HA);
1575 spin_unlock(&obd->obd_dev_lock);
1578 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1579 obd->obd_name, evicted);
1581 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1582 OBD_OPT_ABORT_RECOV);
1585 EXPORT_SYMBOL(class_disconnect_stale_exports);
1587 void class_fail_export(struct obd_export *exp)
1589 int rc, already_failed;
1591 spin_lock(&exp->exp_lock);
1592 already_failed = exp->exp_failed;
1593 exp->exp_failed = 1;
1594 spin_unlock(&exp->exp_lock);
1596 if (already_failed) {
1597 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1598 exp, exp->exp_client_uuid.uuid);
1602 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1603 exp, exp->exp_client_uuid.uuid);
1605 if (obd_dump_on_timeout)
1606 libcfs_debug_dumplog();
1608 /* need for safe call CDEBUG after obd_disconnect */
1609 class_export_get(exp);
1611 /* Most callers into obd_disconnect are removing their own reference
1612 * (request, for example) in addition to the one from the hash table.
1613 * We don't have such a reference here, so make one. */
1614 class_export_get(exp);
1615 rc = obd_disconnect(exp);
1617 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1619 CDEBUG(D_HA, "disconnected export %p/%s\n",
1620 exp, exp->exp_client_uuid.uuid);
1621 class_export_put(exp);
1623 EXPORT_SYMBOL(class_fail_export);
1625 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1627 struct cfs_hash *nid_hash;
1628 struct obd_export *doomed_exp = NULL;
1629 int exports_evicted = 0;
1631 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1633 spin_lock(&obd->obd_dev_lock);
1634 /* umount has run already, so evict thread should leave
1635 * its task to umount thread now */
1636 if (obd->obd_stopping) {
1637 spin_unlock(&obd->obd_dev_lock);
1638 return exports_evicted;
1640 nid_hash = obd->obd_nid_hash;
1641 cfs_hash_getref(nid_hash);
1642 spin_unlock(&obd->obd_dev_lock);
1645 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1646 if (doomed_exp == NULL)
1649 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1650 "nid %s found, wanted nid %s, requested nid %s\n",
1651 obd_export_nid2str(doomed_exp),
1652 libcfs_nid2str(nid_key), nid);
1653 LASSERTF(doomed_exp != obd->obd_self_export,
1654 "self-export is hashed by NID?\n");
1656 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1657 "request\n", obd->obd_name,
1658 obd_uuid2str(&doomed_exp->exp_client_uuid),
1659 obd_export_nid2str(doomed_exp));
1660 class_fail_export(doomed_exp);
1661 class_export_put(doomed_exp);
1664 cfs_hash_putref(nid_hash);
1666 if (!exports_evicted)
1667 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1668 obd->obd_name, nid);
1669 return exports_evicted;
1671 EXPORT_SYMBOL(obd_export_evict_by_nid);
1673 #ifdef HAVE_SERVER_SUPPORT
1674 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1676 struct obd_export *doomed_exp = NULL;
1677 struct obd_uuid doomed_uuid;
1678 int exports_evicted = 0;
1680 spin_lock(&obd->obd_dev_lock);
1681 if (obd->obd_stopping) {
1682 spin_unlock(&obd->obd_dev_lock);
1683 return exports_evicted;
1685 spin_unlock(&obd->obd_dev_lock);
1687 obd_str2uuid(&doomed_uuid, uuid);
1688 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1689 CERROR("%s: can't evict myself\n", obd->obd_name);
1690 return exports_evicted;
1693 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1694 if (doomed_exp == NULL) {
1695 CERROR("%s: can't disconnect %s: no exports found\n",
1696 obd->obd_name, uuid);
1698 CWARN("%s: evicting %s at adminstrative request\n",
1699 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1700 class_fail_export(doomed_exp);
1701 class_export_put(doomed_exp);
1702 obd_uuid_del(obd, doomed_exp);
1706 return exports_evicted;
1708 #endif /* HAVE_SERVER_SUPPORT */
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback; print_export_data() invokes it for an export when
 * lock dumping is requested and the hook is set. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif /* LUSTRE_TRACKS_LOCK_EXP_REFS */
1715 static void print_export_data(struct obd_export *exp, const char *status,
1716 int locks, int debug_level)
1718 struct ptlrpc_reply_state *rs;
1719 struct ptlrpc_reply_state *first_reply = NULL;
1722 spin_lock(&exp->exp_lock);
1723 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1729 spin_unlock(&exp->exp_lock);
1731 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1732 "%p %s %llu stale:%d\n",
1733 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1734 obd_export_nid2str(exp),
1735 refcount_read(&exp->exp_handle.h_ref),
1736 atomic_read(&exp->exp_rpc_count),
1737 atomic_read(&exp->exp_cb_count),
1738 atomic_read(&exp->exp_locks_count),
1739 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1740 nreplies, first_reply, nreplies > 3 ? "..." : "",
1741 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1742 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1743 if (locks && class_export_dump_hook != NULL)
1744 class_export_dump_hook(exp);
1748 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1750 struct obd_export *exp;
1752 spin_lock(&obd->obd_dev_lock);
1753 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1754 print_export_data(exp, "ACTIVE", locks, debug_level);
1755 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1756 print_export_data(exp, "UNLINKED", locks, debug_level);
1757 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1758 print_export_data(exp, "DELAYED", locks, debug_level);
1759 spin_unlock(&obd->obd_dev_lock);
1762 void obd_exports_barrier(struct obd_device *obd)
1765 LASSERT(list_empty(&obd->obd_exports));
1766 spin_lock(&obd->obd_dev_lock);
1767 while (!list_empty(&obd->obd_unlinked_exports)) {
1768 spin_unlock(&obd->obd_dev_lock);
1769 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1770 if (waited > 5 && is_power_of_2(waited)) {
1771 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1772 "more than %d seconds. "
1773 "The obd refcount = %d. Is it stuck?\n",
1774 obd->obd_name, waited,
1775 atomic_read(&obd->obd_refcount));
1776 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1779 spin_lock(&obd->obd_dev_lock);
1781 spin_unlock(&obd->obd_dev_lock);
1783 EXPORT_SYMBOL(obd_exports_barrier);
1786 * Add export to the obd_zombe thread and notify it.
1788 static void obd_zombie_export_add(struct obd_export *exp) {
1789 atomic_dec(&obd_stale_export_num);
1790 spin_lock(&exp->exp_obd->obd_dev_lock);
1791 LASSERT(!list_empty(&exp->exp_obd_chain));
1792 list_del_init(&exp->exp_obd_chain);
1793 spin_unlock(&exp->exp_obd->obd_dev_lock);
1795 queue_work(zombie_wq, &exp->exp_zombie_work);
1799 * Add import to the obd_zombe thread and notify it.
1801 static void obd_zombie_import_add(struct obd_import *imp) {
1802 LASSERT(imp->imp_sec == NULL);
1804 queue_work(zombie_wq, &imp->imp_zombie_work);
1808 * wait when obd_zombie import/export queues become empty
1810 void obd_zombie_barrier(void)
1812 flush_workqueue(zombie_wq);
1814 EXPORT_SYMBOL(obd_zombie_barrier);
1817 struct obd_export *obd_stale_export_get(void)
1819 struct obd_export *exp = NULL;
1822 spin_lock(&obd_stale_export_lock);
1823 if (!list_empty(&obd_stale_exports)) {
1824 exp = list_first_entry(&obd_stale_exports,
1825 struct obd_export, exp_stale_list);
1826 list_del_init(&exp->exp_stale_list);
1828 spin_unlock(&obd_stale_export_lock);
1831 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1832 atomic_read(&obd_stale_export_num));
1836 EXPORT_SYMBOL(obd_stale_export_get);
1838 void obd_stale_export_put(struct obd_export *exp)
1842 LASSERT(list_empty(&exp->exp_stale_list));
1843 if (exp->exp_lock_hash &&
1844 atomic_read(&exp->exp_lock_hash->hs_count)) {
1845 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1846 atomic_read(&obd_stale_export_num));
1848 spin_lock_bh(&exp->exp_bl_list_lock);
1849 spin_lock(&obd_stale_export_lock);
1850 /* Add to the tail if there is no blocked locks,
1851 * to the head otherwise. */
1852 if (list_empty(&exp->exp_bl_list))
1853 list_add_tail(&exp->exp_stale_list,
1854 &obd_stale_exports);
1856 list_add(&exp->exp_stale_list,
1857 &obd_stale_exports);
1859 spin_unlock(&obd_stale_export_lock);
1860 spin_unlock_bh(&exp->exp_bl_list_lock);
1862 class_export_put(exp);
1866 EXPORT_SYMBOL(obd_stale_export_put);
1869 * Adjust the position of the export in the stale list,
1870 * i.e. move to the head of the list if is needed.
1872 void obd_stale_export_adjust(struct obd_export *exp)
1874 LASSERT(exp != NULL);
1875 spin_lock_bh(&exp->exp_bl_list_lock);
1876 spin_lock(&obd_stale_export_lock);
1878 if (!list_empty(&exp->exp_stale_list) &&
1879 !list_empty(&exp->exp_bl_list))
1880 list_move(&exp->exp_stale_list, &obd_stale_exports);
1882 spin_unlock(&obd_stale_export_lock);
1883 spin_unlock_bh(&exp->exp_bl_list_lock);
1885 EXPORT_SYMBOL(obd_stale_export_adjust);
1888 * start destroy zombie import/export thread
1890 int obd_zombie_impexp_init(void)
1892 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1900 * stop destroy zombie import/export thread
1902 void obd_zombie_impexp_stop(void)
1904 destroy_workqueue(zombie_wq);
1905 LASSERT(list_empty(&obd_stale_exports));
1908 /***** Kernel-userspace comm helpers *******/
1910 /* Get length of entire message, including header */
1911 int kuc_len(int payload_len)
1913 return sizeof(struct kuc_hdr) + payload_len;
1915 EXPORT_SYMBOL(kuc_len);
1917 /* Get a pointer to kuc header, given a ptr to the payload
1918 * @param p Pointer to payload area
1919 * @returns Pointer to kuc header
1921 struct kuc_hdr * kuc_ptr(void *p)
1923 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1924 LASSERT(lh->kuc_magic == KUC_MAGIC);
1927 EXPORT_SYMBOL(kuc_ptr);
1929 /* Alloc space for a message, and fill in header
1930 * @return Pointer to payload area
1932 void *kuc_alloc(int payload_len, int transport, int type)
1935 int len = kuc_len(payload_len);
1939 return ERR_PTR(-ENOMEM);
1941 lh->kuc_magic = KUC_MAGIC;
1942 lh->kuc_transport = transport;
1943 lh->kuc_msgtype = type;
1944 lh->kuc_msglen = len;
1946 return (void *)(lh + 1);
1948 EXPORT_SYMBOL(kuc_alloc);
/* Free a message; takes the pointer to the payload area and the payload
 * length that was used to size the message */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1958 struct obd_request_slot_waiter {
1959 struct list_head orsw_entry;
1960 wait_queue_head_t orsw_waitq;
1964 static bool obd_request_slot_avail(struct client_obd *cli,
1965 struct obd_request_slot_waiter *orsw)
1969 spin_lock(&cli->cl_loi_list_lock);
1970 avail = !!list_empty(&orsw->orsw_entry);
1971 spin_unlock(&cli->cl_loi_list_lock);
1977 * For network flow control, the RPC sponsor needs to acquire a credit
1978 * before sending the RPC. The credits count for a connection is defined
1979 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1980 * the subsequent RPC sponsors need to wait until others released their
1981 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1983 int obd_get_request_slot(struct client_obd *cli)
1985 struct obd_request_slot_waiter orsw;
1988 spin_lock(&cli->cl_loi_list_lock);
1989 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1990 cli->cl_rpcs_in_flight++;
1991 spin_unlock(&cli->cl_loi_list_lock);
1995 init_waitqueue_head(&orsw.orsw_waitq);
1996 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
1997 orsw.orsw_signaled = false;
1998 spin_unlock(&cli->cl_loi_list_lock);
2000 rc = l_wait_event_abortable(orsw.orsw_waitq,
2001 obd_request_slot_avail(cli, &orsw) ||
2002 orsw.orsw_signaled);
2004 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2005 * freed but other (such as obd_put_request_slot) is using it. */
2006 spin_lock(&cli->cl_loi_list_lock);
2008 if (!orsw.orsw_signaled) {
2009 if (list_empty(&orsw.orsw_entry))
2010 cli->cl_rpcs_in_flight--;
2012 list_del(&orsw.orsw_entry);
2017 if (orsw.orsw_signaled) {
2018 LASSERT(list_empty(&orsw.orsw_entry));
2022 spin_unlock(&cli->cl_loi_list_lock);
2026 EXPORT_SYMBOL(obd_get_request_slot);
2028 void obd_put_request_slot(struct client_obd *cli)
2030 struct obd_request_slot_waiter *orsw;
2032 spin_lock(&cli->cl_loi_list_lock);
2033 cli->cl_rpcs_in_flight--;
2035 /* If there is free slot, wakeup the first waiter. */
2036 if (!list_empty(&cli->cl_flight_waiters) &&
2037 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2038 orsw = list_first_entry(&cli->cl_flight_waiters,
2039 struct obd_request_slot_waiter,
2041 list_del_init(&orsw->orsw_entry);
2042 cli->cl_rpcs_in_flight++;
2043 wake_up(&orsw->orsw_waitq);
2045 spin_unlock(&cli->cl_loi_list_lock);
2047 EXPORT_SYMBOL(obd_put_request_slot);
2049 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2051 return cli->cl_max_rpcs_in_flight;
2053 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2055 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2057 struct obd_request_slot_waiter *orsw;
2061 const char *type_name;
2064 if (max > OBD_MAX_RIF_MAX || max < 1)
2067 type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2068 if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2069 /* adjust max_mod_rpcs_in_flight to ensure it is always
2070 * strictly lower that max_rpcs_in_flight */
2072 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2073 "because it must be higher than "
2074 "max_mod_rpcs_in_flight value",
2075 cli->cl_import->imp_obd->obd_name);
2078 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2079 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2085 spin_lock(&cli->cl_loi_list_lock);
2086 old = cli->cl_max_rpcs_in_flight;
2087 cli->cl_max_rpcs_in_flight = max;
2088 client_adjust_max_dirty(cli);
2092 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2093 for (i = 0; i < diff; i++) {
2094 if (list_empty(&cli->cl_flight_waiters))
2097 orsw = list_first_entry(&cli->cl_flight_waiters,
2098 struct obd_request_slot_waiter,
2100 list_del_init(&orsw->orsw_entry);
2101 cli->cl_rpcs_in_flight++;
2102 wake_up(&orsw->orsw_waitq);
2104 spin_unlock(&cli->cl_loi_list_lock);
2108 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2110 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2112 return cli->cl_max_mod_rpcs_in_flight;
2114 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2116 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2118 struct obd_connect_data *ocd;
2122 if (max > OBD_MAX_RIF_MAX || max < 1)
2125 /* cannot exceed or equal max_rpcs_in_flight */
2126 if (max >= cli->cl_max_rpcs_in_flight) {
2127 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2128 "higher or equal to max_rpcs_in_flight value (%u)\n",
2129 cli->cl_import->imp_obd->obd_name,
2130 max, cli->cl_max_rpcs_in_flight);
2134 /* cannot exceed max modify RPCs in flight supported by the server */
2135 ocd = &cli->cl_import->imp_connect_data;
2136 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2137 maxmodrpcs = ocd->ocd_maxmodrpcs;
2140 if (max > maxmodrpcs) {
2141 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2142 "higher than max_mod_rpcs_per_client value (%hu) "
2143 "returned by the server at connection\n",
2144 cli->cl_import->imp_obd->obd_name,
2149 spin_lock(&cli->cl_mod_rpcs_lock);
2151 prev = cli->cl_max_mod_rpcs_in_flight;
2152 cli->cl_max_mod_rpcs_in_flight = max;
2154 /* wakeup waiters if limit has been increased */
2155 if (cli->cl_max_mod_rpcs_in_flight > prev)
2156 wake_up(&cli->cl_mod_rpcs_waitq);
2158 spin_unlock(&cli->cl_mod_rpcs_lock);
2162 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2164 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2165 struct seq_file *seq)
2167 unsigned long mod_tot = 0, mod_cum;
2168 struct timespec64 now;
2171 ktime_get_real_ts64(&now);
2173 spin_lock(&cli->cl_mod_rpcs_lock);
2175 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2176 (s64)now.tv_sec, now.tv_nsec);
2177 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2178 cli->cl_mod_rpcs_in_flight);
2180 seq_printf(seq, "\n\t\t\tmodify\n");
2181 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2183 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2186 for (i = 0; i < OBD_HIST_MAX; i++) {
2187 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2189 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2190 i, mod, pct(mod, mod_tot),
2191 pct(mod_cum, mod_tot));
2192 if (mod_cum == mod_tot)
2196 spin_unlock(&cli->cl_mod_rpcs_lock);
2200 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2202 /* The number of modify RPCs sent in parallel is limited
2203 * because the server has a finite number of slots per client to
2204 * store request result and ensure reply reconstruction when needed.
2205 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2206 * that takes into account server limit and cl_max_rpcs_in_flight
2208 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2209 * one close request is allowed above the maximum.
2211 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2216 /* A slot is available if
2217 * - number of modify RPCs in flight is less than the max
2218 * - it's a close RPC and no other close request is in flight
2220 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2221 (close_req && cli->cl_close_rpcs_in_flight == 0);
2226 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2231 spin_lock(&cli->cl_mod_rpcs_lock);
2232 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2233 spin_unlock(&cli->cl_mod_rpcs_lock);
2238 /* Get a modify RPC slot from the obd client @cli according
2239 * to the kind of operation @opc that is going to be sent
2240 * and the intent @it of the operation if it applies.
2241 * If the maximum number of modify RPCs in flight is reached
2242 * the thread is put to sleep.
2243 * Returns the tag to be set in the request message. Tag 0
2244 * is reserved for non-modifying requests.
2246 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2248 bool close_req = false;
2251 if (opc == MDS_CLOSE)
2255 spin_lock(&cli->cl_mod_rpcs_lock);
2256 max = cli->cl_max_mod_rpcs_in_flight;
2257 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2258 /* there is a slot available */
2259 cli->cl_mod_rpcs_in_flight++;
2261 cli->cl_close_rpcs_in_flight++;
2262 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2263 cli->cl_mod_rpcs_in_flight);
2264 /* find a free tag */
2265 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2267 LASSERT(i < OBD_MAX_RIF_MAX);
2268 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2269 spin_unlock(&cli->cl_mod_rpcs_lock);
2270 /* tag 0 is reserved for non-modify RPCs */
2273 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2274 cli->cl_import->imp_obd->obd_name,
2279 spin_unlock(&cli->cl_mod_rpcs_lock);
2281 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2282 "opc %u, max %hu\n",
2283 cli->cl_import->imp_obd->obd_name, opc, max);
2285 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2286 obd_mod_rpc_slot_avail(cli,
2290 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2292 /* Put a modify RPC slot from the obd client @cli according
2293 * to the kind of operation @opc that has been sent.
2295 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2297 bool close_req = false;
2302 if (opc == MDS_CLOSE)
2305 spin_lock(&cli->cl_mod_rpcs_lock);
2306 cli->cl_mod_rpcs_in_flight--;
2308 cli->cl_close_rpcs_in_flight--;
2309 /* release the tag in the bitmap */
2310 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2311 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2312 spin_unlock(&cli->cl_mod_rpcs_lock);
2313 wake_up(&cli->cl_mod_rpcs_waitq);
2315 EXPORT_SYMBOL(obd_put_mod_rpc_slot);