4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* Registered obd types; the list is walked and modified under obd_types_lock. */
49 static DEFINE_SPINLOCK(obd_types_lock);
50 static LIST_HEAD(obd_types);
/* obd_dev_lock protects the obd_devs[] slot table of registered devices. */
51 DEFINE_RWLOCK(obd_dev_lock);
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache that backs every struct obd_device allocation. */
54 static struct kmem_cache *obd_device_cachep;
/* NOTE(review): presumably the workqueue running the zombie export/import work items - confirm. */
56 static struct workqueue_struct *zombie_wq;
/* Forward declarations for helpers defined later in the file. */
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61 const char *status, int locks, int debug_level);
/* Stale export bookkeeping; obd_stale_export_num is bumped in class_unlink_export(). */
63 static LIST_HEAD(obd_stale_exports);
64 static DEFINE_SPINLOCK(obd_stale_export_lock);
65 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/*
 * Hook used in this file to release a ptlrpc_connection without linking
 * against ptlrpc directly. NOTE(review): it is assigned outside this listing.
 */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/*
 * Allocate a struct obd_device from obd_device_cachep with GFP_NOFS and
 * stamp it with OBD_DEVICE_MAGIC.
 * NOTE(review): the NULL check and the return statement are not visible in
 * this listing; class_newdev() tests the result against NULL.
 */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return \a obd to obd_device_cachep.
 * Asserts the magic value, reports a namespace that is still attached,
 * and finalizes the obd_reference lu_ref before the slab free.
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A namespace that is still set at this point is only logged here. */
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd_type by exact name (strcmp) while holding
 * obd_types_lock.
 * NOTE(review): the return statements are missing from this listing;
 * callers treat the result as the matching type and test it for NULL.
 * The visible lines do not take any reference on the type.
 */
