4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* Registry of obd types: the obd_types list is walked and modified with
 * obd_types_lock held (see class_search_type() and class_register_type()). */
49 static DEFINE_SPINLOCK(obd_types_lock);
50 static LIST_HEAD(obd_types);
/* obd_dev_lock is taken for read or write around accesses to the obd_devs[]
 * slot table; a device's slot index is stored in its obd_minor. */
51 DEFINE_RWLOCK(obd_dev_lock);
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache for struct obd_device, created by obd_init_caches(). */
54 static struct kmem_cache *obd_device_cachep;
56 static struct workqueue_struct *zombie_wq;
/* Forward declarations of helpers that are used before they are defined. */
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61 const char *status, int locks, int debug_level);
/* Stale export bookkeeping; obd_stale_export_num is incremented by
 * class_unlink_export(). */
63 static LIST_HEAD(obd_stale_exports);
64 static DEFINE_SPINLOCK(obd_stale_export_lock);
65 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Function pointer used in this file to release ptlrpc connections held by
 * exports and imports. It is not assigned in this part of the file. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/*
 * Allocate a struct obd_device from obd_device_cachep with GFP_NOFS and
 * stamp it with OBD_DEVICE_MAGIC.
 * class_newdev() treats a NULL result as an allocation failure.
 */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts the magic value, reports a namespace that is still attached,
 * and finalizes the obd_reference lu_ref before freeing.
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a namespace left behind at this point is logged as an error */
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Search the obd_types list for a type whose typ_name equals \a name.
 * The list is walked with obd_types_lock held; the lock is released as soon
 * as a match is found, and again after an unsuccessful walk.
 * Callers in this file test the result against NULL.
 * NOTE(review): no reference on the type appears to be taken here, so the
 * caller presumably has to keep it alive - confirm.
 */
