4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* Registered obd types: class_search_type() walks obd_types under obd_types_lock. */
49 static DEFINE_SPINLOCK(obd_types_lock);
50 static LIST_HEAD(obd_types);
/* obd_dev_lock is taken around accesses to the obd_devs[] slot table. */
51 DEFINE_RWLOCK(obd_dev_lock);
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache used by obd_device_alloc() and obd_device_free(). */
54 static struct kmem_cache *obd_device_cachep;
/* NOTE(review): presumably the queue that runs the exp/imp zombie work items;
 * its users are not visible in this part of the file -- confirm. */
56 static struct workqueue_struct *zombie_wq;
/* Forward declarations for helpers defined later in the file. */
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping; class_unlink_export() bumps obd_stale_export_num. */
63 static LIST_HEAD(obd_stale_exports);
64 static DEFINE_SPINLOCK(obd_stale_export_lock);
65 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Hook called by class_export_destroy() and obd_zombie_import_free() to drop
 * a ptlrpc connection. It is called without a NULL check in this file; where
 * it gets assigned is not visible here. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep with GFP_NOFS and stamp it
 * with OBD_DEVICE_MAGIC.
 * NOTE(review): this listing shows neither a NULL check before the obd_magic
 * store nor the return statement -- confirm against the complete file.
 */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts the magic first, logs an error if obd_namespace is still set,
 * then finalizes the obd_reference lu_ref before freeing.
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Find a registered obd_type by name.
 * Walks obd_types under obd_types_lock and compares typ_name with strcmp();
 * the lock is dropped both on a match and after the full scan.
 * No reference-taking on the found type is visible in this function.
 */