99 struct obd_type *class_search_type(const char *name)
101 struct list_head *tmp;
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 list_for_each(tmp, &obd_types) {
106 type = list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
/* Match found: the lock is dropped before leaving the loop. */
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
/*
 * Find the obd_type called \a name and pin its owning module.
 *
 * When HAVE_MODULE_LOADING_SUPPORT is defined, request_module() is tried and
 * the search repeated. Aliases visible here: the LWP name loads the OSP
 * module, and names starting with LUSTRE_MDS_NAME load the MDT module.
 * NOTE(review): the obdfilter remapping target and the conditions guarding
 * this path are not visible in this listing.
 */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
/* Prefix comparison: any name beginning with LUSTRE_MDS_NAME matches. */
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
/* request_module() returning zero is treated as a successful load. */
134 if (!request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144 spin_lock(&type->obd_type_lock);
/* NOTE(review): the try_module_get() result is not checked in the visible lines. */
146 try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): releases the module reference on the
 * owner of the type dt_ops while holding obd_type_lock.
 */
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
/*
 * kobject release callback for an obd_type (installed through class_ktype).
 * Tears down the debugfs entry, finalizes the lu_device_type, unlinks the
 * type from obd_types, removes the proc subtree and frees the name, the
 * ops tables and the type itself.
 */
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 #ifdef HAVE_SERVER_SUPPORT
/* For a symlink filter type the debugfs pointer is cleared before the removal below. */
166 if (type->typ_sym_filter)
167 type->typ_debugfs_entry = NULL;
169 debugfs_remove_recursive(type->typ_debugfs_entry);
170 type->typ_debugfs_entry = NULL;
173 lu_device_type_fini(type->typ_lu);
175 spin_lock(&obd_types_lock);
176 list_del(&type->typ_chain);
177 spin_unlock(&obd_types_lock);
179 if (type->typ_name) {
180 #ifdef CONFIG_PROC_FS
181 if (type->typ_procroot)
182 remove_proc_subtree(type->typ_name, proc_lustre_root);
/* The name was allocated with strlen + 1 bytes in class_register_type(). */
184 OBD_FREE(type->typ_name, strlen(type->typ_name) + 1);
186 if (type->typ_md_ops)
187 OBD_FREE_PTR(type->typ_md_ops);
188 if (type->typ_dt_ops)
189 OBD_FREE_PTR(type->typ_dt_ops);
191 OBD_FREE(type, sizeof(*type));
/* kobj_type shared by all obd_type kobjects; the final put ends in class_sysfs_release(). */
194 static struct kobj_type class_ktype = {
195 .sysfs_ops = &lustre_sysfs_ops,
196 .release = class_sysfs_release,
/*
 * Server-only helper: create a bare obd_type for \a name holding only a
 * kobject, a debugfs directory and (optionally) a compat proc directory,
 * and mark it with typ_sym_filter.
 *
 * Visible error returns: ERR_PTR(-EEXIST) when a kobject of that name is
 * already in lustre_kset, ERR_PTR(-ENOMEM) when the type cannot be allocated.
 * NOTE(review): the success return and the enable_proc test are not visible
 * in this listing.
 */
199 #ifdef HAVE_SERVER_SUPPORT
200 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
202 struct dentry *symlink;
203 struct obd_type *type;
204 struct kobject *kobj;
207 kobj = kset_find_obj(lustre_kset, name);
210 return ERR_PTR(-EEXIST);
213 OBD_ALLOC(type, sizeof(*type));
215 return ERR_PTR(-ENOMEM);
217 INIT_LIST_HEAD(&type->typ_chain);
219 type->typ_kobj.kset = lustre_kset;
220 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
221 &lustre_kset->kobj, "%s", name);
225 symlink = debugfs_create_dir(name, debugfs_lustre_root);
/* Both a NULL and an error pointer are handled; NULL is mapped to -ENOMEM. */
226 if (IS_ERR_OR_NULL(symlink)) {
227 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
/* Dropping the kobject reference releases the type through class_sysfs_release(). */
228 kobject_put(&type->typ_kobj);
231 type->typ_debugfs_entry = symlink;
232 type->typ_sym_filter = true;
235 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
/* A proc registration failure is logged and typ_procroot is reset to NULL. */
237 if (IS_ERR(type->typ_procroot)) {
238 CERROR("%s: can't create compat proc entry: %d\n",
239 name, (int)PTR_ERR(type->typ_procroot));
240 type->typ_procroot = NULL;
246 EXPORT_SYMBOL(class_add_symlinks);
247 #endif /* HAVE_SERVER_SUPPORT */
/* Upper bound on an obd type name, asserted in class_register_type(). */
249 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type.
 *
 * Allocates the obd_type, copies \a dt_ops and \a md_ops into private
 * tables, duplicates \a name, registers proc/debugfs entries, adds the
 * kobject to lustre_kset, initializes \a ldt and finally links the type
 * onto obd_types.
 *
 * Visible failure handling: -ENOMEM when any of the three allocations
 * fails; the cleanup path drops the kobject reference.
 * NOTE(review): many branch and label lines are missing from this listing,
 * so the exact error paths cannot be confirmed from here.
 */
251 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
252 bool enable_proc, struct lprocfs_vars *vars,
253 const char *name, struct lu_device_type *ldt)
255 struct obd_type *type;
260 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* A type with this name is already on the obd_types list. */
262 if (class_search_type(name)) {
263 #ifdef HAVE_SERVER_SUPPORT
/*
 * NOTE(review): for lov and osc the existing kobject is looked up,
 * apparently to reuse a type created by class_add_symlinks() - confirm;
 * the rest of this branch is not visible.
 */
264 if (strcmp(name, LUSTRE_LOV_NAME) == 0 ||
265 strcmp(name, LUSTRE_OSC_NAME) == 0) {
266 struct kobject *kobj;
268 kobj = kset_find_obj(lustre_kset, name);
270 type = container_of(kobj, struct obd_type,
275 #endif /* HAVE_SERVER_SUPPORT */
276 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
280 OBD_ALLOC(type, sizeof(*type));
284 INIT_LIST_HEAD(&type->typ_chain);
285 type->typ_kobj.kset = lustre_kset;
286 kobject_init(&type->typ_kobj, &class_ktype);
287 #ifdef HAVE_SERVER_SUPPORT
289 #endif /* HAVE_SERVER_SUPPORT */
/* All three allocations are attempted first and checked together below. */
290 OBD_ALLOC_PTR(type->typ_dt_ops);
291 OBD_ALLOC_PTR(type->typ_md_ops);
292 OBD_ALLOC(type->typ_name, strlen(name) + 1);
294 if (type->typ_dt_ops == NULL ||
295 type->typ_md_ops == NULL ||
296 type->typ_name == NULL)
297 GOTO (failed, rc = -ENOMEM);
/* The ops tables are copied by value into the private allocations. */
299 *(type->typ_dt_ops) = *dt_ops;
300 /* md_ops is optional */
302 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized strlen(name) + 1 above, so this copy fits. */
303 strcpy(type->typ_name, name);
304 spin_lock_init(&type->obd_type_lock);
306 #ifdef HAVE_SERVER_SUPPORT
307 if (type->typ_sym_filter)
310 #ifdef CONFIG_PROC_FS
/* The proc directory is only created when requested and not already present. */
311 if (enable_proc && !type->typ_procroot) {
312 type->typ_procroot = lprocfs_register(type->typ_name,
315 if (IS_ERR(type->typ_procroot)) {
316 rc = PTR_ERR(type->typ_procroot);
317 type->typ_procroot = NULL;
322 type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
324 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
325 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
327 type->typ_debugfs_entry = NULL;
331 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
334 #ifdef HAVE_SERVER_SUPPORT
339 rc = lu_device_type_init(ldt);
/* The type is published on obd_types last, under obd_types_lock. */
344 spin_lock(&obd_types_lock);
345 list_add(&type->typ_chain, &obd_types);
346 spin_unlock(&obd_types_lock);
/* Dropping the kobject reference frees the type through class_sysfs_release(). */
351 kobject_put(&type->typ_kobj);
355 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * An unknown name is reported with CERROR. A type that still has a nonzero
 * typ_refcnt has its ops tables freed but keeps its name for debugging.
 * Otherwise the kobject reference is dropped, which leads to
 * class_sysfs_release().
 * NOTE(review): the return statements are not visible in this listing.
 */
357 int class_unregister_type(const char *name)
359 struct obd_type *type = class_search_type(name);
363 CERROR("unknown obd type\n");
367 if (type->typ_refcnt) {
368 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
369 /* This is a bad situation, let's make the best of it */
370 /* Remove ops, but leave the name for debugging */
/*
 * NOTE(review): the visible lines do not reset typ_dt_ops/typ_md_ops to
 * NULL after freeing them, while class_sysfs_release() frees both when
 * they are non-NULL - confirm this cannot lead to a double free.
 */
371 OBD_FREE_PTR(type->typ_dt_ops);
372 OBD_FREE_PTR(type->typ_md_ops);
376 kobject_put(&type->typ_kobj);
379 } /* class_unregister_type */
380 EXPORT_SYMBOL(class_unregister_type);
383 * Create a new obd device.
385 * Allocate the new obd_device and initialize it.
387 * \param[in] type_name obd device type string.
388 * \param[in] name obd device name.
389 * \param[in] uuid obd device UUID
391 * \retval newdev pointer to created obd_device
392 * \retval ERR_PTR(errno) on error
/*
 * Visible error returns: ERR_PTR(-EINVAL) when \a name is MAX_OBD_NAME bytes
 * or longer, ERR_PTR(-ENODEV) for an unknown type, ERR_PTR(-ENOMEM) when the
 * device cannot be allocated (the type reference is dropped first).
 * The new device starts with obd_minor == -1 and obd_refcount == 1.
 */
394 struct obd_device *class_newdev(const char *type_name, const char *name,
397 struct obd_device *newdev;
398 struct obd_type *type = NULL;
/* NOTE(review): the message mentions name/uuid but only the name length is checked in the visible lines. */
401 if (strlen(name) >= MAX_OBD_NAME) {
402 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
403 RETURN(ERR_PTR(-EINVAL));
/* Takes a module reference on the type; released by class_put_type(). */
406 type = class_get_type(type_name);
408 CERROR("OBD: unknown type: %s\n", type_name);
409 RETURN(ERR_PTR(-ENODEV));
412 newdev = obd_device_alloc();
413 if (newdev == NULL) {
414 class_put_type(type);
415 RETURN(ERR_PTR(-ENOMEM));
417 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* Copies at most sizeof(obd_name) - 1 bytes; the last byte is left untouched. */
418 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
419 newdev->obd_type = type;
/* -1 marks a device that has no slot in obd_devs[] yet. */
420 newdev->obd_minor = -1;
422 rwlock_init(&newdev->obd_pool_lock);
423 newdev->obd_pool_limit = 0;
424 newdev->obd_pool_slv = 0;
426 INIT_LIST_HEAD(&newdev->obd_exports);
427 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
428 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
429 INIT_LIST_HEAD(&newdev->obd_exports_timed);
430 INIT_LIST_HEAD(&newdev->obd_nid_stats);
431 spin_lock_init(&newdev->obd_nid_lock);
432 spin_lock_init(&newdev->obd_dev_lock);
433 mutex_init(&newdev->obd_dev_mutex);
434 spin_lock_init(&newdev->obd_osfs_lock);
435 /* newdev->obd_osfs_age must be set to a value in the distant
436 * past to guarantee a fresh statfs is fetched on mount. */
437 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
439 /* XXX belongs in setup not attach */
440 init_rwsem(&newdev->obd_observer_link_sem);
442 spin_lock_init(&newdev->obd_recovery_task_lock);
443 init_waitqueue_head(&newdev->obd_next_transno_waitq);
444 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
445 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
446 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
447 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
448 INIT_LIST_HEAD(&newdev->obd_evict_list);
449 INIT_LIST_HEAD(&newdev->obd_lwp_list);
451 llog_group_init(&newdev->obd_olg);
452 /* Detach drops this */
453 atomic_set(&newdev->obd_refcount, 1);
454 lu_ref_init(&newdev->obd_reference);
455 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
457 newdev->obd_conn_inprogress = 0;
/*
 * NOTE(review): strncpy with the full UUID_MAX bound does not terminate the
 * destination when uuid is UUID_MAX bytes or longer, and no uuid length
 * check is visible above - confirm the callers guarantee a shorter uuid.
 */
459 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
461 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
462 newdev->obd_name, newdev);
470 * \param[in] obd obd_device to be freed
/*
 * Final teardown of an obd_device whose refcount has reached zero.
 * Asserts the magic, the obd_devs[] slot consistency and the zero refcount,
 * runs obd_cleanup() in the obd_stopping branch, frees the device and then
 * drops the type reference that class_newdev() took.
 */
474 void class_free_dev(struct obd_device *obd)
/* Saved up front because obd is freed before the type reference is dropped. */
476 struct obd_type *obd_type = obd->obd_type;
478 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
479 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A device is either unregistered (minor -1) or owns its obd_devs[] slot. */
480 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
481 "obd %p != obd_devs[%d] %p\n",
482 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
483 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
484 "obd_refcount should be 0, not %d\n",
485 atomic_read(&obd->obd_refcount));
486 LASSERT(obd_type != NULL);
488 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
489 obd->obd_name, obd->obd_type->typ_name);
491 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
492 obd->obd_name, obd->obd_uuid.uuid);
493 if (obd->obd_stopping) {
496 /* If we're not stopping, we were never set up */
497 err = obd_cleanup(obd);
/* A cleanup failure is only logged. */
499 CERROR("Cleanup %s returned %d\n",
503 obd_device_free(obd);
505 class_put_type(obd_type);
509 * Unregister obd device.
511 * Free the slot in obd_devs[] used by \a obd.
513 * \param[in] obd obd_device to be unregistered
/*
 * Clear the obd_devs[] slot owned by \a obd under the obd_dev_lock write
 * lock. A device with a negative obd_minor owns no slot and is left alone.
 */
517 void class_unregister_device(struct obd_device *obd)
519 write_lock(&obd_dev_lock);
520 if (obd->obd_minor >= 0) {
521 LASSERT(obd_devs[obd->obd_minor] == obd);
522 obd_devs[obd->obd_minor] = NULL;
525 write_unlock(&obd_dev_lock);
529 * Register obd device.
531 * Find free slot in obd_devs[], fills it with \a new_obd.
533 * \param[in] new_obd obd_device to be registered
536 * \retval -EEXIST device with this name is registered
537 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Scans obd_devs[] under the obd_dev_lock write lock, comparing device
 * names with strcmp, and stores \a new_obd in the chosen free slot, also
 * recording the slot index in new_obd->obd_minor.
 * NOTE(review): several control-flow lines are missing from this listing,
 * so the retry behaviour around obd_zombie_barrier() and the exact return
 * paths cannot be confirmed from here.
 */
539 int class_register_device(struct obd_device *new_obd)
543 int new_obd_minor = 0;
544 bool minor_assign = false;
545 bool retried = false;
548 write_lock(&obd_dev_lock);
549 for (i = 0; i < class_devno_max(); i++) {
550 struct obd_device *obd = class_num2obd(i);
553 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
556 write_unlock(&obd_dev_lock);
558 /* the obd_device could be waited to be
559 * destroyed by the "obd_zombie_impexp_thread".
561 obd_zombie_barrier();
566 CERROR("%s: already exists, won't add\n",
568 /* in case we found a free slot before duplicate */
569 minor_assign = false;
573 if (!minor_assign && obd == NULL) {
580 new_obd->obd_minor = new_obd_minor;
581 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
582 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
583 obd_devs[new_obd_minor] = new_obd;
587 CERROR("%s: all %u/%u devices used, increase "
588 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
589 i, class_devno_max(), ret);
592 write_unlock(&obd_dev_lock);
/*
 * Scan obd_devs[] for a device whose obd_name equals \a name.
 * Takes no lock itself; class_name2dev() calls it with obd_dev_lock
 * read-held.
 * NOTE(review): the return statements are not visible in this listing.
 */
597 static int class_name2dev_nolock(const char *name)
604 for (i = 0; i < class_devno_max(); i++) {
605 struct obd_device *obd = class_num2obd(i);
607 if (obd && strcmp(name, obd->obd_name) == 0) {
608 /* Make sure we finished attaching before we give
609 out any references */
610 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
611 if (obd->obd_attached) {
/* Locked wrapper: runs class_name2dev_nolock() with obd_dev_lock read-held. */
621 int class_name2dev(const char *name)
628 read_lock(&obd_dev_lock);
629 i = class_name2dev_nolock(name);
630 read_unlock(&obd_dev_lock);
634 EXPORT_SYMBOL(class_name2dev);
/*
 * Resolve a device name to its obd_device via class_name2dev() and
 * class_num2obd(). No reference is taken in the visible lines.
 */
636 struct obd_device *class_name2obd(const char *name)
638 int dev = class_name2dev(name);
/*
 * NOTE(review): this lets dev == class_devno_max() through; class_num2obd()
 * only indexes the table for num < class_devno_max(), so it looks harmless,
 * but >= would match the intent - confirm.
 */
640 if (dev < 0 || dev > class_devno_max())
642 return class_num2obd(dev);
644 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] for a device whose obd_uuid equals \a uuid.
 * Takes no lock itself; callers in this file hold obd_dev_lock for reading.
 * NOTE(review): the return statements are not visible in this listing.
 */
646 int class_uuid2dev_nolock(struct obd_uuid *uuid)
650 for (i = 0; i < class_devno_max(); i++) {
651 struct obd_device *obd = class_num2obd(i);
653 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
654 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/* Locked wrapper: runs class_uuid2dev_nolock() with obd_dev_lock read-held. */
662 int class_uuid2dev(struct obd_uuid *uuid)
666 read_lock(&obd_dev_lock);
667 i = class_uuid2dev_nolock(uuid);
668 read_unlock(&obd_dev_lock);
672 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Resolve a uuid to its obd_device via class_uuid2dev() and class_num2obd().
 * NOTE(review): the guard on a failed lookup is not visible in this listing.
 */
674 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
676 int dev = class_uuid2dev(uuid);
679 return class_num2obd(dev);
681 EXPORT_SYMBOL(class_uuid2obd);
684 * Get obd device from ::obd_devs[]
686 * \param num [in] array index
688 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
689 * otherwise return the obd device there.
/*
 * Sanity-checks the device found for \a num: its magic must be
 * OBD_DEVICE_MAGIC and its obd_minor must equal \a num.
 * Takes no lock in the visible lines.
 */
691 struct obd_device *class_num2obd(int num)
693 struct obd_device *obd = NULL;
/* NOTE(review): only the upper bound is checked here; a negative num is not rejected in the visible lines. */
695 if (num < class_devno_max()) {
700 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
701 "%p obd_magic %08x != %08x\n",
702 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
703 LASSERTF(obd->obd_minor == num,
704 "%p obd_minor %0d != %0d\n",
705 obd, obd->obd_minor, num);
712 * Find obd in obd_devs[] by name or uuid.
714 * Increment obd's refcount if found.
716 * \param[in] str obd name or uuid
718 * \retval NULL if not found
719 * \retval target pointer to found obd_device
/*
 * The uuid lookup is tried first and the name lookup second, both under a
 * single obd_dev_lock read section, so the reference is taken before the
 * lock is dropped.
 */
721 struct obd_device *class_dev_by_str(const char *str)
723 struct obd_device *target = NULL;
724 struct obd_uuid tgtuuid;
727 obd_str2uuid(&tgtuuid, str);
729 read_lock(&obd_dev_lock);
730 rc = class_uuid2dev_nolock(&tgtuuid);
732 rc = class_name2dev_nolock(str);
735 target = class_num2obd(rc);
/* The reference is tagged with the scope find and the current task. */
738 class_incref(target, "find", current);
739 read_unlock(&obd_dev_lock);
743 EXPORT_SYMBOL(class_dev_by_str);
746 * Get the obd device count; devices in any state are counted.
748 * \retval obd device count
/*
 * Walks the device table under the obd_dev_lock read lock.
 * NOTE(review): the counting and return statements are not visible in this
 * listing.
 */
750 int get_devices_count(void)
752 int index, max_index = class_devno_max(), dev_count = 0;
754 read_lock(&obd_dev_lock);
/*
 * NOTE(review): this loop uses <= max_index while every other scan in the
 * file stops at < class_devno_max(); class_num2obd() does not index the
 * table for the extra value, so it looks harmless - confirm.
 */
755 for (index = 0; index <= max_index; index++) {
756 struct obd_device *obd = class_num2obd(index);
760 read_unlock(&obd_dev_lock);
764 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per registered device under the obd_dev_lock read
 * lock: index, status, type name, device name, uuid and refcount.
 * The status is chosen in priority order stopping, set up, attached.
 * NOTE(review): the status strings themselves are not visible in this listing.
 */
766 void class_obd_list(void)
771 read_lock(&obd_dev_lock);
772 for (i = 0; i < class_devno_max(); i++) {
773 struct obd_device *obd = class_num2obd(i);
777 if (obd->obd_stopping)
779 else if (obd->obd_set_up)
781 else if (obd->obd_attached)
785 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
786 i, status, obd->obd_type->typ_name,
787 obd->obd_name, obd->obd_uuid.uuid,
788 atomic_read(&obd->obd_refcount));
790 read_unlock(&obd_dev_lock);
794 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
795 specified, then only the client with that uuid is returned,
796 otherwise any client connected to the tgt is returned. */
/*
 * Scans obd_devs[] under the obd_dev_lock read lock and drops the lock
 * before leaving on a match. No reference is taken in the visible lines.
 */
797 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
798 const char * typ_name,
799 struct obd_uuid *grp_uuid)
803 read_lock(&obd_dev_lock);
804 for (i = 0; i < class_devno_max(); i++) {
805 struct obd_device *obd = class_num2obd(i);
/* Prefix comparison: any type name beginning with typ_name matches. */
809 if ((strncmp(obd->obd_type->typ_name, typ_name,
810 strlen(typ_name)) == 0)) {
/* The group filter only applies when grp_uuid is non-NULL. */
811 if (obd_uuid_equals(tgt_uuid,
812 &obd->u.cli.cl_target_uuid) &&
813 ((grp_uuid)? obd_uuid_equals(grp_uuid,
814 &obd->obd_uuid) : 1)) {
815 read_unlock(&obd_dev_lock);
820 read_unlock(&obd_dev_lock);
824 EXPORT_SYMBOL(class_find_client_obd);
826 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
827 searching at *next, and if a device is found, the next index to look
828 at is saved in *next. If next is NULL, then the first matching device
829 will always be returned. */
/*
 * The scan runs under the obd_dev_lock read lock and compares grp_uuid with
 * each device obd_uuid. A *next value is only used as the start index when
 * it lies within [0, class_devno_max()).
 * NOTE(review): the other start-index branches and the returns are not
 * visible in this listing.
 */
830 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
836 else if (*next >= 0 && *next < class_devno_max())
841 read_lock(&obd_dev_lock);
842 for (; i < class_devno_max(); i++) {
843 struct obd_device *obd = class_num2obd(i);
847 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
850 read_unlock(&obd_dev_lock);
854 read_unlock(&obd_dev_lock);
858 EXPORT_SYMBOL(class_devices_in_group);
861 * Notify that the sptlrpc log for \a fsname has changed, so that every
862 * relevant OBD adjusts its sptlrpc settings accordingly.
/*
 * For every device that is set up and not stopping, whose type is one of
 * mdc/osc/osp/lwp/mdt/ost and whose name starts with the first \a namelen
 * bytes of \a fsname, send KEY_SPTLRPC_CONF through obd_set_info_async().
 * NOTE(review): how rc2 is folded into the return value is not visible in
 * this listing.
 */
864 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
866 struct obd_device *obd;
870 LASSERT(namelen > 0);
872 read_lock(&obd_dev_lock);
873 for (i = 0; i < class_devno_max(); i++) {
874 obd = class_num2obd(i);
876 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
879 /* only notify mdc, osc, osp, lwp, mdt, ost
880 * because only these have a -sptlrpc llog */
881 type = obd->obd_type->typ_name;
882 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
883 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
884 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
885 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
886 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
887 strcmp(type, LUSTRE_OST_NAME) != 0)
/* Prefix comparison of the device name against fsname. */
890 if (strncmp(obd->obd_name, fsname, namelen))
/*
 * A reference pins the device so that obd_dev_lock can be dropped around
 * the obd_set_info_async() call; the lock is retaken afterwards.
 */
893 class_incref(obd, __FUNCTION__, obd);
894 read_unlock(&obd_dev_lock);
895 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
896 sizeof(KEY_SPTLRPC_CONF),
897 KEY_SPTLRPC_CONF, 0, NULL, NULL);
899 class_decref(obd, __FUNCTION__, obd);
900 read_lock(&obd_dev_lock);
902 read_unlock(&obd_dev_lock);
905 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy obd_device_cachep if it exists and reset the pointer to NULL, so
 * that a repeated call is a no-op.
 */
907 void obd_cleanup_caches(void)
910 if (obd_device_cachep) {
911 kmem_cache_destroy(obd_device_cachep);
912 obd_device_cachep = NULL;
/*
 * Create the ll_obd_dev_cache slab cache for struct obd_device.
 * The cache must not exist yet (asserted). On allocation failure rc is set
 * to -ENOMEM and the error path calls obd_cleanup_caches().
 */
918 int obd_init_caches(void)
923 LASSERT(obd_device_cachep == NULL);
924 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
925 sizeof(struct obd_device),
927 if (!obd_device_cachep)
928 GOTO(out, rc = -ENOMEM);
932 obd_cleanup_caches();
936 /* map connection to client */
/*
 * Translate a lustre_handle into its obd_export via class_handle2object().
 * Two special cases are logged before the lookup: a null handle, and a
 * cookie of -1, which means a new connection is being requested.
 * NOTE(review): the returns for those two cases are not visible in this
 * listing.
 */
937 struct obd_export *class_conn2export(struct lustre_handle *conn)
939 struct obd_export *export;
943 CDEBUG(D_CACHE, "looking for null handle\n");
947 if (conn->cookie == -1) { /* this means assign a new connection */
948 CDEBUG(D_CACHE, "want a new connection\n");
952 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
953 export = class_handle2object(conn->cookie, NULL);
956 EXPORT_SYMBOL(class_conn2export);
/*
 * NOTE(review): the body is not visible in this listing; presumably returns
 * the obd_device behind \a exp - confirm.
 */
958 struct obd_device *class_exp2obd(struct obd_export *exp)
964 EXPORT_SYMBOL(class_exp2obd);
/*
 * Return the client import (u.cli.cl_import) of the device behind \a exp.
 * NOTE(review): any guard on obd is not visible in this listing.
 */
966 struct obd_import *class_exp2cliimp(struct obd_export *exp)
968 struct obd_device *obd = exp->exp_obd;
971 return obd->u.cli.cl_import;
973 EXPORT_SYMBOL(class_exp2cliimp);
975 /* Export management functions */
/*
 * Final destruction of an export whose refcount is zero (asserted).
 * Drops the ptlrpc connection if there is one, asserts that the reply,
 * replay and high-priority RPC lists are empty, calls obd_destroy_export(),
 * releases the device reference for non-self exports and frees the export
 * through OBD_FREE_RCU.
 */
976 static void class_export_destroy(struct obd_export *exp)
978 struct obd_device *obd = exp->exp_obd;
981 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
982 LASSERT(obd != NULL);
984 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
985 exp->exp_client_uuid.uuid, obd->obd_name);
987 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
988 if (exp->exp_connection)
989 ptlrpc_put_connection_superhack(exp->exp_connection);
991 LASSERT(list_empty(&exp->exp_outstanding_replies));
992 LASSERT(list_empty(&exp->exp_uncommitted_replies));
993 LASSERT(list_empty(&exp->exp_req_replay_queue));
994 LASSERT(list_empty(&exp->exp_hp_rpcs));
995 obd_destroy_export(exp);
996 /* self export doesn't hold a reference to an obd, although it
997 * exists until freeing of the obd */
998 if (exp != obd->obd_self_export)
999 class_decref(obd, "export", exp);
1001 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle addref callback for exports: takes a reference via class_export_get(). */
1005 static void export_handle_addref(void *export)
1007 class_export_get(export);
/* Handle operations passed to class_handle_hash() for every new export. */
1010 static struct portals_handle_ops export_handle_ops = {
1011 .hop_addref = export_handle_addref,
/*
 * Take one reference on \a exp and log the new count.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably returns \a exp - confirm.
 */
1015 struct obd_export *class_export_get(struct obd_export *exp)
1017 atomic_inc(&exp->exp_refcount);
1018 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1019 atomic_read(&exp->exp_refcount));
1022 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp. On the final put the nid stat reference is
 * released; the self export of a device is destroyed inline and followed by
 * class_free_dev(), while any other export must still be on an
 * exp_obd_chain list (asserted) and is handed to obd_zombie_export_add().
 */
1024 void class_export_put(struct obd_export *exp)
1026 LASSERT(exp != NULL);
1027 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1028 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1029 atomic_read(&exp->exp_refcount) - 1);
1031 if (atomic_dec_and_test(&exp->exp_refcount)) {
1032 struct obd_device *obd = exp->exp_obd;
1034 CDEBUG(D_IOCTL, "final put %p/%s\n",
1035 exp, exp->exp_client_uuid.uuid);
1037 /* release nid stat reference */
1038 lprocfs_exp_cleanup(exp);
1040 if (exp == obd->obd_self_export) {
1041 /* self export should be destroyed without
1042 * zombie thread as it doesn't hold a
1043 * reference to obd and doesn't hold any
1045 class_export_destroy(exp);
1046 /* self export is destroyed, no class
1047 * references exist and it is safe to free
1049 class_free_dev(obd);
1051 LASSERT(!list_empty(&exp->exp_obd_chain));
1052 obd_zombie_export_add(exp);
1057 EXPORT_SYMBOL(class_export_put);
/*
 * Work handler installed on exp_zombie_work: recovers the export from the
 * work item and destroys it with class_export_destroy().
 */
1059 static void obd_zombie_exp_cull(struct work_struct *ws)
1061 struct obd_export *export;
1063 export = container_of(ws, struct obd_export, exp_zombie_work);
1064 class_export_destroy(export);
1067 /* Creates a new export, adds it to the hash table, and returns a
1068 * pointer to it. The refcount is 2: one for the hash reference, and
1069 * one for the pointer returned by this function. */
/*
 * Allocate and initialize an export of \a obd for client \a cluuid.
 *
 * The export starts with a refcount of 2 and is entered into the handle
 * hash. Unless \a cluuid equals the device uuid, it is also added to
 * obd_uuid_hash, where a duplicate is refused with -EALREADY. A stopping
 * device yields -ENODEV or -ESHUTDOWN.
 * The error path unhashes the handle, calls obd_destroy_export() and frees
 * the export. Visible allocation failure return: ERR_PTR(-ENOMEM).
 * NOTE(review): the is_self branches and the final returns are not visible
 * in this listing.
 */
1070 struct obd_export *__class_new_export(struct obd_device *obd,
1071 struct obd_uuid *cluuid, bool is_self)
1073 struct obd_export *export;
1074 struct cfs_hash *hash = NULL;
1078 OBD_ALLOC_PTR(export);
1080 return ERR_PTR(-ENOMEM);
1082 export->exp_conn_cnt = 0;
1083 export->exp_lock_hash = NULL;
1084 export->exp_flock_hash = NULL;
1085 /* 2 = class_handle_hash + last */
1086 atomic_set(&export->exp_refcount, 2);
1087 atomic_set(&export->exp_rpc_count, 0);
1088 atomic_set(&export->exp_cb_count, 0);
1089 atomic_set(&export->exp_locks_count, 0);
1090 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1091 INIT_LIST_HEAD(&export->exp_locks_list);
1092 spin_lock_init(&export->exp_locks_list_guard);
1094 atomic_set(&export->exp_replay_count, 0);
1095 export->exp_obd = obd;
1096 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1097 spin_lock_init(&export->exp_uncommitted_replies_lock);
1098 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1099 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1100 INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1101 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1102 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* The export enters the handle hash here. */
1103 class_handle_hash(&export->exp_handle, &export_handle_ops);
1104 export->exp_last_request_time = ktime_get_real_seconds();
1105 spin_lock_init(&export->exp_lock);
1106 spin_lock_init(&export->exp_rpc_lock);
1107 INIT_HLIST_NODE(&export->exp_uuid_hash);
1108 INIT_HLIST_NODE(&export->exp_nid_hash);
1109 INIT_HLIST_NODE(&export->exp_gen_hash);
1110 spin_lock_init(&export->exp_bl_list_lock);
1111 INIT_LIST_HEAD(&export->exp_bl_list);
1112 INIT_LIST_HEAD(&export->exp_stale_list);
/* The zombie work item destroys the export after its final put. */
1113 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1115 export->exp_sp_peer = LUSTRE_SP_ANY;
1116 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1117 export->exp_client_uuid = *cluuid;
1118 obd_init_export(export);
/* Only exports whose uuid differs from the device uuid go into obd_uuid_hash. */
1120 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1121 spin_lock(&obd->obd_dev_lock);
1122 /* shouldn't happen, but might race */
1123 if (obd->obd_stopping)
1124 GOTO(exit_unlock, rc = -ENODEV);
/* A hash reference is held so the add below can run without obd_dev_lock. */
1126 hash = cfs_hash_getref(obd->obd_uuid_hash);
1128 GOTO(exit_unlock, rc = -ENODEV);
1129 spin_unlock(&obd->obd_dev_lock);
1131 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1133 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1134 obd->obd_name, cluuid->uuid, rc);
1135 GOTO(exit_err, rc = -EALREADY);
1139 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* obd_stopping is checked again because the lock was dropped above. */
1140 spin_lock(&obd->obd_dev_lock);
1141 if (obd->obd_stopping) {
1143 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1144 GOTO(exit_unlock, rc = -ESHUTDOWN);
/* This device reference is dropped in class_export_destroy(). */
1148 class_incref(obd, "export", export);
1149 list_add_tail(&export->exp_obd_chain_timed,
1150 &obd->obd_exports_timed);
1151 list_add(&export->exp_obd_chain, &obd->obd_exports);
1152 obd->obd_num_exports++;
1154 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1155 INIT_LIST_HEAD(&export->exp_obd_chain);
1157 spin_unlock(&obd->obd_dev_lock);
1159 cfs_hash_putref(hash);
1163 spin_unlock(&obd->obd_dev_lock);
1166 cfs_hash_putref(hash);
1167 class_handle_unhash(&export->exp_handle);
1168 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1169 obd_destroy_export(export);
1170 OBD_FREE_PTR(export);
/* Create a regular export: __class_new_export() with is_self set to false. */
1174 struct obd_export *class_new_export(struct obd_device *obd,
1175 struct obd_uuid *uuid)
1177 return __class_new_export(obd, uuid, false);
1179 EXPORT_SYMBOL(class_new_export);
/* Create the self export of a device: __class_new_export() with is_self set to true. */
1181 struct obd_export *class_new_export_self(struct obd_device *obd,
1182 struct obd_uuid *uuid)
1184 return __class_new_export(obd, uuid, true);
/*
 * Detach \a exp from its device.
 * The handle is unhashed first. For the self export only a reference is
 * dropped. Any other export is removed from the uuid hash (and from the
 * generation hash with server support), moved to obd_unlinked_exports,
 * taken off the timed list, and handed to obd_stale_export_put().
 */
1187 void class_unlink_export(struct obd_export *exp)
1189 class_handle_unhash(&exp->exp_handle);
1191 if (exp->exp_obd->obd_self_export == exp) {
1192 class_export_put(exp);
1196 spin_lock(&exp->exp_obd->obd_dev_lock);
1197 /* delete an uuid-export hashitem from hashtables */
1198 if (!hlist_unhashed(&exp->exp_uuid_hash))
1199 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1200 &exp->exp_client_uuid,
1201 &exp->exp_uuid_hash);
1203 #ifdef HAVE_SERVER_SUPPORT
1204 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1205 struct tg_export_data *ted = &exp->exp_target_data;
1206 struct cfs_hash *hash;
1208 /* Because obd_gen_hash will not be released until
1209 * class_cleanup(), so hash should never be NULL here */
1210 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1211 LASSERT(hash != NULL);
1212 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1213 &exp->exp_gen_hash);
1214 cfs_hash_putref(hash);
1216 #endif /* HAVE_SERVER_SUPPORT */
/* The export stays reachable through obd_unlinked_exports after this move. */
1218 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1219 list_del_init(&exp->exp_obd_chain_timed);
1220 exp->exp_obd->obd_num_exports--;
1221 spin_unlock(&exp->exp_obd->obd_dev_lock);
1222 atomic_inc(&obd_stale_export_num);
1224 /* A reference is kept by obd_stale_exports list */
1225 obd_stale_export_put(exp);
1227 EXPORT_SYMBOL(class_unlink_export);
1229 /* Import management functions */
/*
 * Final destruction of an import whose refcount is zero (asserted).
 * Drops the main connection, then drains imp_conn_list dropping and freeing
 * each entry, asserts that imp_sec is already NULL and releases the device
 * reference taken in class_new_import().
 * NOTE(review): the free of the import itself is not visible in this listing.
 */
1230 static void obd_zombie_import_free(struct obd_import *imp)
1234 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1235 imp->imp_obd->obd_name);
1237 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1239 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Pop entries from the head until the connection list is empty. */
1241 while (!list_empty(&imp->imp_conn_list)) {
1242 struct obd_import_conn *imp_conn;
1244 imp_conn = list_entry(imp->imp_conn_list.next,
1245 struct obd_import_conn, oic_item);
1246 list_del_init(&imp_conn->oic_item);
1247 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1248 OBD_FREE(imp_conn, sizeof(*imp_conn));
1251 LASSERT(imp->imp_sec == NULL);
1252 class_decref(imp->imp_obd, "import", imp);
/*
 * Take one reference on \a import and log the new count.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably returns \a import - confirm.
 */
1257 struct obd_import *class_import_get(struct obd_import *import)
1259 atomic_inc(&import->imp_refcount);
1260 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1261 atomic_read(&import->imp_refcount),
1262 import->imp_obd->obd_name);
1265 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp; the final put hands the import to
 * obd_zombie_import_add() for deferred destruction.
 */
1267 void class_import_put(struct obd_import *imp)
1271 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1273 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1274 atomic_read(&imp->imp_refcount) - 1,
1275 imp->imp_obd->obd_name);
1277 if (atomic_dec_and_test(&imp->imp_refcount)) {
1278 CDEBUG(D_INFO, "final put import %p\n", imp);
1279 obd_zombie_import_add(imp);
/*
 * NOTE(review): imp_refcount is read after the final put may already have
 * queued the import for destruction - confirm this cannot race with
 * obd_zombie_imp_cull().
 */
1282 /* catch possible import put race */
1283 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1286 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.
 */
1288 static void init_imp_at(struct imp_at *at) {
1290 at_init(&at->iat_net_latency, 0, 0);
1291 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1292 /* max service estimates are tracked on the server side, so
1293 don't use the AT history here, just use the last reported
1294 val. (But keep hist for proc histogram, worst_ever) */
1295 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Work handler installed on imp_zombie_work: recovers the import from the
 * work item and destroys it with obd_zombie_import_free().
 */
1300 static void obd_zombie_imp_cull(struct work_struct *ws)
1302 struct obd_import *import;
1304 import = container_of(ws, struct obd_import, imp_zombie_work);
1305 obd_zombie_import_free(import);
/*
 * Allocate and initialize an import for \a obd.
 * The import takes a reference on the device, starts in LUSTRE_IMP_NEW
 * state with a refcount of 2, and uses LUSTRE_MSG_MAGIC_V2 as its default
 * message magic.
 * NOTE(review): the allocation failure check and the return are not visible
 * in this listing.
 */
1308 struct obd_import *class_new_import(struct obd_device *obd)
1310 struct obd_import *imp;
1311 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1313 OBD_ALLOC(imp, sizeof(*imp));
1317 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1318 INIT_LIST_HEAD(&imp->imp_replay_list);
1319 INIT_LIST_HEAD(&imp->imp_sending_list);
1320 INIT_LIST_HEAD(&imp->imp_delayed_list);
1321 INIT_LIST_HEAD(&imp->imp_committed_list);
1322 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1323 imp->imp_known_replied_xid = 0;
/* The replay cursor starts at the head of the committed list. */
1324 imp->imp_replay_cursor = &imp->imp_committed_list;
1325 spin_lock_init(&imp->imp_lock);
1326 imp->imp_last_success_conn = 0;
1327 imp->imp_state = LUSTRE_IMP_NEW;
/* This device reference is dropped in obd_zombie_import_free(). */
1328 imp->imp_obd = class_incref(obd, "import", imp);
1329 rwlock_init(&imp->imp_sec_lock);
1330 init_waitqueue_head(&imp->imp_recovery_waitq);
1331 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
/* imp_sec_refpid is the pid of the namespace child reaper, or 1 without one. */
1333 if (curr_pid_ns->child_reaper)
1334 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1336 imp->imp_sec_refpid = 1;
1338 atomic_set(&imp->imp_refcount, 2);
1339 atomic_set(&imp->imp_unregistering, 0);
1340 atomic_set(&imp->imp_inflight, 0);
1341 atomic_set(&imp->imp_replay_inflight, 0);
1342 atomic_set(&imp->imp_inval_count, 0);
1343 INIT_LIST_HEAD(&imp->imp_conn_list);
1344 init_imp_at(&imp->imp_at);
1346 /* the default magic is V2, will be used in connect RPC, and
1347 * then adjusted according to the flags in request/reply. */
1348 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1352 EXPORT_SYMBOL(class_new_import);
1354 void class_destroy_import(struct obd_import *import)
1356 LASSERT(import != NULL);
1357 LASSERT(import != LP_POISON);
1359 spin_lock(&import->imp_lock);
1360 import->imp_generation++;
1361 spin_unlock(&import->imp_lock);
1362 class_import_put(import);
1364 EXPORT_SYMBOL(class_destroy_import);
#if LUSTRE_TRACKS_LOCK_EXP_REFS

/*
 * Record one more reference from @lock to @exp.  On the first reference
 * the lock is linked into exp->exp_locks_list and @exp becomes the lock's
 * reference target.  Runs under exp->exp_locks_list_guard.
 */
void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr >= 0);

	/* a lock is only expected to be tracked against one export */
	if (lock->l_exp_refs_target != NULL && lock->l_exp_refs_target != exp)
		LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
			      exp, lock, lock->l_exp_refs_target);

	if (lock->l_exp_refs_nr == 0) {
		list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
		lock->l_exp_refs_target = exp;
	}
	lock->l_exp_refs_nr++;

	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);
	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_add_lock_ref);

/*
 * Drop one reference from @lock to @exp.  When the last reference goes
 * away the lock is unlinked from the export's list and its reference
 * target is cleared.  Runs under exp->exp_locks_list_guard.
 */
void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr > 0);
	if (lock->l_exp_refs_target != exp)
		LCONSOLE_WARN("lock %p, mismatching export pointers: %p, %p\n",
			      lock, lock->l_exp_refs_target, exp);

	lock->l_exp_refs_nr--;
	if (lock->l_exp_refs_nr == 0) {
		list_del_init(&lock->l_exp_refs_link);
		lock->l_exp_refs_target = NULL;
	}

	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);
	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_del_lock_ref);
#endif
1409 /* A connection defines an export context in which preallocation can
1410 be managed. This releases the export pointer reference, and returns
1411 the export handle, so the export refcount is 1 when this function
1413 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1414 struct obd_uuid *cluuid)
1416 struct obd_export *export;
1417 LASSERT(conn != NULL);
1418 LASSERT(obd != NULL);
1419 LASSERT(cluuid != NULL);
1422 export = class_new_export(obd, cluuid);
1424 RETURN(PTR_ERR(export));
1426 conn->cookie = export->exp_handle.h_cookie;
1427 class_export_put(export);
1429 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1430 cluuid->uuid, conn->cookie);
1433 EXPORT_SYMBOL(class_connect);
1435 /* if export is involved in recovery then clean up related things */
1436 static void class_export_recovery_cleanup(struct obd_export *exp)
1438 struct obd_device *obd = exp->exp_obd;
1440 spin_lock(&obd->obd_recovery_task_lock);
1441 if (obd->obd_recovering) {
1442 if (exp->exp_in_recovery) {
1443 spin_lock(&exp->exp_lock);
1444 exp->exp_in_recovery = 0;
1445 spin_unlock(&exp->exp_lock);
1446 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1447 atomic_dec(&obd->obd_connected_clients);
1450 /* if called during recovery then should update
1451 * obd_stale_clients counter,
1452 * lightweight exports are not counted */
1453 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1454 exp->exp_obd->obd_stale_clients++;
1456 spin_unlock(&obd->obd_recovery_task_lock);
1458 spin_lock(&exp->exp_lock);
1459 /** Cleanup req replay fields */
1460 if (exp->exp_req_replay_needed) {
1461 exp->exp_req_replay_needed = 0;
1463 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1464 atomic_dec(&obd->obd_req_replay_clients);
1467 /** Cleanup lock replay data */
1468 if (exp->exp_lock_replay_needed) {
1469 exp->exp_lock_replay_needed = 0;
1471 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1472 atomic_dec(&obd->obd_lock_replay_clients);
1474 spin_unlock(&exp->exp_lock);
1477 /* This function removes 1-3 references from the export:
1478 * 1 - for export pointer passed
1479 * and if disconnect really need
1480 * 2 - removing from hash
1481 * 3 - in client_unlink_export
1482 * The export pointer passed to this function can destroyed */
1483 int class_disconnect(struct obd_export *export)
1485 int already_disconnected;
1488 if (export == NULL) {
1489 CWARN("attempting to free NULL export %p\n", export);
1493 spin_lock(&export->exp_lock);
1494 already_disconnected = export->exp_disconnected;
1495 export->exp_disconnected = 1;
1496 /* We hold references of export for uuid hash
1497 * and nid_hash and export link at least. So
1498 * it is safe to call cfs_hash_del in there. */
1499 if (!hlist_unhashed(&export->exp_nid_hash))
1500 cfs_hash_del(export->exp_obd->obd_nid_hash,
1501 &export->exp_connection->c_peer.nid,
1502 &export->exp_nid_hash);
1503 spin_unlock(&export->exp_lock);
1505 /* class_cleanup(), abort_recovery(), and class_fail_export()
1506 * all end up in here, and if any of them race we shouldn't
1507 * call extra class_export_puts(). */
1508 if (already_disconnected) {
1509 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1510 GOTO(no_disconn, already_disconnected);
1513 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1514 export->exp_handle.h_cookie);
1516 class_export_recovery_cleanup(export);
1517 class_unlink_export(export);
1519 class_export_put(export);
1522 EXPORT_SYMBOL(class_disconnect);
1524 /* Return non-zero for a fully connected export */
1525 int class_connected_export(struct obd_export *exp)
1530 spin_lock(&exp->exp_lock);
1531 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1532 spin_unlock(&exp->exp_lock);
1536 EXPORT_SYMBOL(class_connected_export);
1538 static void class_disconnect_export_list(struct list_head *list,
1539 enum obd_option flags)
1542 struct obd_export *exp;
1545 /* It's possible that an export may disconnect itself, but
1546 * nothing else will be added to this list. */
1547 while (!list_empty(list)) {
1548 exp = list_entry(list->next, struct obd_export,
1550 /* need for safe call CDEBUG after obd_disconnect */
1551 class_export_get(exp);
1553 spin_lock(&exp->exp_lock);
1554 exp->exp_flags = flags;
1555 spin_unlock(&exp->exp_lock);
1557 if (obd_uuid_equals(&exp->exp_client_uuid,
1558 &exp->exp_obd->obd_uuid)) {
1560 "exp %p export uuid == obd uuid, don't discon\n",
1562 /* Need to delete this now so we don't end up pointing
1563 * to work_list later when this export is cleaned up. */
1564 list_del_init(&exp->exp_obd_chain);
1565 class_export_put(exp);
1569 class_export_get(exp);
1570 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1571 "last request at %lld\n",
1572 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1573 exp, exp->exp_last_request_time);
1574 /* release one export reference anyway */
1575 rc = obd_disconnect(exp);
1577 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1578 obd_export_nid2str(exp), exp, rc);
1579 class_export_put(exp);
1584 void class_disconnect_exports(struct obd_device *obd)
1586 struct list_head work_list;
1589 /* Move all of the exports from obd_exports to a work list, en masse. */
1590 INIT_LIST_HEAD(&work_list);
1591 spin_lock(&obd->obd_dev_lock);
1592 list_splice_init(&obd->obd_exports, &work_list);
1593 list_splice_init(&obd->obd_delayed_exports, &work_list);
1594 spin_unlock(&obd->obd_dev_lock);
1596 if (!list_empty(&work_list)) {
1597 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1598 "disconnecting them\n", obd->obd_minor, obd);
1599 class_disconnect_export_list(&work_list,
1600 exp_flags_from_obd(obd));
1602 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1603 obd->obd_minor, obd);
1606 EXPORT_SYMBOL(class_disconnect_exports);
1608 /* Remove exports that have not completed recovery.
1610 void class_disconnect_stale_exports(struct obd_device *obd,
1611 int (*test_export)(struct obd_export *))
1613 struct list_head work_list;
1614 struct obd_export *exp, *n;
1618 INIT_LIST_HEAD(&work_list);
1619 spin_lock(&obd->obd_dev_lock);
1620 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1622 /* don't count self-export as client */
1623 if (obd_uuid_equals(&exp->exp_client_uuid,
1624 &exp->exp_obd->obd_uuid))
1627 /* don't evict clients which have no slot in last_rcvd
1628 * (e.g. lightweight connection) */
1629 if (exp->exp_target_data.ted_lr_idx == -1)
1632 spin_lock(&exp->exp_lock);
1633 if (exp->exp_failed || test_export(exp)) {
1634 spin_unlock(&exp->exp_lock);
1637 exp->exp_failed = 1;
1638 spin_unlock(&exp->exp_lock);
1640 list_move(&exp->exp_obd_chain, &work_list);
1642 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1643 obd->obd_name, exp->exp_client_uuid.uuid,
1644 obd_export_nid2str(exp));
1645 print_export_data(exp, "EVICTING", 0, D_HA);
1647 spin_unlock(&obd->obd_dev_lock);
1650 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1651 obd->obd_name, evicted);
1653 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1654 OBD_OPT_ABORT_RECOV);
1657 EXPORT_SYMBOL(class_disconnect_stale_exports);
1659 void class_fail_export(struct obd_export *exp)
1661 int rc, already_failed;
1663 spin_lock(&exp->exp_lock);
1664 already_failed = exp->exp_failed;
1665 exp->exp_failed = 1;
1666 spin_unlock(&exp->exp_lock);
1668 if (already_failed) {
1669 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1670 exp, exp->exp_client_uuid.uuid);
1674 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1675 exp, exp->exp_client_uuid.uuid);
1677 if (obd_dump_on_timeout)
1678 libcfs_debug_dumplog();
1680 /* need for safe call CDEBUG after obd_disconnect */
1681 class_export_get(exp);
1683 /* Most callers into obd_disconnect are removing their own reference
1684 * (request, for example) in addition to the one from the hash table.
1685 * We don't have such a reference here, so make one. */
1686 class_export_get(exp);
1687 rc = obd_disconnect(exp);
1689 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1691 CDEBUG(D_HA, "disconnected export %p/%s\n",
1692 exp, exp->exp_client_uuid.uuid);
1693 class_export_put(exp);
1695 EXPORT_SYMBOL(class_fail_export);
1697 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1699 struct cfs_hash *nid_hash;
1700 struct obd_export *doomed_exp = NULL;
1701 int exports_evicted = 0;
1703 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1705 spin_lock(&obd->obd_dev_lock);
1706 /* umount has run already, so evict thread should leave
1707 * its task to umount thread now */
1708 if (obd->obd_stopping) {
1709 spin_unlock(&obd->obd_dev_lock);
1710 return exports_evicted;
1712 nid_hash = obd->obd_nid_hash;
1713 cfs_hash_getref(nid_hash);
1714 spin_unlock(&obd->obd_dev_lock);
1717 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1718 if (doomed_exp == NULL)
1721 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1722 "nid %s found, wanted nid %s, requested nid %s\n",
1723 obd_export_nid2str(doomed_exp),
1724 libcfs_nid2str(nid_key), nid);
1725 LASSERTF(doomed_exp != obd->obd_self_export,
1726 "self-export is hashed by NID?\n");
1728 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1729 "request\n", obd->obd_name,
1730 obd_uuid2str(&doomed_exp->exp_client_uuid),
1731 obd_export_nid2str(doomed_exp));
1732 class_fail_export(doomed_exp);
1733 class_export_put(doomed_exp);
1736 cfs_hash_putref(nid_hash);
1738 if (!exports_evicted)
1739 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1740 obd->obd_name, nid);
1741 return exports_evicted;
1743 EXPORT_SYMBOL(obd_export_evict_by_nid);
1745 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1747 struct cfs_hash *uuid_hash;
1748 struct obd_export *doomed_exp = NULL;
1749 struct obd_uuid doomed_uuid;
1750 int exports_evicted = 0;
1752 spin_lock(&obd->obd_dev_lock);
1753 if (obd->obd_stopping) {
1754 spin_unlock(&obd->obd_dev_lock);
1755 return exports_evicted;
1757 uuid_hash = obd->obd_uuid_hash;
1758 cfs_hash_getref(uuid_hash);
1759 spin_unlock(&obd->obd_dev_lock);
1761 obd_str2uuid(&doomed_uuid, uuid);
1762 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1763 CERROR("%s: can't evict myself\n", obd->obd_name);
1764 cfs_hash_putref(uuid_hash);
1765 return exports_evicted;
1768 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1770 if (doomed_exp == NULL) {
1771 CERROR("%s: can't disconnect %s: no exports found\n",
1772 obd->obd_name, uuid);
1774 CWARN("%s: evicting %s at adminstrative request\n",
1775 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1776 class_fail_export(doomed_exp);
1777 class_export_put(doomed_exp);
1780 cfs_hash_putref(uuid_hash);
1782 return exports_evicted;
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() when it is asked to
 * dump locks; left NULL until something installs a hook. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1790 static void print_export_data(struct obd_export *exp, const char *status,
1791 int locks, int debug_level)
1793 struct ptlrpc_reply_state *rs;
1794 struct ptlrpc_reply_state *first_reply = NULL;
1797 spin_lock(&exp->exp_lock);
1798 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1804 spin_unlock(&exp->exp_lock);
1806 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1807 "%p %s %llu stale:%d\n",
1808 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1809 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1810 atomic_read(&exp->exp_rpc_count),
1811 atomic_read(&exp->exp_cb_count),
1812 atomic_read(&exp->exp_locks_count),
1813 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1814 nreplies, first_reply, nreplies > 3 ? "..." : "",
1815 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1816 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1817 if (locks && class_export_dump_hook != NULL)
1818 class_export_dump_hook(exp);
1822 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1824 struct obd_export *exp;
1826 spin_lock(&obd->obd_dev_lock);
1827 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1828 print_export_data(exp, "ACTIVE", locks, debug_level);
1829 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1830 print_export_data(exp, "UNLINKED", locks, debug_level);
1831 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1832 print_export_data(exp, "DELAYED", locks, debug_level);
1833 spin_unlock(&obd->obd_dev_lock);
1836 void obd_exports_barrier(struct obd_device *obd)
1839 LASSERT(list_empty(&obd->obd_exports));
1840 spin_lock(&obd->obd_dev_lock);
1841 while (!list_empty(&obd->obd_unlinked_exports)) {
1842 spin_unlock(&obd->obd_dev_lock);
1843 set_current_state(TASK_UNINTERRUPTIBLE);
1844 schedule_timeout(cfs_time_seconds(waited));
1845 if (waited > 5 && is_power_of_2(waited)) {
1846 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1847 "more than %d seconds. "
1848 "The obd refcount = %d. Is it stuck?\n",
1849 obd->obd_name, waited,
1850 atomic_read(&obd->obd_refcount));
1851 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1854 spin_lock(&obd->obd_dev_lock);
1856 spin_unlock(&obd->obd_dev_lock);
1858 EXPORT_SYMBOL(obd_exports_barrier);
1861 * Add export to the obd_zombe thread and notify it.
1863 static void obd_zombie_export_add(struct obd_export *exp) {
1864 atomic_dec(&obd_stale_export_num);
1865 spin_lock(&exp->exp_obd->obd_dev_lock);
1866 LASSERT(!list_empty(&exp->exp_obd_chain));
1867 list_del_init(&exp->exp_obd_chain);
1868 spin_unlock(&exp->exp_obd->obd_dev_lock);
1870 queue_work(zombie_wq, &exp->exp_zombie_work);
1874 * Add import to the obd_zombe thread and notify it.
1876 static void obd_zombie_import_add(struct obd_import *imp) {
1877 LASSERT(imp->imp_sec == NULL);
1879 queue_work(zombie_wq, &imp->imp_zombie_work);
1883 * wait when obd_zombie import/export queues become empty
1885 void obd_zombie_barrier(void)
1887 flush_workqueue(zombie_wq);
1889 EXPORT_SYMBOL(obd_zombie_barrier);
1892 struct obd_export *obd_stale_export_get(void)
1894 struct obd_export *exp = NULL;
1897 spin_lock(&obd_stale_export_lock);
1898 if (!list_empty(&obd_stale_exports)) {
1899 exp = list_entry(obd_stale_exports.next,
1900 struct obd_export, exp_stale_list);
1901 list_del_init(&exp->exp_stale_list);
1903 spin_unlock(&obd_stale_export_lock);
1906 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1907 atomic_read(&obd_stale_export_num));
1911 EXPORT_SYMBOL(obd_stale_export_get);
1913 void obd_stale_export_put(struct obd_export *exp)
1917 LASSERT(list_empty(&exp->exp_stale_list));
1918 if (exp->exp_lock_hash &&
1919 atomic_read(&exp->exp_lock_hash->hs_count)) {
1920 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1921 atomic_read(&obd_stale_export_num));
1923 spin_lock_bh(&exp->exp_bl_list_lock);
1924 spin_lock(&obd_stale_export_lock);
1925 /* Add to the tail if there is no blocked locks,
1926 * to the head otherwise. */
1927 if (list_empty(&exp->exp_bl_list))
1928 list_add_tail(&exp->exp_stale_list,
1929 &obd_stale_exports);
1931 list_add(&exp->exp_stale_list,
1932 &obd_stale_exports);
1934 spin_unlock(&obd_stale_export_lock);
1935 spin_unlock_bh(&exp->exp_bl_list_lock);
1937 class_export_put(exp);
1941 EXPORT_SYMBOL(obd_stale_export_put);
1944 * Adjust the position of the export in the stale list,
1945 * i.e. move to the head of the list if is needed.
1947 void obd_stale_export_adjust(struct obd_export *exp)
1949 LASSERT(exp != NULL);
1950 spin_lock_bh(&exp->exp_bl_list_lock);
1951 spin_lock(&obd_stale_export_lock);
1953 if (!list_empty(&exp->exp_stale_list) &&
1954 !list_empty(&exp->exp_bl_list))
1955 list_move(&exp->exp_stale_list, &obd_stale_exports);
1957 spin_unlock(&obd_stale_export_lock);
1958 spin_unlock_bh(&exp->exp_bl_list_lock);
1960 EXPORT_SYMBOL(obd_stale_export_adjust);
1963 * start destroy zombie import/export thread
1965 int obd_zombie_impexp_init(void)
1967 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1975 * stop destroy zombie import/export thread
1977 void obd_zombie_impexp_stop(void)
1979 destroy_workqueue(zombie_wq);
1980 LASSERT(list_empty(&obd_stale_exports));
1983 /***** Kernel-userspace comm helpers *******/
1985 /* Get length of entire message, including header */
1986 int kuc_len(int payload_len)
1988 return sizeof(struct kuc_hdr) + payload_len;
1990 EXPORT_SYMBOL(kuc_len);
1992 /* Get a pointer to kuc header, given a ptr to the payload
1993 * @param p Pointer to payload area
1994 * @returns Pointer to kuc header
1996 struct kuc_hdr * kuc_ptr(void *p)
1998 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1999 LASSERT(lh->kuc_magic == KUC_MAGIC);
2002 EXPORT_SYMBOL(kuc_ptr);
2004 /* Alloc space for a message, and fill in header
2005 * @return Pointer to payload area
2007 void *kuc_alloc(int payload_len, int transport, int type)
2010 int len = kuc_len(payload_len);
2014 return ERR_PTR(-ENOMEM);
2016 lh->kuc_magic = KUC_MAGIC;
2017 lh->kuc_transport = transport;
2018 lh->kuc_msgtype = type;
2019 lh->kuc_msglen = len;
2021 return (void *)(lh + 1);
2023 EXPORT_SYMBOL(kuc_alloc);
/**
 * Free a message obtained from kuc_alloc().
 *
 * @param p           Pointer to payload area (not to the header)
 * @param payload_len payload size, used to compute the allocation size
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
2033 struct obd_request_slot_waiter {
2034 struct list_head orsw_entry;
2035 wait_queue_head_t orsw_waitq;
2039 static bool obd_request_slot_avail(struct client_obd *cli,
2040 struct obd_request_slot_waiter *orsw)
2044 spin_lock(&cli->cl_loi_list_lock);
2045 avail = !!list_empty(&orsw->orsw_entry);
2046 spin_unlock(&cli->cl_loi_list_lock);
2052 * For network flow control, the RPC sponsor needs to acquire a credit
2053 * before sending the RPC. The credits count for a connection is defined
2054 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2055 * the subsequent RPC sponsors need to wait until others released their
2056 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2058 int obd_get_request_slot(struct client_obd *cli)
2060 struct obd_request_slot_waiter orsw;
2061 struct l_wait_info lwi;
2064 spin_lock(&cli->cl_loi_list_lock);
2065 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2066 cli->cl_rpcs_in_flight++;
2067 spin_unlock(&cli->cl_loi_list_lock);
2071 init_waitqueue_head(&orsw.orsw_waitq);
2072 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2073 orsw.orsw_signaled = false;
2074 spin_unlock(&cli->cl_loi_list_lock);
2076 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2077 rc = l_wait_event(orsw.orsw_waitq,
2078 obd_request_slot_avail(cli, &orsw) ||
2082 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2083 * freed but other (such as obd_put_request_slot) is using it. */
2084 spin_lock(&cli->cl_loi_list_lock);
2086 if (!orsw.orsw_signaled) {
2087 if (list_empty(&orsw.orsw_entry))
2088 cli->cl_rpcs_in_flight--;
2090 list_del(&orsw.orsw_entry);
2094 if (orsw.orsw_signaled) {
2095 LASSERT(list_empty(&orsw.orsw_entry));
2099 spin_unlock(&cli->cl_loi_list_lock);
2103 EXPORT_SYMBOL(obd_get_request_slot);
2105 void obd_put_request_slot(struct client_obd *cli)
2107 struct obd_request_slot_waiter *orsw;
2109 spin_lock(&cli->cl_loi_list_lock);
2110 cli->cl_rpcs_in_flight--;
2112 /* If there is free slot, wakeup the first waiter. */
2113 if (!list_empty(&cli->cl_flight_waiters) &&
2114 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2115 orsw = list_entry(cli->cl_flight_waiters.next,
2116 struct obd_request_slot_waiter, orsw_entry);
2117 list_del_init(&orsw->orsw_entry);
2118 cli->cl_rpcs_in_flight++;
2119 wake_up(&orsw->orsw_waitq);
2121 spin_unlock(&cli->cl_loi_list_lock);
2123 EXPORT_SYMBOL(obd_put_request_slot);
2125 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2127 return cli->cl_max_rpcs_in_flight;
2129 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2131 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2133 struct obd_request_slot_waiter *orsw;
2140 if (max > OBD_MAX_RIF_MAX || max < 1)
2143 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2144 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2145 /* adjust max_mod_rpcs_in_flight to ensure it is always
2146 * strictly lower that max_rpcs_in_flight */
2148 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2149 "because it must be higher than "
2150 "max_mod_rpcs_in_flight value",
2151 cli->cl_import->imp_obd->obd_name);
2154 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2155 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2161 spin_lock(&cli->cl_loi_list_lock);
2162 old = cli->cl_max_rpcs_in_flight;
2163 cli->cl_max_rpcs_in_flight = max;
2164 client_adjust_max_dirty(cli);
2168 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2169 for (i = 0; i < diff; i++) {
2170 if (list_empty(&cli->cl_flight_waiters))
2173 orsw = list_entry(cli->cl_flight_waiters.next,
2174 struct obd_request_slot_waiter, orsw_entry);
2175 list_del_init(&orsw->orsw_entry);
2176 cli->cl_rpcs_in_flight++;
2177 wake_up(&orsw->orsw_waitq);
2179 spin_unlock(&cli->cl_loi_list_lock);
2183 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2185 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2187 return cli->cl_max_mod_rpcs_in_flight;
2189 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2191 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2193 struct obd_connect_data *ocd;
2197 if (max > OBD_MAX_RIF_MAX || max < 1)
2200 /* cannot exceed or equal max_rpcs_in_flight */
2201 if (max >= cli->cl_max_rpcs_in_flight) {
2202 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2203 "higher or equal to max_rpcs_in_flight value (%u)\n",
2204 cli->cl_import->imp_obd->obd_name,
2205 max, cli->cl_max_rpcs_in_flight);
2209 /* cannot exceed max modify RPCs in flight supported by the server */
2210 ocd = &cli->cl_import->imp_connect_data;
2211 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2212 maxmodrpcs = ocd->ocd_maxmodrpcs;
2215 if (max > maxmodrpcs) {
2216 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2217 "higher than max_mod_rpcs_per_client value (%hu) "
2218 "returned by the server at connection\n",
2219 cli->cl_import->imp_obd->obd_name,
2224 spin_lock(&cli->cl_mod_rpcs_lock);
2226 prev = cli->cl_max_mod_rpcs_in_flight;
2227 cli->cl_max_mod_rpcs_in_flight = max;
2229 /* wakeup waiters if limit has been increased */
2230 if (cli->cl_max_mod_rpcs_in_flight > prev)
2231 wake_up(&cli->cl_mod_rpcs_waitq);
2233 spin_unlock(&cli->cl_mod_rpcs_lock);
2237 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2239 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2240 struct seq_file *seq)
2242 unsigned long mod_tot = 0, mod_cum;
2243 struct timespec64 now;
2246 ktime_get_real_ts64(&now);
2248 spin_lock(&cli->cl_mod_rpcs_lock);
2250 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2251 (s64)now.tv_sec, now.tv_nsec);
2252 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2253 cli->cl_mod_rpcs_in_flight);
2255 seq_printf(seq, "\n\t\t\tmodify\n");
2256 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2258 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2261 for (i = 0; i < OBD_HIST_MAX; i++) {
2262 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2264 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2265 i, mod, pct(mod, mod_tot),
2266 pct(mod_cum, mod_tot));
2267 if (mod_cum == mod_tot)
2271 spin_unlock(&cli->cl_mod_rpcs_lock);
2275 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2277 /* The number of modify RPCs sent in parallel is limited
2278 * because the server has a finite number of slots per client to
2279 * store request result and ensure reply reconstruction when needed.
2280 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2281 * that takes into account server limit and cl_max_rpcs_in_flight
2283 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2284 * one close request is allowed above the maximum.
2286 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2291 /* A slot is available if
2292 * - number of modify RPCs in flight is less than the max
2293 * - it's a close RPC and no other close request is in flight
2295 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2296 (close_req && cli->cl_close_rpcs_in_flight == 0);
2301 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2306 spin_lock(&cli->cl_mod_rpcs_lock);
2307 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2308 spin_unlock(&cli->cl_mod_rpcs_lock);
2312 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2315 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2316 it->it_op == IT_READDIR ||
2317 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2322 /* Get a modify RPC slot from the obd client @cli according
2323 * to the kind of operation @opc that is going to be sent
2324 * and the intent @it of the operation if it applies.
2325 * If the maximum number of modify RPCs in flight is reached
2326 * the thread is put to sleep.
2327 * Returns the tag to be set in the request message. Tag 0
2328 * is reserved for non-modifying requests.
2330 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2331 struct lookup_intent *it)
2333 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2334 bool close_req = false;
2337 /* read-only metadata RPCs don't consume a slot on MDT
2338 * for reply reconstruction
2340 if (obd_skip_mod_rpc_slot(it))
2343 if (opc == MDS_CLOSE)
2347 spin_lock(&cli->cl_mod_rpcs_lock);
2348 max = cli->cl_max_mod_rpcs_in_flight;
2349 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2350 /* there is a slot available */
2351 cli->cl_mod_rpcs_in_flight++;
2353 cli->cl_close_rpcs_in_flight++;
2354 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2355 cli->cl_mod_rpcs_in_flight);
2356 /* find a free tag */
2357 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2359 LASSERT(i < OBD_MAX_RIF_MAX);
2360 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2361 spin_unlock(&cli->cl_mod_rpcs_lock);
2362 /* tag 0 is reserved for non-modify RPCs */
2365 spin_unlock(&cli->cl_mod_rpcs_lock);
2367 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2368 "opc %u, max %hu\n",
2369 cli->cl_import->imp_obd->obd_name, opc, max);
2371 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2372 obd_mod_rpc_slot_avail(cli, close_req),
2376 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2378 /* Put a modify RPC slot from the obd client @cli according
2379 * to the kind of operation @opc that has been sent and the
2380 * intent @it of the operation if it applies.
2382 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2383 struct lookup_intent *it, __u16 tag)
2385 bool close_req = false;
2387 if (obd_skip_mod_rpc_slot(it))
2390 if (opc == MDS_CLOSE)
2393 spin_lock(&cli->cl_mod_rpcs_lock);
2394 cli->cl_mod_rpcs_in_flight--;
2396 cli->cl_close_rpcs_in_flight--;
2397 /* release the tag in the bitmap */
2398 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2399 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2400 spin_unlock(&cli->cl_mod_rpcs_lock);
2401 wake_up(&cli->cl_mod_rpcs_waitq);
2403 EXPORT_SYMBOL(obd_put_mod_rpc_slot);