99 struct obd_type *class_search_type(const char *name)
101 struct list_head *tmp;
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 list_for_each(tmp, &obd_types) {
106 type = list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
/*
 * Look up the obd type called \a name and take a module reference on it.
 * With HAVE_MODULE_LOADING_SUPPORT a module name is derived from \a name
 * ("obdfilter" is special-cased, LUSTRE_LWP_NAME maps to LUSTRE_OSP_NAME,
 * names starting with LUSTRE_MDS_NAME map to LUSTRE_MDT_NAME),
 * request_module() is called, and the lookup is repeated on success.
 * The module reference on typ_dt_ops->o_owner is taken with
 * type->obd_type_lock held; class_put_type() is the counterpart.
 * NOTE(review): the result of try_module_get() is not checked.
 */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
/* prefix match: any name beginning with LUSTRE_MDS_NAME */
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
/* request_module() returning 0 is treated as success */
134 if (!request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144 spin_lock(&type->obd_type_lock);
146 try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): with type->obd_type_lock held, drop the
 * module reference on typ_dt_ops->o_owner.
 */
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
/*
 * kobject release callback for an obd_type (installed through class_ktype).
 * Tears down the debugfs and procfs entries, finalizes the lu_device_type,
 * unlinks the type from obd_types and frees the ops copies and the type.
 */
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 #ifdef HAVE_SERVER_SUPPORT
/* with typ_sym_filter set, the debugfs entry is only forgotten here */
166 if (type->typ_sym_filter)
167 type->typ_debugfs_entry = NULL;
169 debugfs_remove_recursive(type->typ_debugfs_entry);
170 type->typ_debugfs_entry = NULL;
173 lu_device_type_fini(type->typ_lu);
/* take the type off the global list before its memory is released */
175 spin_lock(&obd_types_lock);
176 list_del(&type->typ_chain);
177 spin_unlock(&obd_types_lock);
179 #ifdef CONFIG_PROC_FS
180 if (type->typ_name && type->typ_procroot)
181 remove_proc_subtree(type->typ_name, proc_lustre_root);
/* the ops tables are per-type copies made by class_register_type() */
183 if (type->typ_md_ops)
184 OBD_FREE_PTR(type->typ_md_ops);
185 if (type->typ_dt_ops)
186 OBD_FREE_PTR(type->typ_dt_ops);
188 OBD_FREE(type, sizeof(*type));
/*
 * kobj_type shared by all obd_type kobjects: attributes go through
 * lustre_sysfs_ops and class_sysfs_release() is the release callback.
 */
191 static struct kobj_type class_ktype = {
192 .sysfs_ops = &lustre_sysfs_ops,
193 .release = class_sysfs_release,
196 #ifdef HAVE_SERVER_SUPPORT
/*
 * Server-only helper: create an obd_type for \a name that carries only
 * sysfs, debugfs and compat procfs entries, and mark it typ_sym_filter.
 * Visible error returns are ERR_PTR(-EEXIST) and ERR_PTR(-ENOMEM); a
 * debugfs failure drops the kobject again.
 * NOTE(review): \a enable_proc is not referenced in the lines visible
 * here - confirm how it gates the procfs registration.
 */
197 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
199 struct dentry *symlink;
200 struct obd_type *type;
201 struct kobject *kobj;
/* look for an existing kobject of that name in lustre_kset */
204 kobj = kset_find_obj(lustre_kset, name);
207 return ERR_PTR(-EEXIST);
210 OBD_ALLOC(type, sizeof(*type));
212 return ERR_PTR(-ENOMEM);
214 INIT_LIST_HEAD(&type->typ_chain);
216 type->typ_kobj.kset = lustre_kset;
217 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
218 &lustre_kset->kobj, "%s", name);
222 symlink = debugfs_create_dir(name, debugfs_lustre_root);
223 if (IS_ERR_OR_NULL(symlink)) {
/* a NULL result is reported as -ENOMEM */
224 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
225 kobject_put(&type->typ_kobj);
228 type->typ_debugfs_entry = symlink;
229 type->typ_sym_filter = true;
232 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
/* a procfs failure is only logged; typ_procroot is reset to NULL */
234 if (IS_ERR(type->typ_procroot)) {
235 CERROR("%s: can't create compat proc entry: %d\n",
236 name, (int)PTR_ERR(type->typ_procroot));
237 type->typ_procroot = NULL;
243 EXPORT_SYMBOL(class_add_symlinks);
244 #endif /* HAVE_SERVER_SUPPORT */
/* Upper bound (exclusive) on the length of an obd type name. */
246 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 * Allocates the obd_type, copies \a dt_ops and (optionally) \a md_ops into
 * per-type tables, registers procfs (when \a enable_proc), debugfs and the
 * sysfs kobject, initializes \a ldt and finally links the type into
 * obd_types. A name that is already registered is reported with a debug
 * message; on servers LUSTRE_LOV_NAME and LUSTRE_OSC_NAME get special
 * handling through their existing kobject.
 * Failures end in kobject_put(), which hands cleanup to
 * class_sysfs_release().
 */
248 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
249 bool enable_proc, struct lprocfs_vars *vars,
250 const char *name, struct lu_device_type *ldt)
252 struct obd_type *type;
257 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
259 if (class_search_type(name)) {
260 #ifdef HAVE_SERVER_SUPPORT
261 if (strcmp(name, LUSTRE_LOV_NAME) == 0 ||
262 strcmp(name, LUSTRE_OSC_NAME) == 0) {
263 struct kobject *kobj;
265 kobj = kset_find_obj(lustre_kset, name);
267 type = container_of(kobj, struct obd_type,
272 #endif /* HAVE_SERVER_SUPPORT */
273 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
277 OBD_ALLOC(type, sizeof(*type));
281 INIT_LIST_HEAD(&type->typ_chain);
282 type->typ_kobj.kset = lustre_kset;
283 kobject_init(&type->typ_kobj, &class_ktype);
284 #ifdef HAVE_SERVER_SUPPORT
286 #endif /* HAVE_SERVER_SUPPORT */
/* both ops tables are allocated up front, even though md_ops is optional */
287 OBD_ALLOC_PTR(type->typ_dt_ops);
288 OBD_ALLOC_PTR(type->typ_md_ops);
290 if (type->typ_dt_ops == NULL ||
291 type->typ_md_ops == NULL)
292 GOTO (failed, rc = -ENOMEM);
294 *(type->typ_dt_ops) = *dt_ops;
295 /* md_ops is optional */
297 *(type->typ_md_ops) = *md_ops;
298 spin_lock_init(&type->obd_type_lock);
300 #ifdef HAVE_SERVER_SUPPORT
301 if (type->typ_sym_filter)
304 #ifdef CONFIG_PROC_FS
/* skip procfs when a proc root already exists for this type */
305 if (enable_proc && !type->typ_procroot) {
306 type->typ_procroot = lprocfs_register(name,
309 if (IS_ERR(type->typ_procroot)) {
310 rc = PTR_ERR(type->typ_procroot);
311 type->typ_procroot = NULL;
316 type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
318 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
319 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
321 type->typ_debugfs_entry = NULL;
325 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
328 #ifdef HAVE_SERVER_SUPPORT
333 rc = lu_device_type_init(ldt);
/* publish the type so that class_search_type() can find it */
338 spin_lock(&obd_types_lock);
339 list_add(&type->typ_chain, &obd_types);
340 spin_unlock(&obd_types_lock);
345 kobject_put(&type->typ_kobj);
349 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 * An unknown name is logged. A type that still has a non-zero typ_refcnt
 * is logged and its ops tables are freed; otherwise the kobject reference
 * is dropped, which leads to class_sysfs_release().
 * NOTE(review): on the busy path typ_dt_ops and typ_md_ops are freed but
 * are not visibly reset to NULL, while class_get_type() and
 * class_put_type() dereference typ_dt_ops - confirm this cannot be reached
 * afterwards.
 */
351 int class_unregister_type(const char *name)
353 struct obd_type *type = class_search_type(name);
357 CERROR("unknown obd type\n");
361 if (type->typ_refcnt) {
362 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
363 /* This is a bad situation, let's make the best of it */
364 /* Remove ops, but leave the name for debugging */
365 OBD_FREE_PTR(type->typ_dt_ops);
366 OBD_FREE_PTR(type->typ_md_ops);
370 kobject_put(&type->typ_kobj);
373 } /* class_unregister_type */
374 EXPORT_SYMBOL(class_unregister_type);
377 * Create a new obd device.
379 * Allocate the new obd_device and initialize it.
381 * \param[in] type_name obd device type string.
382 * \param[in] name obd device name.
383 * \param[in] uuid obd device UUID
385 * \retval newdev pointer to created obd_device
386 * \retval ERR_PTR(errno) on error
388 struct obd_device *class_newdev(const char *type_name, const char *name,
391 struct obd_device *newdev;
392 struct obd_type *type = NULL;
395 if (strlen(name) >= MAX_OBD_NAME) {
396 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
397 RETURN(ERR_PTR(-EINVAL));
400 type = class_get_type(type_name);
402 CERROR("OBD: unknown type: %s\n", type_name);
403 RETURN(ERR_PTR(-ENODEV));
406 newdev = obd_device_alloc();
407 if (newdev == NULL) {
408 class_put_type(type);
409 RETURN(ERR_PTR(-ENOMEM));
411 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
412 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
413 newdev->obd_type = type;
414 newdev->obd_minor = -1;
416 rwlock_init(&newdev->obd_pool_lock);
417 newdev->obd_pool_limit = 0;
418 newdev->obd_pool_slv = 0;
420 INIT_LIST_HEAD(&newdev->obd_exports);
421 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
422 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
423 INIT_LIST_HEAD(&newdev->obd_exports_timed);
424 INIT_LIST_HEAD(&newdev->obd_nid_stats);
425 spin_lock_init(&newdev->obd_nid_lock);
426 spin_lock_init(&newdev->obd_dev_lock);
427 mutex_init(&newdev->obd_dev_mutex);
428 spin_lock_init(&newdev->obd_osfs_lock);
429 /* newdev->obd_osfs_age must be set to a value in the distant
430 * past to guarantee a fresh statfs is fetched on mount. */
431 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
433 /* XXX belongs in setup not attach */
434 init_rwsem(&newdev->obd_observer_link_sem);
436 spin_lock_init(&newdev->obd_recovery_task_lock);
437 init_waitqueue_head(&newdev->obd_next_transno_waitq);
438 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
439 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
440 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
441 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
442 INIT_LIST_HEAD(&newdev->obd_evict_list);
443 INIT_LIST_HEAD(&newdev->obd_lwp_list);
445 llog_group_init(&newdev->obd_olg);
446 /* Detach drops this */
447 atomic_set(&newdev->obd_refcount, 1);
448 lu_ref_init(&newdev->obd_reference);
449 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
451 newdev->obd_conn_inprogress = 0;
453 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
455 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
456 newdev->obd_name, newdev);
464 * \param[in] obd obd_device to be freed
/*
 * Final teardown of an obd_device whose refcount has reached zero.
 * Checks the magic, the obd_devs[] slot and the refcount, runs
 * obd_cleanup() when the device is marked obd_stopping, frees the device
 * and drops the type reference that class_newdev() took.
 */
468 void class_free_dev(struct obd_device *obd)
/* saved up front: obd is freed before the type reference is dropped */
470 struct obd_type *obd_type = obd->obd_type;
472 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
473 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
474 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
475 "obd %p != obd_devs[%d] %p\n",
476 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
477 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
478 "obd_refcount should be 0, not %d\n",
479 atomic_read(&obd->obd_refcount));
480 LASSERT(obd_type != NULL);
482 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
483 obd->obd_name, obd->obd_type->typ_name);
485 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
486 obd->obd_name, obd->obd_uuid.uuid);
487 if (obd->obd_stopping) {
490 /* If we're not stopping, we were never set up */
491 err = obd_cleanup(obd);
493 CERROR("Cleanup %s returned %d\n",
497 obd_device_free(obd);
499 class_put_type(obd_type);
503 * Unregister obd device.
505 * Free the slot in obd_devs[] used by \a obd.
507 * \param[in] obd obd_device to be unregistered
/*
 * With obd_dev_lock held for writing, clear the obd_devs[] slot of \a obd
 * if it has one (obd_minor >= 0), after asserting that the slot really
 * points at \a obd.
 */
511 void class_unregister_device(struct obd_device *obd)
513 write_lock(&obd_dev_lock);
514 if (obd->obd_minor >= 0) {
515 LASSERT(obd_devs[obd->obd_minor] == obd);
516 obd_devs[obd->obd_minor] = NULL;
519 write_unlock(&obd_dev_lock);
523 * Register obd device.
525 * Find a free slot in obd_devs[] and fill it with \a new_obd.
527 * \param[in] new_obd obd_device to be registered
530 * \retval -EEXIST device with this name is registered
531 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Scan obd_devs[] under the obd_dev_lock write lock. A device with the
 * same obd_name makes the registration fail ("already exists"); around
 * that check the lock is dropped and obd_zombie_barrier() is called, with
 * a "retried" flag declared for the purpose. The first empty slot seen is
 * remembered and, if no duplicate turns up, receives \a new_obd and
 * becomes its obd_minor. A full table is reported with an error message.
 */
533 int class_register_device(struct obd_device *new_obd)
537 int new_obd_minor = 0;
538 bool minor_assign = false;
539 bool retried = false;
542 write_lock(&obd_dev_lock);
543 for (i = 0; i < class_devno_max(); i++) {
544 struct obd_device *obd = class_num2obd(i);
547 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
550 write_unlock(&obd_dev_lock);
552 /* the obd_device could be waited to be
553 * destroyed by the "obd_zombie_impexp_thread".
555 obd_zombie_barrier();
560 CERROR("%s: already exists, won't add\n",
562 /* in case we found a free slot before duplicate */
563 minor_assign = false;
567 if (!minor_assign && obd == NULL) {
574 new_obd->obd_minor = new_obd_minor;
575 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
576 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
577 obd_devs[new_obd_minor] = new_obd;
581 CERROR("%s: all %u/%u devices used, increase "
582 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
583 i, class_devno_max(), ret);
586 write_unlock(&obd_dev_lock);
/*
 * Scan obd_devs[] for a device whose obd_name equals \a name; only a
 * device with obd_attached set is accepted.
 * Takes no lock itself: callers in this file hold obd_dev_lock.
 * Callers treat a negative result as "not found".
 */
591 static int class_name2dev_nolock(const char *name)
598 for (i = 0; i < class_devno_max(); i++) {
599 struct obd_device *obd = class_num2obd(i);
601 if (obd && strcmp(name, obd->obd_name) == 0) {
602 /* Make sure we finished attaching before we give
603 out any references */
604 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
605 if (obd->obd_attached) {
/*
 * Locked wrapper: run class_name2dev_nolock() with obd_dev_lock held for
 * reading.
 */
615 int class_name2dev(const char *name)
622 read_lock(&obd_dev_lock);
623 i = class_name2dev_nolock(name);
624 read_unlock(&obd_dev_lock);
628 EXPORT_SYMBOL(class_name2dev);
630 struct obd_device *class_name2obd(const char *name)
632 int dev = class_name2dev(name);
634 if (dev < 0 || dev > class_devno_max())
636 return class_num2obd(dev);
638 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] for a device whose obd_uuid equals \a uuid, as decided
 * by obd_uuid_equals(). Takes no lock itself: callers in this file hold
 * obd_dev_lock. Callers treat a negative result as "not found".
 */
640 int class_uuid2dev_nolock(struct obd_uuid *uuid)
644 for (i = 0; i < class_devno_max(); i++) {
645 struct obd_device *obd = class_num2obd(i);
647 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
648 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/*
 * Locked wrapper: run class_uuid2dev_nolock() with obd_dev_lock held for
 * reading.
 */
656 int class_uuid2dev(struct obd_uuid *uuid)
660 read_lock(&obd_dev_lock);
661 i = class_uuid2dev_nolock(uuid);
662 read_unlock(&obd_dev_lock);
666 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up an obd device by UUID: translate \a uuid to a slot index with
 * class_uuid2dev() and fetch the device with class_num2obd().
 * No reference is taken on the device in the lines shown here.
 */
668 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
670 int dev = class_uuid2dev(uuid);
673 return class_num2obd(dev);
675 EXPORT_SYMBOL(class_uuid2obd);
678 * Get obd device from ::obd_devs[]
680 * \param num [in] array index
682 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
683 * otherwise return the obd device there.
685 struct obd_device *class_num2obd(int num)
687 struct obd_device *obd = NULL;
689 if (num < class_devno_max()) {
694 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
695 "%p obd_magic %08x != %08x\n",
696 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
697 LASSERTF(obd->obd_minor == num,
698 "%p obd_minor %0d != %0d\n",
699 obd, obd->obd_minor, num);
706 * Find obd in obd_devs[] by name or uuid.
708 * Increment obd's refcount if found.
710 * \param[in] str obd name or uuid
712 * \retval NULL if not found
713 * \retval target pointer to found obd_device
/*
 * \a str is converted to an obd_uuid and looked up by UUID and by name,
 * both under the obd_dev_lock read lock. The device that is found gets a
 * "find" reference through class_incref() before the lock is released.
 */
715 struct obd_device *class_dev_by_str(const char *str)
717 struct obd_device *target = NULL;
718 struct obd_uuid tgtuuid;
721 obd_str2uuid(&tgtuuid, str);
723 read_lock(&obd_dev_lock);
724 rc = class_uuid2dev_nolock(&tgtuuid);
726 rc = class_name2dev_nolock(str);
729 target = class_num2obd(rc);
/* the reference is taken while the lock still pins the slot table */
732 class_incref(target, "find", current);
733 read_unlock(&obd_dev_lock);
737 EXPORT_SYMBOL(class_dev_by_str);
740 * Get obd devices count. Device in any
742 * \retval obd device count
744 int get_devices_count(void)
746 int index, max_index = class_devno_max(), dev_count = 0;
748 read_lock(&obd_dev_lock);
749 for (index = 0; index <= max_index; index++) {
750 struct obd_device *obd = class_num2obd(index);
754 read_unlock(&obd_dev_lock);
758 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in obd_devs[] while holding
 * obd_dev_lock for reading: slot index, a status string chosen from the
 * obd_stopping / obd_set_up / obd_attached flags (tested in that order),
 * type name, device name, UUID and current refcount.
 */
760 void class_obd_list(void)
765 read_lock(&obd_dev_lock);
766 for (i = 0; i < class_devno_max(); i++) {
767 struct obd_device *obd = class_num2obd(i);
771 if (obd->obd_stopping)
773 else if (obd->obd_set_up)
775 else if (obd->obd_attached)
779 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
780 i, status, obd->obd_type->typ_name,
781 obd->obd_name, obd->obd_uuid.uuid,
782 atomic_read(&obd->obd_refcount));
784 read_unlock(&obd_dev_lock);
788 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
789 specified, then only the client with that uuid is returned,
790 otherwise any client connected to the tgt is returned. */
/*
 * Scans obd_devs[] under the obd_dev_lock read lock. A device matches when
 * its type name starts with \a type_name, its u.cli.cl_target_uuid equals
 * \a tgt_uuid and, if \a grp_uuid is non-NULL, its obd_uuid equals
 * \a grp_uuid. The lock is released before a match is handed back.
 * NOTE(review): the type comparison is strncmp() over strlen(type_name),
 * i.e. a prefix match rather than an exact one - confirm this is intended.
 */
791 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
792 const char *type_name,
793 struct obd_uuid *grp_uuid)
797 read_lock(&obd_dev_lock);
798 for (i = 0; i < class_devno_max(); i++) {
799 struct obd_device *obd = class_num2obd(i);
803 if ((strncmp(obd->obd_type->typ_name, type_name,
804 strlen(type_name)) == 0)) {
805 if (obd_uuid_equals(tgt_uuid,
806 &obd->u.cli.cl_target_uuid) &&
807 ((grp_uuid)? obd_uuid_equals(grp_uuid,
808 &obd->obd_uuid) : 1)) {
809 read_unlock(&obd_dev_lock);
814 read_unlock(&obd_dev_lock);
818 EXPORT_SYMBOL(class_find_client_obd);
820 /* Iterate the obd_device list looking for devices that have grp_uuid.
821 Start searching at *next, and if a device is found, the next index to
822 look at is saved in *next. If next is NULL, then the first matching
823 device will always be returned. */
/*
 * The scan runs under the obd_dev_lock read lock and compares each
 * device's obd_uuid with \a grp_uuid using obd_uuid_equals(). A value in
 * *next is used as the starting index only when it lies in
 * 0 .. class_devno_max() - 1. The lock is released before returning.
 */
824 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
830 else if (*next >= 0 && *next < class_devno_max())
835 read_lock(&obd_dev_lock);
836 for (; i < class_devno_max(); i++) {
837 struct obd_device *obd = class_num2obd(i);
841 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
844 read_unlock(&obd_dev_lock);
848 read_unlock(&obd_dev_lock);
852 EXPORT_SYMBOL(class_devices_in_group);
855 * Notify that the sptlrpc log for \a fsname has changed, so that every
856 * relevant OBD adjusts its sptlrpc settings accordingly.
/*
 * Walk obd_devs[] and send KEY_SPTLRPC_CONF through obd_set_info_async()
 * to every device that is set up, not stopping, of one of the listed
 * types, and whose obd_name starts with the first \a namelen bytes of
 * \a fsname. The device is pinned with class_incref() so obd_dev_lock can
 * be dropped around the call; the lock is retaken before the scan
 * continues.
 */
858 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
860 struct obd_device *obd;
864 LASSERT(namelen > 0);
866 read_lock(&obd_dev_lock);
867 for (i = 0; i < class_devno_max(); i++) {
868 obd = class_num2obd(i);
870 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
873 /* only notify mdc, osc, osp, lwp, mdt, ost
874 * because only these have a -sptlrpc llog */
875 type = obd->obd_type->typ_name;
876 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
877 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
878 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
879 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
880 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
881 strcmp(type, LUSTRE_OST_NAME) != 0)
/* prefix comparison: only the first namelen bytes are compared */
884 if (strncmp(obd->obd_name, fsname, namelen))
/* pin the device, then drop the lock for the duration of the call */
887 class_incref(obd, __FUNCTION__, obd);
888 read_unlock(&obd_dev_lock);
889 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
890 sizeof(KEY_SPTLRPC_CONF),
891 KEY_SPTLRPC_CONF, 0, NULL, NULL);
893 class_decref(obd, __FUNCTION__, obd);
894 read_lock(&obd_dev_lock);
896 read_unlock(&obd_dev_lock);
899 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the obd_device slab cache if it exists and reset the pointer, so
 * a repeated call is harmless.
 */
901 void obd_cleanup_caches(void)
904 if (obd_device_cachep) {
905 kmem_cache_destroy(obd_device_cachep);
906 obd_device_cachep = NULL;
/*
 * Create the "ll_obd_dev_cache" slab cache for struct obd_device. The
 * whole object is declared as the usercopy region (offset 0, size
 * sizeof(struct obd_device)). A failed creation is reported as -ENOMEM,
 * and obd_cleanup_caches() is part of the exit path.
 */
912 int obd_init_caches(void)
917 LASSERT(obd_device_cachep == NULL);
918 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
919 sizeof(struct obd_device),
920 0, 0, 0, sizeof(struct obd_device), NULL);
921 if (!obd_device_cachep)
922 GOTO(out, rc = -ENOMEM);
926 obd_cleanup_caches();
930 /* map connection to client */
/*
 * Resolve the handle \a conn to its export through
 * class_handle2object(). Two cases are singled out with debug messages
 * before the lookup: a "null handle" and a cookie of -1, which stands for
 * "assign a new connection".
 */
931 struct obd_export *class_conn2export(struct lustre_handle *conn)
933 struct obd_export *export;
937 CDEBUG(D_CACHE, "looking for null handle\n");
941 if (conn->cookie == -1) { /* this means assign a new connection */
942 CDEBUG(D_CACHE, "want a new connection\n");
946 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
947 export = class_handle2object(conn->cookie, NULL);
950 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to its obd_device.
 * NOTE(review): presumably this returns exp->exp_obd, the field that
 * class_exp2cliimp() reads - confirm against the function body.
 */
952 struct obd_device *class_exp2obd(struct obd_export *exp)
958 EXPORT_SYMBOL(class_exp2obd);
/*
 * Return the client import (u.cli.cl_import) of the obd_device that
 * \a exp belongs to. \a exp is dereferenced unconditionally.
 */
960 struct obd_import *class_exp2cliimp(struct obd_export *exp)
962 struct obd_device *obd = exp->exp_obd;
965 return obd->u.cli.cl_import;
967 EXPORT_SYMBOL(class_exp2cliimp);
969 /* Export management functions */
/*
 * Destroy an export whose refcount has reached zero: release its
 * connection if it has one, check that its reply and request lists are
 * empty, call obd_destroy_export(), drop the "export" reference on the
 * device (not for the self export) and free the export via RCU.
 */
970 static void class_export_destroy(struct obd_export *exp)
972 struct obd_device *obd = exp->exp_obd;
975 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
976 LASSERT(obd != NULL);
978 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
979 exp->exp_client_uuid.uuid, obd->obd_name);
981 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
982 if (exp->exp_connection)
983 ptlrpc_put_connection_superhack(exp->exp_connection);
985 LASSERT(list_empty(&exp->exp_outstanding_replies));
986 LASSERT(list_empty(&exp->exp_uncommitted_replies));
987 LASSERT(list_empty(&exp->exp_req_replay_queue));
988 LASSERT(list_empty(&exp->exp_hp_rpcs));
989 obd_destroy_export(exp);
990 /* self export doesn't hold a reference to an obd, although it
991 * exists until freeing of the obd */
992 if (exp != obd->obd_self_export)
993 class_decref(obd, "export", exp);
995 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/*
 * hop_addref callback for export handles: takes an export reference with
 * class_export_get().
 */
999 static void export_handle_addref(void *export)
1001 class_export_get(export);
/* Handle operations passed to class_handle_hash() for exports. */
1004 static struct portals_handle_ops export_handle_ops = {
1005 .hop_addref = export_handle_addref,
/*
 * Take a reference on \a exp by incrementing exp_refcount, and log the new
 * count. class_export_put() is the counterpart.
 */
1009 struct obd_export *class_export_get(struct obd_export *exp)
1011 atomic_inc(&exp->exp_refcount);
1012 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1013 atomic_read(&exp->exp_refcount));
1016 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp. On the final put the nid statistics are
 * cleaned up and then:
 *  - the device's self export is destroyed right here, followed by
 *    class_free_dev() on the device;
 *  - any other export is handed to obd_zombie_export_add().
 */
1018 void class_export_put(struct obd_export *exp)
1020 LASSERT(exp != NULL);
1021 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1022 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1023 atomic_read(&exp->exp_refcount) - 1);
1025 if (atomic_dec_and_test(&exp->exp_refcount)) {
1026 struct obd_device *obd = exp->exp_obd;
1028 CDEBUG(D_IOCTL, "final put %p/%s\n",
1029 exp, exp->exp_client_uuid.uuid);
1031 /* release nid stat reference */
1032 lprocfs_exp_cleanup(exp);
1034 if (exp == obd->obd_self_export) {
1035 /* self export should be destroyed without
1036 * zombie thread as it doesn't hold a
1037 * reference to obd and doesn't hold any
1039 class_export_destroy(exp);
1040 /* self export is destroyed, no class
1041 * references exist and it is safe to free
1043 class_free_dev(obd);
1045 LASSERT(!list_empty(&exp->exp_obd_chain));
1046 obd_zombie_export_add(exp);
1051 EXPORT_SYMBOL(class_export_put);
/*
 * Work item handler installed on exp_zombie_work by __class_new_export():
 * recovers the export from the work_struct and destroys it.
 */
1053 static void obd_zombie_exp_cull(struct work_struct *ws)
1055 struct obd_export *export;
1057 export = container_of(ws, struct obd_export, exp_zombie_work);
1058 class_export_destroy(export);
1061 /* Creates a new export, adds it to the hash table, and returns a
1062 * pointer to it. The refcount is 2: one for the hash reference, and
1063 * one for the pointer returned by this function. */
/*
 * Common implementation behind class_new_export() (is_self == false) and
 * class_new_export_self() (is_self == true).
 * Allocates and initializes the export, hashes its handle, and, when
 * \a cluuid differs from the device's own uuid, adds it to the device's
 * uuid hash (a duplicate is denied with -EALREADY). With obd_dev_lock held
 * the export is then either linked into the device's export lists, taking
 * an "export" reference on the device, or left with empty list heads.
 * A device that is stopping yields -ENODEV or -ESHUTDOWN. The error path
 * unhashes the handle and frees the export again.
 */
1064 struct obd_export *__class_new_export(struct obd_device *obd,
1065 struct obd_uuid *cluuid, bool is_self)
1067 struct obd_export *export;
1068 struct cfs_hash *hash = NULL;
1072 OBD_ALLOC_PTR(export);
1074 return ERR_PTR(-ENOMEM);
1076 export->exp_conn_cnt = 0;
1077 export->exp_lock_hash = NULL;
1078 export->exp_flock_hash = NULL;
1079 /* 2 = class_handle_hash + last */
1080 atomic_set(&export->exp_refcount, 2);
1081 atomic_set(&export->exp_rpc_count, 0);
1082 atomic_set(&export->exp_cb_count, 0);
1083 atomic_set(&export->exp_locks_count, 0);
1084 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1085 INIT_LIST_HEAD(&export->exp_locks_list);
1086 spin_lock_init(&export->exp_locks_list_guard);
1088 atomic_set(&export->exp_replay_count, 0);
1089 export->exp_obd = obd;
1090 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1091 spin_lock_init(&export->exp_uncommitted_replies_lock);
1092 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1093 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1094 INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1095 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1096 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1097 class_handle_hash(&export->exp_handle, &export_handle_ops);
1098 export->exp_last_request_time = ktime_get_real_seconds();
1099 spin_lock_init(&export->exp_lock);
1100 spin_lock_init(&export->exp_rpc_lock);
1101 INIT_HLIST_NODE(&export->exp_uuid_hash);
1102 INIT_HLIST_NODE(&export->exp_nid_hash);
1103 INIT_HLIST_NODE(&export->exp_gen_hash);
1104 spin_lock_init(&export->exp_bl_list_lock);
1105 INIT_LIST_HEAD(&export->exp_bl_list);
1106 INIT_LIST_HEAD(&export->exp_stale_list);
/* final destruction is deferred to this work item, see obd_zombie_exp_cull() */
1107 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1109 export->exp_sp_peer = LUSTRE_SP_ANY;
1110 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1111 export->exp_client_uuid = *cluuid;
1112 obd_init_export(export);
/* an export carrying the device's own uuid is not put in the uuid hash */
1114 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1115 spin_lock(&obd->obd_dev_lock);
1116 /* shouldn't happen, but might race */
1117 if (obd->obd_stopping)
1118 GOTO(exit_unlock, rc = -ENODEV);
1120 hash = cfs_hash_getref(obd->obd_uuid_hash);
1122 GOTO(exit_unlock, rc = -ENODEV);
1123 spin_unlock(&obd->obd_dev_lock);
1125 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1127 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1128 obd->obd_name, cluuid->uuid, rc);
1129 GOTO(exit_err, rc = -EALREADY);
1133 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* obd_stopping is checked again because the lock was dropped above */
1134 spin_lock(&obd->obd_dev_lock);
1135 if (obd->obd_stopping) {
1137 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1138 GOTO(exit_unlock, rc = -ESHUTDOWN);
1142 class_incref(obd, "export", export);
1143 list_add_tail(&export->exp_obd_chain_timed,
1144 &obd->obd_exports_timed);
1145 list_add(&export->exp_obd_chain, &obd->obd_exports);
1146 obd->obd_num_exports++;
1148 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1149 INIT_LIST_HEAD(&export->exp_obd_chain);
1151 spin_unlock(&obd->obd_dev_lock);
1153 cfs_hash_putref(hash);
1157 spin_unlock(&obd->obd_dev_lock);
1160 cfs_hash_putref(hash);
1161 class_handle_unhash(&export->exp_handle);
1162 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1163 obd_destroy_export(export);
1164 OBD_FREE_PTR(export);
/* Create a regular export: __class_new_export() with is_self == false. */
1168 struct obd_export *class_new_export(struct obd_device *obd,
1169 struct obd_uuid *uuid)
1171 return __class_new_export(obd, uuid, false);
1173 EXPORT_SYMBOL(class_new_export);
/* Create a device's self export: __class_new_export() with is_self == true. */
1175 struct obd_export *class_new_export_self(struct obd_device *obd,
1176 struct obd_uuid *uuid)
1178 return __class_new_export(obd, uuid, true);
/*
 * Detach an export from its device. The handle is unhashed first. The
 * self export then only gets a class_export_put(). Any other export is,
 * under obd_dev_lock, removed from the uuid hash (and on servers the
 * generation hash), moved to obd_unlinked_exports and taken off the timed
 * list; obd_stale_export_num is bumped and the export is passed to
 * obd_stale_export_put().
 */
1181 void class_unlink_export(struct obd_export *exp)
1183 class_handle_unhash(&exp->exp_handle);
1185 if (exp->exp_obd->obd_self_export == exp) {
1186 class_export_put(exp);
1190 spin_lock(&exp->exp_obd->obd_dev_lock);
1191 /* delete an uuid-export hashitem from hashtables */
1192 if (!hlist_unhashed(&exp->exp_uuid_hash))
1193 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1194 &exp->exp_client_uuid,
1195 &exp->exp_uuid_hash);
1197 #ifdef HAVE_SERVER_SUPPORT
1198 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1199 struct tg_export_data *ted = &exp->exp_target_data;
1200 struct cfs_hash *hash;
1202 /* Because obd_gen_hash will not be released until
1203 * class_cleanup(), so hash should never be NULL here */
1204 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1205 LASSERT(hash != NULL);
1206 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1207 &exp->exp_gen_hash);
1208 cfs_hash_putref(hash);
1210 #endif /* HAVE_SERVER_SUPPORT */
1212 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1213 list_del_init(&exp->exp_obd_chain_timed);
1214 exp->exp_obd->obd_num_exports--;
1215 spin_unlock(&exp->exp_obd->obd_dev_lock);
1216 atomic_inc(&obd_stale_export_num);
1218 /* A reference is kept by obd_stale_exports list */
1219 obd_stale_export_put(exp);
1221 EXPORT_SYMBOL(class_unlink_export);
1223 /* Import management functions */
/*
 * Free the resources of an import whose refcount is zero: release
 * imp_connection, release and free every entry of imp_conn_list, and drop
 * the "import" reference on the device taken in class_new_import().
 * imp_sec must already be NULL.
 */
1224 static void obd_zombie_import_free(struct obd_import *imp)
1228 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1229 imp->imp_obd->obd_name);
1231 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1233 ptlrpc_put_connection_superhack(imp->imp_connection);
/* drain the connection list, releasing each entry's connection */
1235 while (!list_empty(&imp->imp_conn_list)) {
1236 struct obd_import_conn *imp_conn;
1238 imp_conn = list_entry(imp->imp_conn_list.next,
1239 struct obd_import_conn, oic_item);
1240 list_del_init(&imp_conn->oic_item);
1241 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1242 OBD_FREE(imp_conn, sizeof(*imp_conn));
1245 LASSERT(imp->imp_sec == NULL);
1246 class_decref(imp->imp_obd, "import", imp);
/*
 * Take a reference on \a import by incrementing imp_refcount, and log the
 * new count. class_import_put() is the counterpart.
 */
1251 struct obd_import *class_import_get(struct obd_import *import)
1253 atomic_inc(&import->imp_refcount);
1254 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1255 atomic_read(&import->imp_refcount),
1256 import->imp_obd->obd_name);
1259 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a imp. The final put hands the import to
 * obd_zombie_import_add() instead of freeing it here.
 */
1261 void class_import_put(struct obd_import *imp)
1265 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* the value logged is the count after this put */
1267 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1268 atomic_read(&imp->imp_refcount) - 1,
1269 imp->imp_obd->obd_name);
1271 if (atomic_dec_and_test(&imp->imp_refcount)) {
1272 CDEBUG(D_INFO, "final put import %p\n", imp);
1273 obd_zombie_import_add(imp);
1276 /* catch possible import put race */
1277 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1280 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.
 */
1282 static void init_imp_at(struct imp_at *at) {
1284 at_init(&at->iat_net_latency, 0, 0);
1285 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1286 /* max service estimates are tracked on the server side, so
1287 don't use the AT history here, just use the last reported
1288 val. (But keep hist for proc histogram, worst_ever) */
1289 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Work item handler installed on imp_zombie_work by class_new_import():
 * recovers the import from the work_struct and frees it.
 */
1294 static void obd_zombie_imp_cull(struct work_struct *ws)
1296 struct obd_import *import;
1298 import = container_of(ws, struct obd_import, imp_zombie_work);
1299 obd_zombie_import_free(import);
/*
 * Allocate and initialize an import for \a obd. The import holds an
 * "import" reference on the device, starts in state LUSTRE_IMP_NEW with a
 * refcount of 2, and uses LUSTRE_MSG_MAGIC_V2 as its initial message
 * magic.
 */
1302 struct obd_import *class_new_import(struct obd_device *obd)
1304 struct obd_import *imp;
1305 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1307 OBD_ALLOC(imp, sizeof(*imp));
1311 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1312 INIT_LIST_HEAD(&imp->imp_replay_list);
1313 INIT_LIST_HEAD(&imp->imp_sending_list);
1314 INIT_LIST_HEAD(&imp->imp_delayed_list);
1315 INIT_LIST_HEAD(&imp->imp_committed_list);
1316 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1317 imp->imp_known_replied_xid = 0;
1318 imp->imp_replay_cursor = &imp->imp_committed_list;
1319 spin_lock_init(&imp->imp_lock);
1320 imp->imp_last_success_conn = 0;
1321 imp->imp_state = LUSTRE_IMP_NEW;
/* this reference is dropped again in obd_zombie_import_free() */
1322 imp->imp_obd = class_incref(obd, "import", imp);
1323 rwlock_init(&imp->imp_sec_lock);
1324 init_waitqueue_head(&imp->imp_recovery_waitq);
1325 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
/* use the pid of the namespace's child reaper when there is one, else 1 */
1327 if (curr_pid_ns->child_reaper)
1328 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1330 imp->imp_sec_refpid = 1;
1332 atomic_set(&imp->imp_refcount, 2);
1333 atomic_set(&imp->imp_unregistering, 0);
1334 atomic_set(&imp->imp_inflight, 0);
1335 atomic_set(&imp->imp_replay_inflight, 0);
1336 atomic_set(&imp->imp_inval_count, 0);
1337 INIT_LIST_HEAD(&imp->imp_conn_list);
1338 init_imp_at(&imp->imp_at);
1340 /* the default magic is V2, will be used in connect RPC, and
1341 * then adjusted according to the flags in request/reply. */
1342 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1346 EXPORT_SYMBOL(class_new_import);
1348 void class_destroy_import(struct obd_import *import)
1350 LASSERT(import != NULL);
1351 LASSERT(import != LP_POISON);
1353 spin_lock(&import->imp_lock);
1354 import->imp_generation++;
1355 spin_unlock(&import->imp_lock);
1356 class_import_put(import);
1358 EXPORT_SYMBOL(class_destroy_import);
#if LUSTRE_TRACKS_LOCK_EXP_REFS

/*
 * Account one more reference from @lock to @exp.
 *
 * On the first reference the lock is linked onto exp->exp_locks_list and
 * its target export is recorded.  A console warning is printed when the
 * lock already points at a different export.  All updates are made under
 * exp->exp_locks_list_guard.
 */
void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr >= 0);

	if (lock->l_exp_refs_target != NULL &&
	    lock->l_exp_refs_target != exp) {
		LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
			      exp, lock, lock->l_exp_refs_target);
	}

	/* first reference: attach the lock to this export */
	if (lock->l_exp_refs_nr == 0) {
		list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
		lock->l_exp_refs_target = exp;
	}
	lock->l_exp_refs_nr++;

	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);

	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_add_lock_ref);

/*
 * Drop one reference from @lock to @exp.
 *
 * When the count reaches zero the lock is unlinked from the export's list
 * and its target export is cleared.  A console warning is printed when the
 * lock's recorded export is not @exp.  All updates are made under
 * exp->exp_locks_list_guard.
 */
void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr > 0);

	if (lock->l_exp_refs_target != exp) {
		LCONSOLE_WARN("lock %p, "
			      "mismatching export pointers: %p, %p\n",
			      lock, lock->l_exp_refs_target, exp);
	}

	/* last reference gone: detach the lock from the export */
	lock->l_exp_refs_nr--;
	if (lock->l_exp_refs_nr == 0) {
		list_del_init(&lock->l_exp_refs_link);
		lock->l_exp_refs_target = NULL;
	}

	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);

	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_del_lock_ref);
#endif
1403 /* A connection defines an export context in which preallocation can
1404 be managed. This releases the export pointer reference, and returns
1405 the export handle, so the export refcount is 1 when this function
1407 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1408 struct obd_uuid *cluuid)
1410 struct obd_export *export;
1411 LASSERT(conn != NULL);
1412 LASSERT(obd != NULL);
1413 LASSERT(cluuid != NULL);
1416 export = class_new_export(obd, cluuid);
1418 RETURN(PTR_ERR(export));
1420 conn->cookie = export->exp_handle.h_cookie;
1421 class_export_put(export);
1423 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1424 cluuid->uuid, conn->cookie);
1427 EXPORT_SYMBOL(class_connect);
1429 /* if export is involved in recovery then clean up related things */
1430 static void class_export_recovery_cleanup(struct obd_export *exp)
1432 struct obd_device *obd = exp->exp_obd;
1434 spin_lock(&obd->obd_recovery_task_lock);
1435 if (obd->obd_recovering) {
1436 if (exp->exp_in_recovery) {
1437 spin_lock(&exp->exp_lock);
1438 exp->exp_in_recovery = 0;
1439 spin_unlock(&exp->exp_lock);
1440 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1441 atomic_dec(&obd->obd_connected_clients);
1444 /* if called during recovery then should update
1445 * obd_stale_clients counter,
1446 * lightweight exports are not counted */
1447 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1448 exp->exp_obd->obd_stale_clients++;
1450 spin_unlock(&obd->obd_recovery_task_lock);
1452 spin_lock(&exp->exp_lock);
1453 /** Cleanup req replay fields */
1454 if (exp->exp_req_replay_needed) {
1455 exp->exp_req_replay_needed = 0;
1457 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1458 atomic_dec(&obd->obd_req_replay_clients);
1461 /** Cleanup lock replay data */
1462 if (exp->exp_lock_replay_needed) {
1463 exp->exp_lock_replay_needed = 0;
1465 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1466 atomic_dec(&obd->obd_lock_replay_clients);
1468 spin_unlock(&exp->exp_lock);
1471 /* This function removes 1-3 references from the export:
1472 * 1 - for export pointer passed
1473 * and if disconnect really need
1474 * 2 - removing from hash
1475 * 3 - in client_unlink_export
1476 * The export pointer passed to this function can destroyed */
1477 int class_disconnect(struct obd_export *export)
1479 int already_disconnected;
1482 if (export == NULL) {
1483 CWARN("attempting to free NULL export %p\n", export);
1487 spin_lock(&export->exp_lock);
1488 already_disconnected = export->exp_disconnected;
1489 export->exp_disconnected = 1;
1490 /* We hold references of export for uuid hash
1491 * and nid_hash and export link at least. So
1492 * it is safe to call cfs_hash_del in there. */
1493 if (!hlist_unhashed(&export->exp_nid_hash))
1494 cfs_hash_del(export->exp_obd->obd_nid_hash,
1495 &export->exp_connection->c_peer.nid,
1496 &export->exp_nid_hash);
1497 spin_unlock(&export->exp_lock);
1499 /* class_cleanup(), abort_recovery(), and class_fail_export()
1500 * all end up in here, and if any of them race we shouldn't
1501 * call extra class_export_puts(). */
1502 if (already_disconnected) {
1503 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1504 GOTO(no_disconn, already_disconnected);
1507 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1508 export->exp_handle.h_cookie);
1510 class_export_recovery_cleanup(export);
1511 class_unlink_export(export);
1513 class_export_put(export);
1516 EXPORT_SYMBOL(class_disconnect);
1518 /* Return non-zero for a fully connected export */
1519 int class_connected_export(struct obd_export *exp)
1524 spin_lock(&exp->exp_lock);
1525 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1526 spin_unlock(&exp->exp_lock);
1530 EXPORT_SYMBOL(class_connected_export);
1532 static void class_disconnect_export_list(struct list_head *list,
1533 enum obd_option flags)
1536 struct obd_export *exp;
1539 /* It's possible that an export may disconnect itself, but
1540 * nothing else will be added to this list. */
1541 while (!list_empty(list)) {
1542 exp = list_entry(list->next, struct obd_export,
1544 /* need for safe call CDEBUG after obd_disconnect */
1545 class_export_get(exp);
1547 spin_lock(&exp->exp_lock);
1548 exp->exp_flags = flags;
1549 spin_unlock(&exp->exp_lock);
1551 if (obd_uuid_equals(&exp->exp_client_uuid,
1552 &exp->exp_obd->obd_uuid)) {
1554 "exp %p export uuid == obd uuid, don't discon\n",
1556 /* Need to delete this now so we don't end up pointing
1557 * to work_list later when this export is cleaned up. */
1558 list_del_init(&exp->exp_obd_chain);
1559 class_export_put(exp);
1563 class_export_get(exp);
1564 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1565 "last request at %lld\n",
1566 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1567 exp, exp->exp_last_request_time);
1568 /* release one export reference anyway */
1569 rc = obd_disconnect(exp);
1571 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1572 obd_export_nid2str(exp), exp, rc);
1573 class_export_put(exp);
1578 void class_disconnect_exports(struct obd_device *obd)
1580 struct list_head work_list;
1583 /* Move all of the exports from obd_exports to a work list, en masse. */
1584 INIT_LIST_HEAD(&work_list);
1585 spin_lock(&obd->obd_dev_lock);
1586 list_splice_init(&obd->obd_exports, &work_list);
1587 list_splice_init(&obd->obd_delayed_exports, &work_list);
1588 spin_unlock(&obd->obd_dev_lock);
1590 if (!list_empty(&work_list)) {
1591 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1592 "disconnecting them\n", obd->obd_minor, obd);
1593 class_disconnect_export_list(&work_list,
1594 exp_flags_from_obd(obd));
1596 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1597 obd->obd_minor, obd);
1600 EXPORT_SYMBOL(class_disconnect_exports);
1602 /* Remove exports that have not completed recovery.
1604 void class_disconnect_stale_exports(struct obd_device *obd,
1605 int (*test_export)(struct obd_export *))
1607 struct list_head work_list;
1608 struct obd_export *exp, *n;
1612 INIT_LIST_HEAD(&work_list);
1613 spin_lock(&obd->obd_dev_lock);
1614 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1616 /* don't count self-export as client */
1617 if (obd_uuid_equals(&exp->exp_client_uuid,
1618 &exp->exp_obd->obd_uuid))
1621 /* don't evict clients which have no slot in last_rcvd
1622 * (e.g. lightweight connection) */
1623 if (exp->exp_target_data.ted_lr_idx == -1)
1626 spin_lock(&exp->exp_lock);
1627 if (exp->exp_failed || test_export(exp)) {
1628 spin_unlock(&exp->exp_lock);
1631 exp->exp_failed = 1;
1632 spin_unlock(&exp->exp_lock);
1634 list_move(&exp->exp_obd_chain, &work_list);
1636 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1637 obd->obd_name, exp->exp_client_uuid.uuid,
1638 obd_export_nid2str(exp));
1639 print_export_data(exp, "EVICTING", 0, D_HA);
1641 spin_unlock(&obd->obd_dev_lock);
1644 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1645 obd->obd_name, evicted);
1647 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1648 OBD_OPT_ABORT_RECOV);
1651 EXPORT_SYMBOL(class_disconnect_stale_exports);
1653 void class_fail_export(struct obd_export *exp)
1655 int rc, already_failed;
1657 spin_lock(&exp->exp_lock);
1658 already_failed = exp->exp_failed;
1659 exp->exp_failed = 1;
1660 spin_unlock(&exp->exp_lock);
1662 if (already_failed) {
1663 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1664 exp, exp->exp_client_uuid.uuid);
1668 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1669 exp, exp->exp_client_uuid.uuid);
1671 if (obd_dump_on_timeout)
1672 libcfs_debug_dumplog();
1674 /* need for safe call CDEBUG after obd_disconnect */
1675 class_export_get(exp);
1677 /* Most callers into obd_disconnect are removing their own reference
1678 * (request, for example) in addition to the one from the hash table.
1679 * We don't have such a reference here, so make one. */
1680 class_export_get(exp);
1681 rc = obd_disconnect(exp);
1683 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1685 CDEBUG(D_HA, "disconnected export %p/%s\n",
1686 exp, exp->exp_client_uuid.uuid);
1687 class_export_put(exp);
1689 EXPORT_SYMBOL(class_fail_export);
1691 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1693 struct cfs_hash *nid_hash;
1694 struct obd_export *doomed_exp = NULL;
1695 int exports_evicted = 0;
1697 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1699 spin_lock(&obd->obd_dev_lock);
1700 /* umount has run already, so evict thread should leave
1701 * its task to umount thread now */
1702 if (obd->obd_stopping) {
1703 spin_unlock(&obd->obd_dev_lock);
1704 return exports_evicted;
1706 nid_hash = obd->obd_nid_hash;
1707 cfs_hash_getref(nid_hash);
1708 spin_unlock(&obd->obd_dev_lock);
1711 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1712 if (doomed_exp == NULL)
1715 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1716 "nid %s found, wanted nid %s, requested nid %s\n",
1717 obd_export_nid2str(doomed_exp),
1718 libcfs_nid2str(nid_key), nid);
1719 LASSERTF(doomed_exp != obd->obd_self_export,
1720 "self-export is hashed by NID?\n");
1722 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1723 "request\n", obd->obd_name,
1724 obd_uuid2str(&doomed_exp->exp_client_uuid),
1725 obd_export_nid2str(doomed_exp));
1726 class_fail_export(doomed_exp);
1727 class_export_put(doomed_exp);
1730 cfs_hash_putref(nid_hash);
1732 if (!exports_evicted)
1733 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1734 obd->obd_name, nid);
1735 return exports_evicted;
1737 EXPORT_SYMBOL(obd_export_evict_by_nid);
1739 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1741 struct cfs_hash *uuid_hash;
1742 struct obd_export *doomed_exp = NULL;
1743 struct obd_uuid doomed_uuid;
1744 int exports_evicted = 0;
1746 spin_lock(&obd->obd_dev_lock);
1747 if (obd->obd_stopping) {
1748 spin_unlock(&obd->obd_dev_lock);
1749 return exports_evicted;
1751 uuid_hash = obd->obd_uuid_hash;
1752 cfs_hash_getref(uuid_hash);
1753 spin_unlock(&obd->obd_dev_lock);
1755 obd_str2uuid(&doomed_uuid, uuid);
1756 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1757 CERROR("%s: can't evict myself\n", obd->obd_name);
1758 cfs_hash_putref(uuid_hash);
1759 return exports_evicted;
1762 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1764 if (doomed_exp == NULL) {
1765 CERROR("%s: can't disconnect %s: no exports found\n",
1766 obd->obd_name, uuid);
1768 CWARN("%s: evicting %s at adminstrative request\n",
1769 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1770 class_fail_export(doomed_exp);
1771 class_export_put(doomed_exp);
1774 cfs_hash_putref(uuid_hash);
1776 return exports_evicted;
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional callback invoked by print_export_data() for each export when
 * lock dumping is requested.  It is NULL until something assigns it
 * (file-scope objects are zero-initialised).
 */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1784 static void print_export_data(struct obd_export *exp, const char *status,
1785 int locks, int debug_level)
1787 struct ptlrpc_reply_state *rs;
1788 struct ptlrpc_reply_state *first_reply = NULL;
1791 spin_lock(&exp->exp_lock);
1792 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1798 spin_unlock(&exp->exp_lock);
1800 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1801 "%p %s %llu stale:%d\n",
1802 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1803 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1804 atomic_read(&exp->exp_rpc_count),
1805 atomic_read(&exp->exp_cb_count),
1806 atomic_read(&exp->exp_locks_count),
1807 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1808 nreplies, first_reply, nreplies > 3 ? "..." : "",
1809 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1810 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1811 if (locks && class_export_dump_hook != NULL)
1812 class_export_dump_hook(exp);
1816 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1818 struct obd_export *exp;
1820 spin_lock(&obd->obd_dev_lock);
1821 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1822 print_export_data(exp, "ACTIVE", locks, debug_level);
1823 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1824 print_export_data(exp, "UNLINKED", locks, debug_level);
1825 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1826 print_export_data(exp, "DELAYED", locks, debug_level);
1827 spin_unlock(&obd->obd_dev_lock);
1830 void obd_exports_barrier(struct obd_device *obd)
1833 LASSERT(list_empty(&obd->obd_exports));
1834 spin_lock(&obd->obd_dev_lock);
1835 while (!list_empty(&obd->obd_unlinked_exports)) {
1836 spin_unlock(&obd->obd_dev_lock);
1837 set_current_state(TASK_UNINTERRUPTIBLE);
1838 schedule_timeout(cfs_time_seconds(waited));
1839 if (waited > 5 && is_power_of_2(waited)) {
1840 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1841 "more than %d seconds. "
1842 "The obd refcount = %d. Is it stuck?\n",
1843 obd->obd_name, waited,
1844 atomic_read(&obd->obd_refcount));
1845 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1848 spin_lock(&obd->obd_dev_lock);
1850 spin_unlock(&obd->obd_dev_lock);
1852 EXPORT_SYMBOL(obd_exports_barrier);
1855 * Add export to the obd_zombe thread and notify it.
1857 static void obd_zombie_export_add(struct obd_export *exp) {
1858 atomic_dec(&obd_stale_export_num);
1859 spin_lock(&exp->exp_obd->obd_dev_lock);
1860 LASSERT(!list_empty(&exp->exp_obd_chain));
1861 list_del_init(&exp->exp_obd_chain);
1862 spin_unlock(&exp->exp_obd->obd_dev_lock);
1864 queue_work(zombie_wq, &exp->exp_zombie_work);
1868 * Add import to the obd_zombe thread and notify it.
1870 static void obd_zombie_import_add(struct obd_import *imp) {
1871 LASSERT(imp->imp_sec == NULL);
1873 queue_work(zombie_wq, &imp->imp_zombie_work);
1877 * wait when obd_zombie import/export queues become empty
1879 void obd_zombie_barrier(void)
1881 flush_workqueue(zombie_wq);
1883 EXPORT_SYMBOL(obd_zombie_barrier);
1886 struct obd_export *obd_stale_export_get(void)
1888 struct obd_export *exp = NULL;
1891 spin_lock(&obd_stale_export_lock);
1892 if (!list_empty(&obd_stale_exports)) {
1893 exp = list_entry(obd_stale_exports.next,
1894 struct obd_export, exp_stale_list);
1895 list_del_init(&exp->exp_stale_list);
1897 spin_unlock(&obd_stale_export_lock);
1900 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1901 atomic_read(&obd_stale_export_num));
1905 EXPORT_SYMBOL(obd_stale_export_get);
1907 void obd_stale_export_put(struct obd_export *exp)
1911 LASSERT(list_empty(&exp->exp_stale_list));
1912 if (exp->exp_lock_hash &&
1913 atomic_read(&exp->exp_lock_hash->hs_count)) {
1914 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1915 atomic_read(&obd_stale_export_num));
1917 spin_lock_bh(&exp->exp_bl_list_lock);
1918 spin_lock(&obd_stale_export_lock);
1919 /* Add to the tail if there is no blocked locks,
1920 * to the head otherwise. */
1921 if (list_empty(&exp->exp_bl_list))
1922 list_add_tail(&exp->exp_stale_list,
1923 &obd_stale_exports);
1925 list_add(&exp->exp_stale_list,
1926 &obd_stale_exports);
1928 spin_unlock(&obd_stale_export_lock);
1929 spin_unlock_bh(&exp->exp_bl_list_lock);
1931 class_export_put(exp);
1935 EXPORT_SYMBOL(obd_stale_export_put);
1938 * Adjust the position of the export in the stale list,
1939 * i.e. move to the head of the list if is needed.
1941 void obd_stale_export_adjust(struct obd_export *exp)
1943 LASSERT(exp != NULL);
1944 spin_lock_bh(&exp->exp_bl_list_lock);
1945 spin_lock(&obd_stale_export_lock);
1947 if (!list_empty(&exp->exp_stale_list) &&
1948 !list_empty(&exp->exp_bl_list))
1949 list_move(&exp->exp_stale_list, &obd_stale_exports);
1951 spin_unlock(&obd_stale_export_lock);
1952 spin_unlock_bh(&exp->exp_bl_list_lock);
1954 EXPORT_SYMBOL(obd_stale_export_adjust);
1957 * start destroy zombie import/export thread
1959 int obd_zombie_impexp_init(void)
1961 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1969 * stop destroy zombie import/export thread
1971 void obd_zombie_impexp_stop(void)
1973 destroy_workqueue(zombie_wq);
1974 LASSERT(list_empty(&obd_stale_exports));
1977 /***** Kernel-userspace comm helpers *******/
1979 /* Get length of entire message, including header */
1980 int kuc_len(int payload_len)
1982 return sizeof(struct kuc_hdr) + payload_len;
1984 EXPORT_SYMBOL(kuc_len);
1986 /* Get a pointer to kuc header, given a ptr to the payload
1987 * @param p Pointer to payload area
1988 * @returns Pointer to kuc header
1990 struct kuc_hdr * kuc_ptr(void *p)
1992 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1993 LASSERT(lh->kuc_magic == KUC_MAGIC);
1996 EXPORT_SYMBOL(kuc_ptr);
1998 /* Alloc space for a message, and fill in header
1999 * @return Pointer to payload area
2001 void *kuc_alloc(int payload_len, int transport, int type)
2004 int len = kuc_len(payload_len);
2008 return ERR_PTR(-ENOMEM);
2010 lh->kuc_magic = KUC_MAGIC;
2011 lh->kuc_transport = transport;
2012 lh->kuc_msgtype = type;
2013 lh->kuc_msglen = len;
2015 return (void *)(lh + 1);
2017 EXPORT_SYMBOL(kuc_alloc);
/*
 * Free a message allocated by kuc_alloc().
 * @param p            Pointer to the payload area (not the header)
 * @param payload_len  Payload length; the freed size is computed from it
 *                     with kuc_len()
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh;

	lh = kuc_ptr(p);
	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
2027 struct obd_request_slot_waiter {
2028 struct list_head orsw_entry;
2029 wait_queue_head_t orsw_waitq;
2033 static bool obd_request_slot_avail(struct client_obd *cli,
2034 struct obd_request_slot_waiter *orsw)
2038 spin_lock(&cli->cl_loi_list_lock);
2039 avail = !!list_empty(&orsw->orsw_entry);
2040 spin_unlock(&cli->cl_loi_list_lock);
2046 * For network flow control, the RPC sponsor needs to acquire a credit
2047 * before sending the RPC. The credits count for a connection is defined
2048 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2049 * the subsequent RPC sponsors need to wait until others released their
2050 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2052 int obd_get_request_slot(struct client_obd *cli)
2054 struct obd_request_slot_waiter orsw;
2055 struct l_wait_info lwi;
2058 spin_lock(&cli->cl_loi_list_lock);
2059 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2060 cli->cl_rpcs_in_flight++;
2061 spin_unlock(&cli->cl_loi_list_lock);
2065 init_waitqueue_head(&orsw.orsw_waitq);
2066 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2067 orsw.orsw_signaled = false;
2068 spin_unlock(&cli->cl_loi_list_lock);
2070 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2071 rc = l_wait_event(orsw.orsw_waitq,
2072 obd_request_slot_avail(cli, &orsw) ||
2076 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2077 * freed but other (such as obd_put_request_slot) is using it. */
2078 spin_lock(&cli->cl_loi_list_lock);
2080 if (!orsw.orsw_signaled) {
2081 if (list_empty(&orsw.orsw_entry))
2082 cli->cl_rpcs_in_flight--;
2084 list_del(&orsw.orsw_entry);
2088 if (orsw.orsw_signaled) {
2089 LASSERT(list_empty(&orsw.orsw_entry));
2093 spin_unlock(&cli->cl_loi_list_lock);
2097 EXPORT_SYMBOL(obd_get_request_slot);
2099 void obd_put_request_slot(struct client_obd *cli)
2101 struct obd_request_slot_waiter *orsw;
2103 spin_lock(&cli->cl_loi_list_lock);
2104 cli->cl_rpcs_in_flight--;
2106 /* If there is free slot, wakeup the first waiter. */
2107 if (!list_empty(&cli->cl_flight_waiters) &&
2108 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2109 orsw = list_entry(cli->cl_flight_waiters.next,
2110 struct obd_request_slot_waiter, orsw_entry);
2111 list_del_init(&orsw->orsw_entry);
2112 cli->cl_rpcs_in_flight++;
2113 wake_up(&orsw->orsw_waitq);
2115 spin_unlock(&cli->cl_loi_list_lock);
2117 EXPORT_SYMBOL(obd_put_request_slot);
2119 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2121 return cli->cl_max_rpcs_in_flight;
2123 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2125 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2127 struct obd_request_slot_waiter *orsw;
2131 const char *type_name;
2134 if (max > OBD_MAX_RIF_MAX || max < 1)
2137 type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2138 if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2139 /* adjust max_mod_rpcs_in_flight to ensure it is always
2140 * strictly lower that max_rpcs_in_flight */
2142 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2143 "because it must be higher than "
2144 "max_mod_rpcs_in_flight value",
2145 cli->cl_import->imp_obd->obd_name);
2148 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2149 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2155 spin_lock(&cli->cl_loi_list_lock);
2156 old = cli->cl_max_rpcs_in_flight;
2157 cli->cl_max_rpcs_in_flight = max;
2158 client_adjust_max_dirty(cli);
2162 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2163 for (i = 0; i < diff; i++) {
2164 if (list_empty(&cli->cl_flight_waiters))
2167 orsw = list_entry(cli->cl_flight_waiters.next,
2168 struct obd_request_slot_waiter, orsw_entry);
2169 list_del_init(&orsw->orsw_entry);
2170 cli->cl_rpcs_in_flight++;
2171 wake_up(&orsw->orsw_waitq);
2173 spin_unlock(&cli->cl_loi_list_lock);
2177 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2179 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2181 return cli->cl_max_mod_rpcs_in_flight;
2183 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2185 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2187 struct obd_connect_data *ocd;
2191 if (max > OBD_MAX_RIF_MAX || max < 1)
2194 /* cannot exceed or equal max_rpcs_in_flight */
2195 if (max >= cli->cl_max_rpcs_in_flight) {
2196 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2197 "higher or equal to max_rpcs_in_flight value (%u)\n",
2198 cli->cl_import->imp_obd->obd_name,
2199 max, cli->cl_max_rpcs_in_flight);
2203 /* cannot exceed max modify RPCs in flight supported by the server */
2204 ocd = &cli->cl_import->imp_connect_data;
2205 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2206 maxmodrpcs = ocd->ocd_maxmodrpcs;
2209 if (max > maxmodrpcs) {
2210 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2211 "higher than max_mod_rpcs_per_client value (%hu) "
2212 "returned by the server at connection\n",
2213 cli->cl_import->imp_obd->obd_name,
2218 spin_lock(&cli->cl_mod_rpcs_lock);
2220 prev = cli->cl_max_mod_rpcs_in_flight;
2221 cli->cl_max_mod_rpcs_in_flight = max;
2223 /* wakeup waiters if limit has been increased */
2224 if (cli->cl_max_mod_rpcs_in_flight > prev)
2225 wake_up(&cli->cl_mod_rpcs_waitq);
2227 spin_unlock(&cli->cl_mod_rpcs_lock);
2231 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2233 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2234 struct seq_file *seq)
2236 unsigned long mod_tot = 0, mod_cum;
2237 struct timespec64 now;
2240 ktime_get_real_ts64(&now);
2242 spin_lock(&cli->cl_mod_rpcs_lock);
2244 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2245 (s64)now.tv_sec, now.tv_nsec);
2246 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2247 cli->cl_mod_rpcs_in_flight);
2249 seq_printf(seq, "\n\t\t\tmodify\n");
2250 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2252 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2255 for (i = 0; i < OBD_HIST_MAX; i++) {
2256 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2258 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2259 i, mod, pct(mod, mod_tot),
2260 pct(mod_cum, mod_tot));
2261 if (mod_cum == mod_tot)
2265 spin_unlock(&cli->cl_mod_rpcs_lock);
2269 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2271 /* The number of modify RPCs sent in parallel is limited
2272 * because the server has a finite number of slots per client to
2273 * store request result and ensure reply reconstruction when needed.
2274 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2275 * that takes into account server limit and cl_max_rpcs_in_flight
2277 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2278 * one close request is allowed above the maximum.
2280 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2285 /* A slot is available if
2286 * - number of modify RPCs in flight is less than the max
2287 * - it's a close RPC and no other close request is in flight
2289 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2290 (close_req && cli->cl_close_rpcs_in_flight == 0);
2295 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2300 spin_lock(&cli->cl_mod_rpcs_lock);
2301 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2302 spin_unlock(&cli->cl_mod_rpcs_lock);
2306 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2309 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2310 it->it_op == IT_READDIR ||
2311 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2316 /* Get a modify RPC slot from the obd client @cli according
2317 * to the kind of operation @opc that is going to be sent
2318 * and the intent @it of the operation if it applies.
2319 * If the maximum number of modify RPCs in flight is reached
2320 * the thread is put to sleep.
2321 * Returns the tag to be set in the request message. Tag 0
2322 * is reserved for non-modifying requests.
2324 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2325 struct lookup_intent *it)
2327 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2328 bool close_req = false;
2331 /* read-only metadata RPCs don't consume a slot on MDT
2332 * for reply reconstruction
2334 if (obd_skip_mod_rpc_slot(it))
2337 if (opc == MDS_CLOSE)
2341 spin_lock(&cli->cl_mod_rpcs_lock);
2342 max = cli->cl_max_mod_rpcs_in_flight;
2343 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2344 /* there is a slot available */
2345 cli->cl_mod_rpcs_in_flight++;
2347 cli->cl_close_rpcs_in_flight++;
2348 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2349 cli->cl_mod_rpcs_in_flight);
2350 /* find a free tag */
2351 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2353 LASSERT(i < OBD_MAX_RIF_MAX);
2354 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2355 spin_unlock(&cli->cl_mod_rpcs_lock);
2356 /* tag 0 is reserved for non-modify RPCs */
2359 spin_unlock(&cli->cl_mod_rpcs_lock);
2361 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2362 "opc %u, max %hu\n",
2363 cli->cl_import->imp_obd->obd_name, opc, max);
2365 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2366 obd_mod_rpc_slot_avail(cli, close_req),
2370 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2372 /* Put a modify RPC slot from the obd client @cli according
2373 * to the kind of operation @opc that has been sent and the
2374 * intent @it of the operation if it applies.
2376 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2377 struct lookup_intent *it, __u16 tag)
2379 bool close_req = false;
2381 if (obd_skip_mod_rpc_slot(it))
2384 if (opc == MDS_CLOSE)
2387 spin_lock(&cli->cl_mod_rpcs_lock);
2388 cli->cl_mod_rpcs_in_flight--;
2390 cli->cl_close_rpcs_in_flight--;
2391 /* release the tag in the bitmap */
2392 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2393 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2394 spin_unlock(&cli->cl_mod_rpcs_lock);
2395 wake_up(&cli->cl_mod_rpcs_waitq);
2397 EXPORT_SYMBOL(obd_put_mod_rpc_slot);