99 struct obd_type *class_search_type(const char *name)
101 struct list_head *tmp;
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 list_for_each(tmp, &obd_types) {
106 type = list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
/*
 * Look up an obd_type by name and pin its owning module.
 * With HAVE_MODULE_LOADING_SUPPORT the name is mapped to a module name
 * (LWP -> OSP, any name starting with LUSTRE_MDS_NAME -> MDT), the module is
 * requested, and the type lookup is repeated after a successful load.
 * Under obd_type_lock, try_module_get() is called on the dt ops owner.
 * NOTE(review): the result of try_module_get() is not checked in the visible
 * code, so a failed module grab would go unnoticed -- confirm intent.
 */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
134 if (!request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144 spin_lock(&type->obd_type_lock);
146 try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): under obd_type_lock, drop the module
 * reference held on the type's dt ops owner.
 */
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
/*
 * kobject release callback for an obd_type (installed via class_ktype).
 * Tears down the debugfs entry, finalizes the lu_device_type, unlinks the
 * type from obd_types under obd_types_lock, removes the proc subtree when
 * one was created, then frees the md/dt ops copies and the type itself.
 */
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 #ifdef HAVE_SERVER_SUPPORT
166 if (type->typ_sym_filter)
167 type->typ_debugfs_entry = NULL;
169 debugfs_remove_recursive(type->typ_debugfs_entry);
170 type->typ_debugfs_entry = NULL;
173 lu_device_type_fini(type->typ_lu);
175 spin_lock(&obd_types_lock);
176 list_del(&type->typ_chain);
177 spin_unlock(&obd_types_lock);
179 #ifdef CONFIG_PROC_FS
180 if (type->typ_name && type->typ_procroot)
181 remove_proc_subtree(type->typ_name, proc_lustre_root);
183 if (type->typ_md_ops)
184 OBD_FREE_PTR(type->typ_md_ops);
185 if (type->typ_dt_ops)
186 OBD_FREE_PTR(type->typ_dt_ops);
188 OBD_FREE(type, sizeof(*type));
/* kobj_type for obd_type kobjects: class_sysfs_release() runs as the
 * release callback. */
191 static struct kobj_type class_ktype = {
192 .sysfs_ops = &lustre_sysfs_ops,
193 .release = class_sysfs_release,
/*
 * Server-only: create a placeholder obd_type named \a name that carries a
 * kobject in lustre_kset, a debugfs directory and a compat proc entry.
 * Returns ERR_PTR(-EEXIST) when a kobject lookup by that name is done first,
 * and ERR_PTR(-ENOMEM) on the allocation-failure path. A debugfs failure
 * drops the kobject; a proc failure is only logged and typ_procroot reset.
 * NOTE(review): the conditions guarding the error returns are not visible
 * in this listing -- confirm against the complete file.
 */
196 #ifdef HAVE_SERVER_SUPPORT
197 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
199 struct dentry *symlink;
200 struct obd_type *type;
201 struct kobject *kobj;
204 kobj = kset_find_obj(lustre_kset, name);
207 return ERR_PTR(-EEXIST);
210 OBD_ALLOC(type, sizeof(*type));
212 return ERR_PTR(-ENOMEM);
214 INIT_LIST_HEAD(&type->typ_chain);
216 type->typ_kobj.kset = lustre_kset;
217 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
218 &lustre_kset->kobj, "%s", name);
222 symlink = debugfs_create_dir(name, debugfs_lustre_root);
223 if (IS_ERR_OR_NULL(symlink)) {
224 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
225 kobject_put(&type->typ_kobj);
228 type->typ_debugfs_entry = symlink;
229 type->typ_sym_filter = true;
232 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
234 if (IS_ERR(type->typ_procroot)) {
235 CERROR("%s: can't create compat proc entry: %d\n",
236 name, (int)PTR_ERR(type->typ_procroot));
237 type->typ_procroot = NULL;
243 EXPORT_SYMBOL(class_add_symlinks);
244 #endif /* HAVE_SERVER_SUPPORT */
/*
 * Register an obd type under \a name.
 * Asserts that the name is shorter than CLASS_MAX_NAME, checks for an
 * existing registration (on servers, LOV and OSC names are looked up in
 * lustre_kset instead), allocates the obd_type plus private copies of
 * \a dt_ops and \a md_ops, creates proc/debugfs entries, adds the kobject,
 * initializes \a ldt and finally links the type on obd_types under
 * obd_types_lock. The visible failure path drops the kobject reference.
 * NOTE(review): several guards and return statements are missing from this
 * listing; the summary above covers only what is shown.
 */
246 #define CLASS_MAX_NAME 1024
248 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
249 bool enable_proc, struct lprocfs_vars *vars,
250 const char *name, struct lu_device_type *ldt)
252 struct obd_type *type;
257 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
259 if (class_search_type(name)) {
260 #ifdef HAVE_SERVER_SUPPORT
261 if (strcmp(name, LUSTRE_LOV_NAME) == 0 ||
262 strcmp(name, LUSTRE_OSC_NAME) == 0) {
263 struct kobject *kobj;
265 kobj = kset_find_obj(lustre_kset, name);
267 type = container_of(kobj, struct obd_type,
272 #endif /* HAVE_SERVER_SUPPORT */
273 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
277 OBD_ALLOC(type, sizeof(*type));
281 INIT_LIST_HEAD(&type->typ_chain);
282 type->typ_kobj.kset = lustre_kset;
283 kobject_init(&type->typ_kobj, &class_ktype);
284 #ifdef HAVE_SERVER_SUPPORT
286 #endif /* HAVE_SERVER_SUPPORT */
287 OBD_ALLOC_PTR(type->typ_dt_ops);
288 OBD_ALLOC_PTR(type->typ_md_ops);
290 if (type->typ_dt_ops == NULL ||
291 type->typ_md_ops == NULL)
292 GOTO (failed, rc = -ENOMEM);
294 *(type->typ_dt_ops) = *dt_ops;
295 /* md_ops is optional */
297 *(type->typ_md_ops) = *md_ops;
298 spin_lock_init(&type->obd_type_lock);
300 #ifdef HAVE_SERVER_SUPPORT
301 if (type->typ_sym_filter)
304 #ifdef CONFIG_PROC_FS
305 if (enable_proc && !type->typ_procroot) {
306 type->typ_procroot = lprocfs_register(name,
309 if (IS_ERR(type->typ_procroot)) {
310 rc = PTR_ERR(type->typ_procroot);
311 type->typ_procroot = NULL;
316 type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
318 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
319 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
321 type->typ_debugfs_entry = NULL;
325 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
328 #ifdef HAVE_SERVER_SUPPORT
333 rc = lu_device_type_init(ldt);
338 spin_lock(&obd_types_lock);
339 list_add(&type->typ_chain, &obd_types);
340 spin_unlock(&obd_types_lock);
345 kobject_put(&type->typ_kobj);
349 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 * Logs an error when the type is unknown. When the type still has a
 * non-zero typ_refcnt, the ops copies are freed but the type is left in
 * place; otherwise the kobject reference is dropped, which leads to
 * class_sysfs_release().
 * NOTE(review): in the busy path typ_dt_ops/typ_md_ops are freed while the
 * type stays reachable; class_put_type() later dereferences
 * typ_dt_ops->o_owner. Looks like a use-after-free hazard -- confirm
 * whether OBD_FREE_PTR() poisons the pointer and how callers cope.
 */
351 int class_unregister_type(const char *name)
353 struct obd_type *type = class_search_type(name);
357 CERROR("unknown obd type\n");
361 if (type->typ_refcnt) {
362 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
363 /* This is a bad situation, let's make the best of it */
364 /* Remove ops, but leave the name for debugging */
365 OBD_FREE_PTR(type->typ_dt_ops);
366 OBD_FREE_PTR(type->typ_md_ops);
370 kobject_put(&type->typ_kobj);
373 } /* class_unregister_type */
374 EXPORT_SYMBOL(class_unregister_type);
377 * Create a new obd device.
379 * Allocate the new obd_device and initialize it.
381 * \param[in] type_name obd device type string.
382 * \param[in] name obd device name.
383 * \param[in] uuid obd device UUID
385 * \retval newdev pointer to created obd_device
386 * \retval ERR_PTR(errno) on error
/*
 * Implementation notes:
 *  - only \a name is length-checked (against MAX_OBD_NAME), although the
 *    error message mentions "name/uuid";
 *  - the type reference from class_get_type() is dropped again when the
 *    device allocation fails;
 *  - obd_minor starts at -1 (no obd_devs[] slot) and obd_refcount at 1.
 * NOTE(review): the uuid is copied with strncpy(..., UUID_MAX) and no
 * visible length check, so a uuid of UUID_MAX bytes or more would leave
 * obd_uuid.uuid without a terminating NUL -- confirm that callers bound it.
 */
388 struct obd_device *class_newdev(const char *type_name, const char *name,
391 struct obd_device *newdev;
392 struct obd_type *type = NULL;
395 if (strlen(name) >= MAX_OBD_NAME) {
396 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
397 RETURN(ERR_PTR(-EINVAL));
400 type = class_get_type(type_name);
402 CERROR("OBD: unknown type: %s\n", type_name);
403 RETURN(ERR_PTR(-ENODEV));
406 newdev = obd_device_alloc();
407 if (newdev == NULL) {
408 class_put_type(type);
409 RETURN(ERR_PTR(-ENOMEM));
411 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
412 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
413 newdev->obd_type = type;
414 newdev->obd_minor = -1;
416 rwlock_init(&newdev->obd_pool_lock);
417 newdev->obd_pool_limit = 0;
418 newdev->obd_pool_slv = 0;
420 INIT_LIST_HEAD(&newdev->obd_exports);
421 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
422 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
423 INIT_LIST_HEAD(&newdev->obd_exports_timed);
424 INIT_LIST_HEAD(&newdev->obd_nid_stats);
425 spin_lock_init(&newdev->obd_nid_lock);
426 spin_lock_init(&newdev->obd_dev_lock);
427 mutex_init(&newdev->obd_dev_mutex);
428 spin_lock_init(&newdev->obd_osfs_lock);
429 /* newdev->obd_osfs_age must be set to a value in the distant
430 * past to guarantee a fresh statfs is fetched on mount. */
431 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
433 /* XXX belongs in setup not attach */
434 init_rwsem(&newdev->obd_observer_link_sem);
436 spin_lock_init(&newdev->obd_recovery_task_lock);
437 init_waitqueue_head(&newdev->obd_next_transno_waitq);
438 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
439 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
440 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
441 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
442 INIT_LIST_HEAD(&newdev->obd_evict_list);
443 INIT_LIST_HEAD(&newdev->obd_lwp_list);
445 llog_group_init(&newdev->obd_olg);
446 /* Detach drops this */
447 atomic_set(&newdev->obd_refcount, 1);
448 lu_ref_init(&newdev->obd_reference);
449 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
451 newdev->obd_conn_inprogress = 0;
453 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
455 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
456 newdev->obd_name, newdev);
464 * \param[in] obd obd_device to be freed
/*
 * Final teardown of an obd_device.
 * Asserts the magic, that the device either has no slot (obd_minor == -1)
 * or still owns its obd_devs[] slot, and that obd_refcount is zero. When
 * obd_stopping is set, obd_cleanup() is run and a failure is logged. The
 * device is then freed and the reference on its obd_type dropped.
 */
468 void class_free_dev(struct obd_device *obd)
470 struct obd_type *obd_type = obd->obd_type;
472 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
473 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
474 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
475 "obd %p != obd_devs[%d] %p\n",
476 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
477 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
478 "obd_refcount should be 0, not %d\n",
479 atomic_read(&obd->obd_refcount));
480 LASSERT(obd_type != NULL);
482 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
483 obd->obd_name, obd->obd_type->typ_name);
485 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
486 obd->obd_name, obd->obd_uuid.uuid);
487 if (obd->obd_stopping) {
490 /* If we're not stopping, we were never set up */
491 err = obd_cleanup(obd);
493 CERROR("Cleanup %s returned %d\n",
497 obd_device_free(obd);
499 class_put_type(obd_type);
503 * Unregister obd device.
505 * Free the slot in obd_devs[] used by \a obd.
507 * \param[in] obd obd_device to be unregistered
/* Under the obd_dev_lock write lock: when the device holds a slot
 * (obd_minor >= 0), assert it owns that slot and clear it. */
511 void class_unregister_device(struct obd_device *obd)
513 write_lock(&obd_dev_lock);
514 if (obd->obd_minor >= 0) {
515 LASSERT(obd_devs[obd->obd_minor] == obd);
516 obd_devs[obd->obd_minor] = NULL;
519 write_unlock(&obd_dev_lock);
523 * Register obd device.
525 * Find free slot in obd_devs[], fills it with \a new_obd.
527 * \param[in] new_obd obd_device to be registered
530 * \retval -EEXIST device with this name is registered
531 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Implementation notes: a single pass over obd_devs[] under the
 * obd_dev_lock write lock both checks for a device with the same name and
 * remembers a free slot (minor_assign/new_obd_minor). On a name clash the
 * lock is dropped and obd_zombie_barrier() is called; the "retried" flag
 * suggests the scan is repeated once before giving up -- NOTE(review): the
 * retry control flow itself is not visible in this listing, confirm.
 */
533 int class_register_device(struct obd_device *new_obd)
537 int new_obd_minor = 0;
538 bool minor_assign = false;
539 bool retried = false;
542 write_lock(&obd_dev_lock);
543 for (i = 0; i < class_devno_max(); i++) {
544 struct obd_device *obd = class_num2obd(i);
547 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
550 write_unlock(&obd_dev_lock);
552 /* the obd_device could be waited to be
553 * destroyed by the "obd_zombie_impexp_thread".
555 obd_zombie_barrier();
560 CERROR("%s: already exists, won't add\n",
562 /* in case we found a free slot before duplicate */
563 minor_assign = false;
567 if (!minor_assign && obd == NULL) {
574 new_obd->obd_minor = new_obd_minor;
575 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
576 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
577 obd_devs[new_obd_minor] = new_obd;
581 CERROR("%s: all %u/%u devices used, increase "
582 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
583 i, class_devno_max(), ret);
586 write_unlock(&obd_dev_lock);
/*
 * Scan obd_devs[] for a device whose obd_name equals \a name. Takes no lock
 * itself; class_name2dev() and class_dev_by_str() call it with obd_dev_lock
 * read-held. A match is additionally checked for obd_attached.
 */
591 static int class_name2dev_nolock(const char *name)
598 for (i = 0; i < class_devno_max(); i++) {
599 struct obd_device *obd = class_num2obd(i);
601 if (obd && strcmp(name, obd->obd_name) == 0) {
602 /* Make sure we finished attaching before we give
603 out any references */
604 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
605 if (obd->obd_attached) {
/* Locked wrapper: runs class_name2dev_nolock() under the obd_dev_lock read
 * lock. */
615 int class_name2dev(const char *name)
622 read_lock(&obd_dev_lock);
623 i = class_name2dev_nolock(name);
624 read_unlock(&obd_dev_lock);
628 EXPORT_SYMBOL(class_name2dev);
630 struct obd_device *class_name2obd(const char *name)
632 int dev = class_name2dev(name);
634 if (dev < 0 || dev > class_devno_max())
636 return class_num2obd(dev);
638 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] for a device whose obd_uuid equals \a uuid. Takes no lock
 * itself; class_uuid2dev() and class_dev_by_str() call it with obd_dev_lock
 * read-held.
 */
640 int class_uuid2dev_nolock(struct obd_uuid *uuid)
644 for (i = 0; i < class_devno_max(); i++) {
645 struct obd_device *obd = class_num2obd(i);
647 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
648 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/* Locked wrapper: runs class_uuid2dev_nolock() under the obd_dev_lock read
 * lock. */
656 int class_uuid2dev(struct obd_uuid *uuid)
660 read_lock(&obd_dev_lock);
661 i = class_uuid2dev_nolock(uuid);
662 read_unlock(&obd_dev_lock);
666 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Resolve \a uuid to a device via class_uuid2dev() and class_num2obd().
 * NOTE(review): a guard for a negative index is not visible in this
 * listing -- confirm one exists before class_num2obd() is reached.
 */
668 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
670 int dev = class_uuid2dev(uuid);
673 return class_num2obd(dev);
675 EXPORT_SYMBOL(class_uuid2obd);
678 * Get obd device from ::obd_devs[]
680 * \param num [in] array index
682 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
683 * otherwise return the obd device there.
685 struct obd_device *class_num2obd(int num)
687 struct obd_device *obd = NULL;
689 if (num < class_devno_max()) {
694 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
695 "%p obd_magic %08x != %08x\n",
696 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
697 LASSERTF(obd->obd_minor == num,
698 "%p obd_minor %0d != %0d\n",
699 obd, obd->obd_minor, num);
706 * Find obd in obd_devs[] by name or uuid.
708 * Increment obd's refcount if found.
710 * \param[in] str obd name or uuid
712 * \retval NULL if not found
713 * \retval target pointer to found obd_device
/*
 * Implementation notes: \a str is first converted to an obd_uuid and
 * looked up by uuid, then by name; both lookups and the class_incref() on
 * the result happen under one obd_dev_lock read-side section.
 */
715 struct obd_device *class_dev_by_str(const char *str)
717 struct obd_device *target = NULL;
718 struct obd_uuid tgtuuid;
721 obd_str2uuid(&tgtuuid, str);
723 read_lock(&obd_dev_lock);
724 rc = class_uuid2dev_nolock(&tgtuuid);
726 rc = class_name2dev_nolock(str);
729 target = class_num2obd(rc);
732 class_incref(target, "find", current);
733 read_unlock(&obd_dev_lock);
737 EXPORT_SYMBOL(class_dev_by_str);
740 * Get obd devices count. Device in any
742 * \retval obd device count
744 int get_devices_count(void)
746 int index, max_index = class_devno_max(), dev_count = 0;
748 read_lock(&obd_dev_lock);
749 for (index = 0; index <= max_index; index++) {
750 struct obd_device *obd = class_num2obd(index);
754 read_unlock(&obd_dev_lock);
758 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per populated obd_devs[] slot under the
 * obd_dev_lock read lock: index, a status chosen from the
 * obd_stopping / obd_set_up / obd_attached flags (checked in that order),
 * type name, device name, uuid and current refcount.
 */
760 void class_obd_list(void)
765 read_lock(&obd_dev_lock);
766 for (i = 0; i < class_devno_max(); i++) {
767 struct obd_device *obd = class_num2obd(i);
771 if (obd->obd_stopping)
773 else if (obd->obd_set_up)
775 else if (obd->obd_attached)
779 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
780 i, status, obd->obd_type->typ_name,
781 obd->obd_name, obd->obd_uuid.uuid,
782 atomic_read(&obd->obd_refcount));
784 read_unlock(&obd_dev_lock);
/* Note: type_name is matched as a prefix of the device's type name
 * (strncmp over strlen(type_name)), and obd_dev_lock is released before a
 * match is handed back, so no lock is held on return. */
788 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
789 specified, then only the client with that uuid is returned,
790 otherwise any client connected to the tgt is returned. */
791 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
792 const char *type_name,
793 struct obd_uuid *grp_uuid)
797 read_lock(&obd_dev_lock);
798 for (i = 0; i < class_devno_max(); i++) {
799 struct obd_device *obd = class_num2obd(i);
803 if ((strncmp(obd->obd_type->typ_name, type_name,
804 strlen(type_name)) == 0)) {
805 if (obd_uuid_equals(tgt_uuid,
806 &obd->u.cli.cl_target_uuid) &&
807 ((grp_uuid)? obd_uuid_equals(grp_uuid,
808 &obd->obd_uuid) : 1)) {
809 read_unlock(&obd_dev_lock);
814 read_unlock(&obd_dev_lock);
818 EXPORT_SYMBOL(class_find_client_obd);
820 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
821 searching at *next, and if a device is found, the next index to look
822 at is saved in *next. If next is NULL, then the first matching device
823 will always be returned. */
824 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
830 else if (*next >= 0 && *next < class_devno_max())
835 read_lock(&obd_dev_lock);
836 for (; i < class_devno_max(); i++) {
837 struct obd_device *obd = class_num2obd(i);
841 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
844 read_unlock(&obd_dev_lock);
848 read_unlock(&obd_dev_lock);
852 EXPORT_SYMBOL(class_devices_in_group);
855 * Notify that the sptlrpc log for \a fsname has changed, so that every
856 * relevant OBD adjusts its sptlrpc settings accordingly.
/*
 * Implementation notes: devices that are not set up or are stopping are
 * skipped, as are types other than MDC/OSC/OSP/LWP/MDT/OST and devices
 * whose name does not start with the first \a namelen bytes of \a fsname.
 * For each remaining device a reference is taken, obd_dev_lock is dropped
 * around the obd_set_info_async(KEY_SPTLRPC_CONF) call, and the lock is
 * re-taken before the scan continues.
 */
858 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
860 struct obd_device *obd;
864 LASSERT(namelen > 0);
866 read_lock(&obd_dev_lock);
867 for (i = 0; i < class_devno_max(); i++) {
868 obd = class_num2obd(i);
870 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
873 /* only notify mdc, osc, osp, lwp, mdt, ost
874 * because only these have a -sptlrpc llog */
875 type = obd->obd_type->typ_name;
876 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
877 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
878 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
879 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
880 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
881 strcmp(type, LUSTRE_OST_NAME) != 0)
884 if (strncmp(obd->obd_name, fsname, namelen))
887 class_incref(obd, __FUNCTION__, obd);
888 read_unlock(&obd_dev_lock);
889 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
890 sizeof(KEY_SPTLRPC_CONF),
891 KEY_SPTLRPC_CONF, 0, NULL, NULL);
893 class_decref(obd, __FUNCTION__, obd);
894 read_lock(&obd_dev_lock);
896 read_unlock(&obd_dev_lock);
899 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/* Destroy the obd_device slab cache if it exists and reset the pointer, so
 * the function can be called again safely. */
901 void obd_cleanup_caches(void)
904 if (obd_device_cachep) {
905 kmem_cache_destroy(obd_device_cachep);
906 obd_device_cachep = NULL;
/*
 * Create the "ll_obd_dev_cache" slab cache sized for struct obd_device.
 * Asserts that the cache does not exist yet; a creation failure yields
 * -ENOMEM. obd_cleanup_caches() is called on the way out of the visible
 * error path.
 */
912 int obd_init_caches(void)
917 LASSERT(obd_device_cachep == NULL);
918 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
919 sizeof(struct obd_device),
920 0, 0, 0, sizeof(struct obd_device), NULL);
921 if (!obd_device_cachep)
922 GOTO(out, rc = -ENOMEM);
926 obd_cleanup_caches();
/* Forward declaration; the ops table is defined after
 * export_handle_addref() further down. */
930 static struct portals_handle_ops export_handle_ops;
932 /* map connection to client */
/* A null handle and a cookie of -1 ("assign a new connection") are only
 * logged here; any other cookie is resolved with class_handle2object(). */
933 struct obd_export *class_conn2export(struct lustre_handle *conn)
935 struct obd_export *export;
939 CDEBUG(D_CACHE, "looking for null handle\n");
943 if (conn->cookie == -1) { /* this means assign a new connection */
944 CDEBUG(D_CACHE, "want a new connection\n");
948 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
949 export = class_handle2object(conn->cookie, &export_handle_ops);
952 EXPORT_SYMBOL(class_conn2export);
/* NOTE(review): the body is not visible in this listing; presumably maps an
 * export to its obd_device (exp->exp_obd) -- confirm. */
954 struct obd_device *class_exp2obd(struct obd_export *exp)
960 EXPORT_SYMBOL(class_exp2obd);
/* Return the client import (u.cli.cl_import) of the device behind \a exp.
 * \a exp itself is dereferenced unconditionally. */
962 struct obd_import *class_exp2cliimp(struct obd_export *exp)
964 struct obd_device *obd = exp->exp_obd;
967 return obd->u.cli.cl_import;
969 EXPORT_SYMBOL(class_exp2cliimp);
971 /* Export management functions */
/*
 * Destroy an export whose refcount has reached zero: drop its connection
 * (if any), assert that its reply/replay/hp-rpc lists are empty, call
 * obd_destroy_export(), release the obd reference held by non-self exports
 * and free the export through OBD_FREE_RCU().
 */
972 static void class_export_destroy(struct obd_export *exp)
974 struct obd_device *obd = exp->exp_obd;
977 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
978 LASSERT(obd != NULL);
980 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
981 exp->exp_client_uuid.uuid, obd->obd_name);
983 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
984 if (exp->exp_connection)
985 ptlrpc_put_connection_superhack(exp->exp_connection);
987 LASSERT(list_empty(&exp->exp_outstanding_replies));
988 LASSERT(list_empty(&exp->exp_uncommitted_replies));
989 LASSERT(list_empty(&exp->exp_req_replay_queue));
990 LASSERT(list_empty(&exp->exp_hp_rpcs));
991 obd_destroy_export(exp);
992 /* self export doesn't hold a reference to an obd, although it
993 * exists until freeing of the obd */
994 if (exp != obd->obd_self_export)
995 class_decref(obd, "export", exp);
997 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback for export handles: takes an export reference. */
1001 static void export_handle_addref(void *export)
1003 class_export_get(export);
/* Handle ops used when hashing and looking up export handles
 * (class_handle_hash() / class_handle2object()). */
1006 static struct portals_handle_ops export_handle_ops = {
1007 .hop_addref = export_handle_addref,
/* Take a reference on \a exp (atomic increment of exp_refcount) and log the
 * new count. */
1011 struct obd_export *class_export_get(struct obd_export *exp)
1013 atomic_inc(&exp->exp_refcount);
1014 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1015 atomic_read(&exp->exp_refcount));
1018 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp. On the final put the nid-stat reference is
 * released; a self export is destroyed inline and its obd freed, while any
 * other export must still be on an obd chain and is handed to
 * obd_zombie_export_add().
 */
1020 void class_export_put(struct obd_export *exp)
1022 LASSERT(exp != NULL);
1023 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1024 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1025 atomic_read(&exp->exp_refcount) - 1);
1027 if (atomic_dec_and_test(&exp->exp_refcount)) {
1028 struct obd_device *obd = exp->exp_obd;
1030 CDEBUG(D_IOCTL, "final put %p/%s\n",
1031 exp, exp->exp_client_uuid.uuid);
1033 /* release nid stat reference */
1034 lprocfs_exp_cleanup(exp);
1036 if (exp == obd->obd_self_export) {
1037 /* self export should be destroyed without
1038 * zombie thread as it doesn't hold a
1039 * reference to obd and doesn't hold any
1041 class_export_destroy(exp);
1042 /* self export is destroyed, no class
1043 * references exist and it is safe to free
1045 class_free_dev(obd);
1047 LASSERT(!list_empty(&exp->exp_obd_chain));
1048 obd_zombie_export_add(exp);
1053 EXPORT_SYMBOL(class_export_put);
/* Work callback (installed with INIT_WORK on exp_zombie_work): recover the
 * export from its work item and destroy it. */
1055 static void obd_zombie_exp_cull(struct work_struct *ws)
1057 struct obd_export *export;
1059 export = container_of(ws, struct obd_export, exp_zombie_work);
1060 class_export_destroy(export);
/*
 * Implementation notes: for a client uuid that differs from the obd's own
 * uuid, the export is added to obd_uuid_hash with cfs_hash_add_unique(); a
 * duplicate is refused with -EALREADY. obd_stopping is checked under
 * obd_dev_lock both before and after the hash insert (-ENODEV and
 * -ESHUTDOWN respectively). The error exit unhashes the handle, calls
 * obd_destroy_export() and frees the export.
 * NOTE(review): how \a is_self selects between linking the export on the
 * obd lists and merely initializing its list heads is not visible in this
 * listing -- confirm.
 */
1063 /* Creates a new export, adds it to the hash table, and returns a
1064 * pointer to it. The refcount is 2: one for the hash reference, and
1065 * one for the pointer returned by this function. */
1066 struct obd_export *__class_new_export(struct obd_device *obd,
1067 struct obd_uuid *cluuid, bool is_self)
1069 struct obd_export *export;
1070 struct cfs_hash *hash = NULL;
1074 OBD_ALLOC_PTR(export);
1076 return ERR_PTR(-ENOMEM);
1078 export->exp_conn_cnt = 0;
1079 export->exp_lock_hash = NULL;
1080 export->exp_flock_hash = NULL;
1081 /* 2 = class_handle_hash + last */
1082 atomic_set(&export->exp_refcount, 2);
1083 atomic_set(&export->exp_rpc_count, 0);
1084 atomic_set(&export->exp_cb_count, 0);
1085 atomic_set(&export->exp_locks_count, 0);
1086 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1087 INIT_LIST_HEAD(&export->exp_locks_list);
1088 spin_lock_init(&export->exp_locks_list_guard);
1090 atomic_set(&export->exp_replay_count, 0);
1091 export->exp_obd = obd;
1092 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1093 spin_lock_init(&export->exp_uncommitted_replies_lock);
1094 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1095 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1096 INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1097 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1098 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1099 class_handle_hash(&export->exp_handle, &export_handle_ops);
1100 export->exp_last_request_time = ktime_get_real_seconds();
1101 spin_lock_init(&export->exp_lock);
1102 spin_lock_init(&export->exp_rpc_lock);
1103 INIT_HLIST_NODE(&export->exp_uuid_hash);
1104 INIT_HLIST_NODE(&export->exp_nid_hash);
1105 INIT_HLIST_NODE(&export->exp_gen_hash);
1106 spin_lock_init(&export->exp_bl_list_lock);
1107 INIT_LIST_HEAD(&export->exp_bl_list);
1108 INIT_LIST_HEAD(&export->exp_stale_list);
1109 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1111 export->exp_sp_peer = LUSTRE_SP_ANY;
1112 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1113 export->exp_client_uuid = *cluuid;
1114 obd_init_export(export);
1116 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1117 spin_lock(&obd->obd_dev_lock);
1118 /* shouldn't happen, but might race */
1119 if (obd->obd_stopping)
1120 GOTO(exit_unlock, rc = -ENODEV);
1122 hash = cfs_hash_getref(obd->obd_uuid_hash);
1124 GOTO(exit_unlock, rc = -ENODEV);
1125 spin_unlock(&obd->obd_dev_lock);
1127 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1129 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1130 obd->obd_name, cluuid->uuid, rc);
1131 GOTO(exit_err, rc = -EALREADY);
1135 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1136 spin_lock(&obd->obd_dev_lock);
1137 if (obd->obd_stopping) {
1139 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1140 GOTO(exit_unlock, rc = -ESHUTDOWN);
1144 class_incref(obd, "export", export);
1145 list_add_tail(&export->exp_obd_chain_timed,
1146 &obd->obd_exports_timed);
1147 list_add(&export->exp_obd_chain, &obd->obd_exports);
1148 obd->obd_num_exports++;
1150 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1151 INIT_LIST_HEAD(&export->exp_obd_chain);
1153 spin_unlock(&obd->obd_dev_lock);
1155 cfs_hash_putref(hash);
1159 spin_unlock(&obd->obd_dev_lock);
1162 cfs_hash_putref(hash);
1163 class_handle_unhash(&export->exp_handle);
1164 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1165 obd_destroy_export(export);
1166 OBD_FREE_PTR(export);
/* Create a regular (non-self) export: __class_new_export() with
 * is_self == false. */
1170 struct obd_export *class_new_export(struct obd_device *obd,
1171 struct obd_uuid *uuid)
1173 return __class_new_export(obd, uuid, false);
1175 EXPORT_SYMBOL(class_new_export);
/* Create the self export of an obd: __class_new_export() with
 * is_self == true. */
1177 struct obd_export *class_new_export_self(struct obd_device *obd,
1178 struct obd_uuid *uuid)
1180 return __class_new_export(obd, uuid, true);
/*
 * Detach an export from its obd. The handle is unhashed first. A self
 * export is simply put. Any other export is, under obd_dev_lock, removed
 * from the uuid hash (and on servers from the generation hash), moved to
 * obd_unlinked_exports and taken off the timed list; the stale-export
 * counter is then bumped and obd_stale_export_put() is called.
 */
1183 void class_unlink_export(struct obd_export *exp)
1185 class_handle_unhash(&exp->exp_handle);
1187 if (exp->exp_obd->obd_self_export == exp) {
1188 class_export_put(exp);
1192 spin_lock(&exp->exp_obd->obd_dev_lock);
1193 /* delete an uuid-export hashitem from hashtables */
1194 if (!hlist_unhashed(&exp->exp_uuid_hash))
1195 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1196 &exp->exp_client_uuid,
1197 &exp->exp_uuid_hash);
1199 #ifdef HAVE_SERVER_SUPPORT
1200 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1201 struct tg_export_data *ted = &exp->exp_target_data;
1202 struct cfs_hash *hash;
1204 /* Because obd_gen_hash will not be released until
1205 * class_cleanup(), so hash should never be NULL here */
1206 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1207 LASSERT(hash != NULL);
1208 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1209 &exp->exp_gen_hash);
1210 cfs_hash_putref(hash);
1212 #endif /* HAVE_SERVER_SUPPORT */
1214 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1215 list_del_init(&exp->exp_obd_chain_timed);
1216 exp->exp_obd->obd_num_exports--;
1217 spin_unlock(&exp->exp_obd->obd_dev_lock);
1218 atomic_inc(&obd_stale_export_num);
1220 /* A reference is kept by obd_stale_exports list */
1221 obd_stale_export_put(exp);
1223 EXPORT_SYMBOL(class_unlink_export);
1225 /* Import management functions */
/*
 * Release an import whose refcount is zero: drop the main connection and
 * every entry on imp_conn_list (freeing each obd_import_conn), assert that
 * imp_sec was already cleared, and release the import's obd reference.
 * NOTE(review): the free of the import structure itself is not visible in
 * this listing -- confirm.
 */
1226 static void obd_zombie_import_free(struct obd_import *imp)
1230 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1231 imp->imp_obd->obd_name);
1233 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1235 ptlrpc_put_connection_superhack(imp->imp_connection);
1237 while (!list_empty(&imp->imp_conn_list)) {
1238 struct obd_import_conn *imp_conn;
1240 imp_conn = list_entry(imp->imp_conn_list.next,
1241 struct obd_import_conn, oic_item);
1242 list_del_init(&imp_conn->oic_item);
1243 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1244 OBD_FREE(imp_conn, sizeof(*imp_conn));
1247 LASSERT(imp->imp_sec == NULL);
1248 class_decref(imp->imp_obd, "import", imp);
/* Take a reference on \a import (atomic increment of imp_refcount) and log
 * the new count together with the obd name. */
1253 struct obd_import *class_import_get(struct obd_import *import)
1255 atomic_inc(&import->imp_refcount);
1256 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1257 atomic_read(&import->imp_refcount),
1258 import->imp_obd->obd_name);
1261 EXPORT_SYMBOL(class_import_get);
/* Drop a reference on \a imp; the final put hands the import to
 * obd_zombie_import_add(). The refcount is sanity-checked before and after
 * the decrement. */
1263 void class_import_put(struct obd_import *imp)
1267 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1269 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1270 atomic_read(&imp->imp_refcount) - 1,
1271 imp->imp_obd->obd_name);
1273 if (atomic_dec_and_test(&imp->imp_refcount)) {
1274 CDEBUG(D_INFO, "final put import %p\n", imp);
1275 obd_zombie_import_add(imp);
1278 /* catch possible import put race */
1279 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1282 EXPORT_SYMBOL(class_import_put);
/* Initialize an import's adaptive-timeout state: the net latency estimate
 * starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates starts
 * at INITIAL_CONNECT_TIMEOUT. */
1284 static void init_imp_at(struct imp_at *at) {
1286 at_init(&at->iat_net_latency, 0, 0);
1287 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1288 /* max service estimates are tracked on the server side, so
1289 don't use the AT history here, just use the last reported
1290 val. (But keep hist for proc histogram, worst_ever) */
1291 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Work callback (installed with INIT_WORK on imp_zombie_work): recover the
 * import from its work item and free it. */
1296 static void obd_zombie_imp_cull(struct work_struct *ws)
1298 struct obd_import *import;
1300 import = container_of(ws, struct obd_import, imp_zombie_work);
1301 obd_zombie_import_free(import);
/*
 * Allocate and initialize an import for \a obd.
 * The import takes a reference on the obd (class_incref, "import"), starts
 * in state LUSTRE_IMP_NEW with a refcount of 2, and uses
 * LUSTRE_MSG_MAGIC_V2 as its initial message magic. imp_sec_refpid is the
 * pid of the current pid namespace's child reaper when there is one,
 * otherwise 1.
 */
1304 struct obd_import *class_new_import(struct obd_device *obd)
1306 struct obd_import *imp;
1307 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1309 OBD_ALLOC(imp, sizeof(*imp));
1313 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1314 INIT_LIST_HEAD(&imp->imp_replay_list);
1315 INIT_LIST_HEAD(&imp->imp_sending_list);
1316 INIT_LIST_HEAD(&imp->imp_delayed_list);
1317 INIT_LIST_HEAD(&imp->imp_committed_list);
1318 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1319 imp->imp_known_replied_xid = 0;
1320 imp->imp_replay_cursor = &imp->imp_committed_list;
1321 spin_lock_init(&imp->imp_lock);
1322 imp->imp_last_success_conn = 0;
1323 imp->imp_state = LUSTRE_IMP_NEW;
1324 imp->imp_obd = class_incref(obd, "import", imp);
1325 rwlock_init(&imp->imp_sec_lock);
1326 init_waitqueue_head(&imp->imp_recovery_waitq);
1327 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1329 if (curr_pid_ns->child_reaper)
1330 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1332 imp->imp_sec_refpid = 1;
1334 atomic_set(&imp->imp_refcount, 2);
1335 atomic_set(&imp->imp_unregistering, 0);
1336 atomic_set(&imp->imp_inflight, 0);
1337 atomic_set(&imp->imp_replay_inflight, 0);
1338 atomic_set(&imp->imp_inval_count, 0);
1339 INIT_LIST_HEAD(&imp->imp_conn_list);
1340 init_imp_at(&imp->imp_at);
1342 /* the default magic is V2, will be used in connect RPC, and
1343 * then adjusted according to the flags in request/reply. */
1344 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1348 EXPORT_SYMBOL(class_new_import);
1350 void class_destroy_import(struct obd_import *import)
1352 LASSERT(import != NULL);
1353 LASSERT(import != LP_POISON);
1355 spin_lock(&import->imp_lock);
1356 import->imp_generation++;
1357 spin_unlock(&import->imp_lock);
1358 class_import_put(import);
1360 EXPORT_SYMBOL(class_destroy_import);
#if LUSTRE_TRACKS_LOCK_EXP_REFS

/**
 * Record one more reference from \a lock to \a exp.
 *
 * On the first reference the lock is linked onto exp->exp_locks_list and
 * \a exp becomes the lock's l_exp_refs_target.  A console warning is issued
 * if the lock already tracks a different export.
 *
 * \param[in] exp	export being referenced
 * \param[in] lock	lock taking the reference
 */
void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	struct obd_export *prev;

	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr >= 0);

	prev = lock->l_exp_refs_target;
	if (prev != NULL && prev != exp)
		LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
			      exp, lock, prev);

	if (lock->l_exp_refs_nr++ == 0) {
		list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
		lock->l_exp_refs_target = exp;
	}

	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);

	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_add_lock_ref);

/**
 * Drop one reference from \a lock to \a exp.
 *
 * When the last reference goes away the lock is unlinked from the export's
 * lock list and its l_exp_refs_target is cleared.  A console warning is
 * issued if the lock tracks a different export than \a exp.
 *
 * \param[in] exp	export being dereferenced
 * \param[in] lock	lock dropping the reference; must hold at least one
 */
void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr > 0);

	if (lock->l_exp_refs_target != exp)
		LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
			      lock, lock->l_exp_refs_target, exp);

	if (--lock->l_exp_refs_nr == 0) {
		list_del_init(&lock->l_exp_refs_link);
		lock->l_exp_refs_target = NULL;
	}

	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);

	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_del_lock_ref);
#endif
1405 /* A connection defines an export context in which preallocation can
1406 be managed. This releases the export pointer reference, and returns
1407 the export handle, so the export refcount is 1 when this function
1409 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1410 struct obd_uuid *cluuid)
1412 struct obd_export *export;
1413 LASSERT(conn != NULL);
1414 LASSERT(obd != NULL);
1415 LASSERT(cluuid != NULL);
1418 export = class_new_export(obd, cluuid);
1420 RETURN(PTR_ERR(export));
1422 conn->cookie = export->exp_handle.h_cookie;
1423 class_export_put(export);
1425 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1426 cluuid->uuid, conn->cookie);
1429 EXPORT_SYMBOL(class_connect);
1431 /* if export is involved in recovery then clean up related things */
1432 static void class_export_recovery_cleanup(struct obd_export *exp)
1434 struct obd_device *obd = exp->exp_obd;
1436 spin_lock(&obd->obd_recovery_task_lock);
1437 if (obd->obd_recovering) {
1438 if (exp->exp_in_recovery) {
1439 spin_lock(&exp->exp_lock);
1440 exp->exp_in_recovery = 0;
1441 spin_unlock(&exp->exp_lock);
1442 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1443 atomic_dec(&obd->obd_connected_clients);
1446 /* if called during recovery then should update
1447 * obd_stale_clients counter,
1448 * lightweight exports are not counted */
1449 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1450 exp->exp_obd->obd_stale_clients++;
1452 spin_unlock(&obd->obd_recovery_task_lock);
1454 spin_lock(&exp->exp_lock);
1455 /** Cleanup req replay fields */
1456 if (exp->exp_req_replay_needed) {
1457 exp->exp_req_replay_needed = 0;
1459 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1460 atomic_dec(&obd->obd_req_replay_clients);
1463 /** Cleanup lock replay data */
1464 if (exp->exp_lock_replay_needed) {
1465 exp->exp_lock_replay_needed = 0;
1467 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1468 atomic_dec(&obd->obd_lock_replay_clients);
1470 spin_unlock(&exp->exp_lock);
1473 /* This function removes 1-3 references from the export:
1474 * 1 - for export pointer passed
1475 * and if disconnect really need
1476 * 2 - removing from hash
1477 * 3 - in client_unlink_export
1478 * The export pointer passed to this function can destroyed */
1479 int class_disconnect(struct obd_export *export)
1481 int already_disconnected;
1484 if (export == NULL) {
1485 CWARN("attempting to free NULL export %p\n", export);
1489 spin_lock(&export->exp_lock);
1490 already_disconnected = export->exp_disconnected;
1491 export->exp_disconnected = 1;
1492 /* We hold references of export for uuid hash
1493 * and nid_hash and export link at least. So
1494 * it is safe to call cfs_hash_del in there. */
1495 if (!hlist_unhashed(&export->exp_nid_hash))
1496 cfs_hash_del(export->exp_obd->obd_nid_hash,
1497 &export->exp_connection->c_peer.nid,
1498 &export->exp_nid_hash);
1499 spin_unlock(&export->exp_lock);
1501 /* class_cleanup(), abort_recovery(), and class_fail_export()
1502 * all end up in here, and if any of them race we shouldn't
1503 * call extra class_export_puts(). */
1504 if (already_disconnected) {
1505 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1506 GOTO(no_disconn, already_disconnected);
1509 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1510 export->exp_handle.h_cookie);
1512 class_export_recovery_cleanup(export);
1513 class_unlink_export(export);
1515 class_export_put(export);
1518 EXPORT_SYMBOL(class_disconnect);
1520 /* Return non-zero for a fully connected export */
1521 int class_connected_export(struct obd_export *exp)
1526 spin_lock(&exp->exp_lock);
1527 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1528 spin_unlock(&exp->exp_lock);
1532 EXPORT_SYMBOL(class_connected_export);
1534 static void class_disconnect_export_list(struct list_head *list,
1535 enum obd_option flags)
1538 struct obd_export *exp;
1541 /* It's possible that an export may disconnect itself, but
1542 * nothing else will be added to this list. */
1543 while (!list_empty(list)) {
1544 exp = list_entry(list->next, struct obd_export,
1546 /* need for safe call CDEBUG after obd_disconnect */
1547 class_export_get(exp);
1549 spin_lock(&exp->exp_lock);
1550 exp->exp_flags = flags;
1551 spin_unlock(&exp->exp_lock);
1553 if (obd_uuid_equals(&exp->exp_client_uuid,
1554 &exp->exp_obd->obd_uuid)) {
1556 "exp %p export uuid == obd uuid, don't discon\n",
1558 /* Need to delete this now so we don't end up pointing
1559 * to work_list later when this export is cleaned up. */
1560 list_del_init(&exp->exp_obd_chain);
1561 class_export_put(exp);
1565 class_export_get(exp);
1566 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1567 "last request at %lld\n",
1568 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1569 exp, exp->exp_last_request_time);
1570 /* release one export reference anyway */
1571 rc = obd_disconnect(exp);
1573 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1574 obd_export_nid2str(exp), exp, rc);
1575 class_export_put(exp);
1580 void class_disconnect_exports(struct obd_device *obd)
1582 struct list_head work_list;
1585 /* Move all of the exports from obd_exports to a work list, en masse. */
1586 INIT_LIST_HEAD(&work_list);
1587 spin_lock(&obd->obd_dev_lock);
1588 list_splice_init(&obd->obd_exports, &work_list);
1589 list_splice_init(&obd->obd_delayed_exports, &work_list);
1590 spin_unlock(&obd->obd_dev_lock);
1592 if (!list_empty(&work_list)) {
1593 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1594 "disconnecting them\n", obd->obd_minor, obd);
1595 class_disconnect_export_list(&work_list,
1596 exp_flags_from_obd(obd));
1598 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1599 obd->obd_minor, obd);
1602 EXPORT_SYMBOL(class_disconnect_exports);
1604 /* Remove exports that have not completed recovery.
1606 void class_disconnect_stale_exports(struct obd_device *obd,
1607 int (*test_export)(struct obd_export *))
1609 struct list_head work_list;
1610 struct obd_export *exp, *n;
1614 INIT_LIST_HEAD(&work_list);
1615 spin_lock(&obd->obd_dev_lock);
1616 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1618 /* don't count self-export as client */
1619 if (obd_uuid_equals(&exp->exp_client_uuid,
1620 &exp->exp_obd->obd_uuid))
1623 /* don't evict clients which have no slot in last_rcvd
1624 * (e.g. lightweight connection) */
1625 if (exp->exp_target_data.ted_lr_idx == -1)
1628 spin_lock(&exp->exp_lock);
1629 if (exp->exp_failed || test_export(exp)) {
1630 spin_unlock(&exp->exp_lock);
1633 exp->exp_failed = 1;
1634 spin_unlock(&exp->exp_lock);
1636 list_move(&exp->exp_obd_chain, &work_list);
1638 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1639 obd->obd_name, exp->exp_client_uuid.uuid,
1640 obd_export_nid2str(exp));
1641 print_export_data(exp, "EVICTING", 0, D_HA);
1643 spin_unlock(&obd->obd_dev_lock);
1646 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1647 obd->obd_name, evicted);
1649 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1650 OBD_OPT_ABORT_RECOV);
1653 EXPORT_SYMBOL(class_disconnect_stale_exports);
1655 void class_fail_export(struct obd_export *exp)
1657 int rc, already_failed;
1659 spin_lock(&exp->exp_lock);
1660 already_failed = exp->exp_failed;
1661 exp->exp_failed = 1;
1662 spin_unlock(&exp->exp_lock);
1664 if (already_failed) {
1665 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1666 exp, exp->exp_client_uuid.uuid);
1670 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1671 exp, exp->exp_client_uuid.uuid);
1673 if (obd_dump_on_timeout)
1674 libcfs_debug_dumplog();
1676 /* need for safe call CDEBUG after obd_disconnect */
1677 class_export_get(exp);
1679 /* Most callers into obd_disconnect are removing their own reference
1680 * (request, for example) in addition to the one from the hash table.
1681 * We don't have such a reference here, so make one. */
1682 class_export_get(exp);
1683 rc = obd_disconnect(exp);
1685 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1687 CDEBUG(D_HA, "disconnected export %p/%s\n",
1688 exp, exp->exp_client_uuid.uuid);
1689 class_export_put(exp);
1691 EXPORT_SYMBOL(class_fail_export);
1693 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1695 struct cfs_hash *nid_hash;
1696 struct obd_export *doomed_exp = NULL;
1697 int exports_evicted = 0;
1699 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1701 spin_lock(&obd->obd_dev_lock);
1702 /* umount has run already, so evict thread should leave
1703 * its task to umount thread now */
1704 if (obd->obd_stopping) {
1705 spin_unlock(&obd->obd_dev_lock);
1706 return exports_evicted;
1708 nid_hash = obd->obd_nid_hash;
1709 cfs_hash_getref(nid_hash);
1710 spin_unlock(&obd->obd_dev_lock);
1713 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1714 if (doomed_exp == NULL)
1717 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1718 "nid %s found, wanted nid %s, requested nid %s\n",
1719 obd_export_nid2str(doomed_exp),
1720 libcfs_nid2str(nid_key), nid);
1721 LASSERTF(doomed_exp != obd->obd_self_export,
1722 "self-export is hashed by NID?\n");
1724 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1725 "request\n", obd->obd_name,
1726 obd_uuid2str(&doomed_exp->exp_client_uuid),
1727 obd_export_nid2str(doomed_exp));
1728 class_fail_export(doomed_exp);
1729 class_export_put(doomed_exp);
1732 cfs_hash_putref(nid_hash);
1734 if (!exports_evicted)
1735 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1736 obd->obd_name, nid);
1737 return exports_evicted;
1739 EXPORT_SYMBOL(obd_export_evict_by_nid);
1741 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1743 struct cfs_hash *uuid_hash;
1744 struct obd_export *doomed_exp = NULL;
1745 struct obd_uuid doomed_uuid;
1746 int exports_evicted = 0;
1748 spin_lock(&obd->obd_dev_lock);
1749 if (obd->obd_stopping) {
1750 spin_unlock(&obd->obd_dev_lock);
1751 return exports_evicted;
1753 uuid_hash = obd->obd_uuid_hash;
1754 cfs_hash_getref(uuid_hash);
1755 spin_unlock(&obd->obd_dev_lock);
1757 obd_str2uuid(&doomed_uuid, uuid);
1758 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1759 CERROR("%s: can't evict myself\n", obd->obd_name);
1760 cfs_hash_putref(uuid_hash);
1761 return exports_evicted;
1764 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1766 if (doomed_exp == NULL) {
1767 CERROR("%s: can't disconnect %s: no exports found\n",
1768 obd->obd_name, uuid);
1770 CWARN("%s: evicting %s at adminstrative request\n",
1771 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1772 class_fail_export(doomed_exp);
1773 class_export_put(doomed_exp);
1776 cfs_hash_putref(uuid_hash);
1778 return exports_evicted;
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() when lock dumping is
 * requested; stays NULL (static storage) until a hook is installed. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1786 static void print_export_data(struct obd_export *exp, const char *status,
1787 int locks, int debug_level)
1789 struct ptlrpc_reply_state *rs;
1790 struct ptlrpc_reply_state *first_reply = NULL;
1793 spin_lock(&exp->exp_lock);
1794 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1800 spin_unlock(&exp->exp_lock);
1802 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1803 "%p %s %llu stale:%d\n",
1804 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1805 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1806 atomic_read(&exp->exp_rpc_count),
1807 atomic_read(&exp->exp_cb_count),
1808 atomic_read(&exp->exp_locks_count),
1809 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1810 nreplies, first_reply, nreplies > 3 ? "..." : "",
1811 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1812 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1813 if (locks && class_export_dump_hook != NULL)
1814 class_export_dump_hook(exp);
1818 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1820 struct obd_export *exp;
1822 spin_lock(&obd->obd_dev_lock);
1823 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1824 print_export_data(exp, "ACTIVE", locks, debug_level);
1825 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1826 print_export_data(exp, "UNLINKED", locks, debug_level);
1827 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1828 print_export_data(exp, "DELAYED", locks, debug_level);
1829 spin_unlock(&obd->obd_dev_lock);
1832 void obd_exports_barrier(struct obd_device *obd)
1835 LASSERT(list_empty(&obd->obd_exports));
1836 spin_lock(&obd->obd_dev_lock);
1837 while (!list_empty(&obd->obd_unlinked_exports)) {
1838 spin_unlock(&obd->obd_dev_lock);
1839 set_current_state(TASK_UNINTERRUPTIBLE);
1840 schedule_timeout(cfs_time_seconds(waited));
1841 if (waited > 5 && is_power_of_2(waited)) {
1842 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1843 "more than %d seconds. "
1844 "The obd refcount = %d. Is it stuck?\n",
1845 obd->obd_name, waited,
1846 atomic_read(&obd->obd_refcount));
1847 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1850 spin_lock(&obd->obd_dev_lock);
1852 spin_unlock(&obd->obd_dev_lock);
1854 EXPORT_SYMBOL(obd_exports_barrier);
1857 * Add export to the obd_zombe thread and notify it.
1859 static void obd_zombie_export_add(struct obd_export *exp) {
1860 atomic_dec(&obd_stale_export_num);
1861 spin_lock(&exp->exp_obd->obd_dev_lock);
1862 LASSERT(!list_empty(&exp->exp_obd_chain));
1863 list_del_init(&exp->exp_obd_chain);
1864 spin_unlock(&exp->exp_obd->obd_dev_lock);
1866 queue_work(zombie_wq, &exp->exp_zombie_work);
1870 * Add import to the obd_zombe thread and notify it.
1872 static void obd_zombie_import_add(struct obd_import *imp) {
1873 LASSERT(imp->imp_sec == NULL);
1875 queue_work(zombie_wq, &imp->imp_zombie_work);
1879 * wait when obd_zombie import/export queues become empty
1881 void obd_zombie_barrier(void)
1883 flush_workqueue(zombie_wq);
1885 EXPORT_SYMBOL(obd_zombie_barrier);
1888 struct obd_export *obd_stale_export_get(void)
1890 struct obd_export *exp = NULL;
1893 spin_lock(&obd_stale_export_lock);
1894 if (!list_empty(&obd_stale_exports)) {
1895 exp = list_entry(obd_stale_exports.next,
1896 struct obd_export, exp_stale_list);
1897 list_del_init(&exp->exp_stale_list);
1899 spin_unlock(&obd_stale_export_lock);
1902 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1903 atomic_read(&obd_stale_export_num));
1907 EXPORT_SYMBOL(obd_stale_export_get);
1909 void obd_stale_export_put(struct obd_export *exp)
1913 LASSERT(list_empty(&exp->exp_stale_list));
1914 if (exp->exp_lock_hash &&
1915 atomic_read(&exp->exp_lock_hash->hs_count)) {
1916 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1917 atomic_read(&obd_stale_export_num));
1919 spin_lock_bh(&exp->exp_bl_list_lock);
1920 spin_lock(&obd_stale_export_lock);
1921 /* Add to the tail if there is no blocked locks,
1922 * to the head otherwise. */
1923 if (list_empty(&exp->exp_bl_list))
1924 list_add_tail(&exp->exp_stale_list,
1925 &obd_stale_exports);
1927 list_add(&exp->exp_stale_list,
1928 &obd_stale_exports);
1930 spin_unlock(&obd_stale_export_lock);
1931 spin_unlock_bh(&exp->exp_bl_list_lock);
1933 class_export_put(exp);
1937 EXPORT_SYMBOL(obd_stale_export_put);
1940 * Adjust the position of the export in the stale list,
1941 * i.e. move to the head of the list if is needed.
1943 void obd_stale_export_adjust(struct obd_export *exp)
1945 LASSERT(exp != NULL);
1946 spin_lock_bh(&exp->exp_bl_list_lock);
1947 spin_lock(&obd_stale_export_lock);
1949 if (!list_empty(&exp->exp_stale_list) &&
1950 !list_empty(&exp->exp_bl_list))
1951 list_move(&exp->exp_stale_list, &obd_stale_exports);
1953 spin_unlock(&obd_stale_export_lock);
1954 spin_unlock_bh(&exp->exp_bl_list_lock);
1956 EXPORT_SYMBOL(obd_stale_export_adjust);
1959 * start destroy zombie import/export thread
1961 int obd_zombie_impexp_init(void)
1963 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1971 * stop destroy zombie import/export thread
1973 void obd_zombie_impexp_stop(void)
1975 destroy_workqueue(zombie_wq);
1976 LASSERT(list_empty(&obd_stale_exports));
1979 /***** Kernel-userspace comm helpers *******/
1981 /* Get length of entire message, including header */
1982 int kuc_len(int payload_len)
1984 return sizeof(struct kuc_hdr) + payload_len;
1986 EXPORT_SYMBOL(kuc_len);
1988 /* Get a pointer to kuc header, given a ptr to the payload
1989 * @param p Pointer to payload area
1990 * @returns Pointer to kuc header
1992 struct kuc_hdr * kuc_ptr(void *p)
1994 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1995 LASSERT(lh->kuc_magic == KUC_MAGIC);
1998 EXPORT_SYMBOL(kuc_ptr);
2000 /* Alloc space for a message, and fill in header
2001 * @return Pointer to payload area
2003 void *kuc_alloc(int payload_len, int transport, int type)
2006 int len = kuc_len(payload_len);
2010 return ERR_PTR(-ENOMEM);
2012 lh->kuc_magic = KUC_MAGIC;
2013 lh->kuc_transport = transport;
2014 lh->kuc_msgtype = type;
2015 lh->kuc_msglen = len;
2017 return (void *)(lh + 1);
2019 EXPORT_SYMBOL(kuc_alloc);
/**
 * Free a message obtained from kuc_alloc().
 *
 * \param[in] p			pointer to the payload area
 * \param[in] payload_len	payload size the message was allocated with
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
2029 struct obd_request_slot_waiter {
2030 struct list_head orsw_entry;
2031 wait_queue_head_t orsw_waitq;
2035 static bool obd_request_slot_avail(struct client_obd *cli,
2036 struct obd_request_slot_waiter *orsw)
2040 spin_lock(&cli->cl_loi_list_lock);
2041 avail = !!list_empty(&orsw->orsw_entry);
2042 spin_unlock(&cli->cl_loi_list_lock);
2048 * For network flow control, the RPC sponsor needs to acquire a credit
2049 * before sending the RPC. The credits count for a connection is defined
2050 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2051 * the subsequent RPC sponsors need to wait until others released their
2052 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2054 int obd_get_request_slot(struct client_obd *cli)
2056 struct obd_request_slot_waiter orsw;
2057 struct l_wait_info lwi;
2060 spin_lock(&cli->cl_loi_list_lock);
2061 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2062 cli->cl_rpcs_in_flight++;
2063 spin_unlock(&cli->cl_loi_list_lock);
2067 init_waitqueue_head(&orsw.orsw_waitq);
2068 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2069 orsw.orsw_signaled = false;
2070 spin_unlock(&cli->cl_loi_list_lock);
2072 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2073 rc = l_wait_event(orsw.orsw_waitq,
2074 obd_request_slot_avail(cli, &orsw) ||
2078 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2079 * freed but other (such as obd_put_request_slot) is using it. */
2080 spin_lock(&cli->cl_loi_list_lock);
2082 if (!orsw.orsw_signaled) {
2083 if (list_empty(&orsw.orsw_entry))
2084 cli->cl_rpcs_in_flight--;
2086 list_del(&orsw.orsw_entry);
2090 if (orsw.orsw_signaled) {
2091 LASSERT(list_empty(&orsw.orsw_entry));
2095 spin_unlock(&cli->cl_loi_list_lock);
2099 EXPORT_SYMBOL(obd_get_request_slot);
2101 void obd_put_request_slot(struct client_obd *cli)
2103 struct obd_request_slot_waiter *orsw;
2105 spin_lock(&cli->cl_loi_list_lock);
2106 cli->cl_rpcs_in_flight--;
2108 /* If there is free slot, wakeup the first waiter. */
2109 if (!list_empty(&cli->cl_flight_waiters) &&
2110 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2111 orsw = list_entry(cli->cl_flight_waiters.next,
2112 struct obd_request_slot_waiter, orsw_entry);
2113 list_del_init(&orsw->orsw_entry);
2114 cli->cl_rpcs_in_flight++;
2115 wake_up(&orsw->orsw_waitq);
2117 spin_unlock(&cli->cl_loi_list_lock);
2119 EXPORT_SYMBOL(obd_put_request_slot);
2121 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2123 return cli->cl_max_rpcs_in_flight;
2125 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2127 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2129 struct obd_request_slot_waiter *orsw;
2133 const char *type_name;
2136 if (max > OBD_MAX_RIF_MAX || max < 1)
2139 type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2140 if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2141 /* adjust max_mod_rpcs_in_flight to ensure it is always
2142 * strictly lower that max_rpcs_in_flight */
2144 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2145 "because it must be higher than "
2146 "max_mod_rpcs_in_flight value",
2147 cli->cl_import->imp_obd->obd_name);
2150 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2151 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2157 spin_lock(&cli->cl_loi_list_lock);
2158 old = cli->cl_max_rpcs_in_flight;
2159 cli->cl_max_rpcs_in_flight = max;
2160 client_adjust_max_dirty(cli);
2164 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2165 for (i = 0; i < diff; i++) {
2166 if (list_empty(&cli->cl_flight_waiters))
2169 orsw = list_entry(cli->cl_flight_waiters.next,
2170 struct obd_request_slot_waiter, orsw_entry);
2171 list_del_init(&orsw->orsw_entry);
2172 cli->cl_rpcs_in_flight++;
2173 wake_up(&orsw->orsw_waitq);
2175 spin_unlock(&cli->cl_loi_list_lock);
2179 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2181 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2183 return cli->cl_max_mod_rpcs_in_flight;
2185 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2187 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2189 struct obd_connect_data *ocd;
2193 if (max > OBD_MAX_RIF_MAX || max < 1)
2196 /* cannot exceed or equal max_rpcs_in_flight */
2197 if (max >= cli->cl_max_rpcs_in_flight) {
2198 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2199 "higher or equal to max_rpcs_in_flight value (%u)\n",
2200 cli->cl_import->imp_obd->obd_name,
2201 max, cli->cl_max_rpcs_in_flight);
2205 /* cannot exceed max modify RPCs in flight supported by the server */
2206 ocd = &cli->cl_import->imp_connect_data;
2207 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2208 maxmodrpcs = ocd->ocd_maxmodrpcs;
2211 if (max > maxmodrpcs) {
2212 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2213 "higher than max_mod_rpcs_per_client value (%hu) "
2214 "returned by the server at connection\n",
2215 cli->cl_import->imp_obd->obd_name,
2220 spin_lock(&cli->cl_mod_rpcs_lock);
2222 prev = cli->cl_max_mod_rpcs_in_flight;
2223 cli->cl_max_mod_rpcs_in_flight = max;
2225 /* wakeup waiters if limit has been increased */
2226 if (cli->cl_max_mod_rpcs_in_flight > prev)
2227 wake_up(&cli->cl_mod_rpcs_waitq);
2229 spin_unlock(&cli->cl_mod_rpcs_lock);
2233 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2235 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2236 struct seq_file *seq)
2238 unsigned long mod_tot = 0, mod_cum;
2239 struct timespec64 now;
2242 ktime_get_real_ts64(&now);
2244 spin_lock(&cli->cl_mod_rpcs_lock);
2246 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2247 (s64)now.tv_sec, now.tv_nsec);
2248 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2249 cli->cl_mod_rpcs_in_flight);
2251 seq_printf(seq, "\n\t\t\tmodify\n");
2252 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2254 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2257 for (i = 0; i < OBD_HIST_MAX; i++) {
2258 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2260 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2261 i, mod, pct(mod, mod_tot),
2262 pct(mod_cum, mod_tot));
2263 if (mod_cum == mod_tot)
2267 spin_unlock(&cli->cl_mod_rpcs_lock);
2271 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2273 /* The number of modify RPCs sent in parallel is limited
2274 * because the server has a finite number of slots per client to
2275 * store request result and ensure reply reconstruction when needed.
2276 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2277 * that takes into account server limit and cl_max_rpcs_in_flight
2279 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2280 * one close request is allowed above the maximum.
2282 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2287 /* A slot is available if
2288 * - number of modify RPCs in flight is less than the max
2289 * - it's a close RPC and no other close request is in flight
2291 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2292 (close_req && cli->cl_close_rpcs_in_flight == 0);
2297 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2302 spin_lock(&cli->cl_mod_rpcs_lock);
2303 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2304 spin_unlock(&cli->cl_mod_rpcs_lock);
2308 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2311 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2312 it->it_op == IT_READDIR ||
2313 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2318 /* Get a modify RPC slot from the obd client @cli according
2319 * to the kind of operation @opc that is going to be sent
2320 * and the intent @it of the operation if it applies.
2321 * If the maximum number of modify RPCs in flight is reached
2322 * the thread is put to sleep.
2323 * Returns the tag to be set in the request message. Tag 0
2324 * is reserved for non-modifying requests.
2326 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2327 struct lookup_intent *it)
2329 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2330 bool close_req = false;
2333 /* read-only metadata RPCs don't consume a slot on MDT
2334 * for reply reconstruction
2336 if (obd_skip_mod_rpc_slot(it))
2339 if (opc == MDS_CLOSE)
2343 spin_lock(&cli->cl_mod_rpcs_lock);
2344 max = cli->cl_max_mod_rpcs_in_flight;
2345 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2346 /* there is a slot available */
2347 cli->cl_mod_rpcs_in_flight++;
2349 cli->cl_close_rpcs_in_flight++;
2350 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2351 cli->cl_mod_rpcs_in_flight);
2352 /* find a free tag */
2353 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2355 LASSERT(i < OBD_MAX_RIF_MAX);
2356 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2357 spin_unlock(&cli->cl_mod_rpcs_lock);
2358 /* tag 0 is reserved for non-modify RPCs */
2360 CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2361 "opc %u, max %hu\n",
2362 cli->cl_import->imp_obd->obd_name,
2367 spin_unlock(&cli->cl_mod_rpcs_lock);
2369 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2370 "opc %u, max %hu\n",
2371 cli->cl_import->imp_obd->obd_name, opc, max);
2373 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2374 obd_mod_rpc_slot_avail(cli, close_req),
2378 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2380 /* Put a modify RPC slot from the obd client @cli according
2381 * to the kind of operation @opc that has been sent and the
2382 * intent @it of the operation if it applies.
2384 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2385 struct lookup_intent *it, __u16 tag)
2387 bool close_req = false;
2389 if (obd_skip_mod_rpc_slot(it))
2392 if (opc == MDS_CLOSE)
2395 spin_lock(&cli->cl_mod_rpcs_lock);
2396 cli->cl_mod_rpcs_in_flight--;
2398 cli->cl_close_rpcs_in_flight--;
2399 /* release the tag in the bitmap */
2400 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2401 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2402 spin_unlock(&cli->cl_mod_rpcs_lock);
2403 wake_up(&cli->cl_mod_rpcs_waitq);
2405 EXPORT_SYMBOL(obd_put_mod_rpc_slot);