4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/*
 * File-scope state of the OBD class layer: the list of registered types,
 * the device slot table, and forward declarations for helpers defined
 * later in this file.
 */
/* obd_types_lock is held while the obd_types list is walked or modified. */
49 static DEFINE_SPINLOCK(obd_types_lock);
50 static LIST_HEAD(obd_types);
/* obd_dev_lock is taken by the register/unregister/lookup helpers below
 * around their accesses to obd_devs[]. */
51 DEFINE_RWLOCK(obd_dev_lock);
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache backing obd_device_alloc() and obd_device_free(). */
54 static struct kmem_cache *obd_device_cachep;
/* NOTE(review): not used in the visible part of this file; presumably the
 * queue that runs the exp_zombie_work / imp_zombie_work items - confirm. */
56 static struct workqueue_struct *zombie_wq;
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping; class_unlink_export() bumps the counter. */
63 static LIST_HEAD(obd_stale_exports);
64 static DEFINE_SPINLOCK(obd_stale_export_lock);
65 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Function pointer used in this file to release ptlrpc connections held by
 * exports and imports; it is not assigned in the visible code. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/**
 * Allocate an obd_device from obd_device_cachep (GFP_NOFS) and stamp it
 * with OBD_DEVICE_MAGIC.
 *
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing. class_newdev() treats a NULL result as
 * -ENOMEM, so this presumably returns NULL on failure - confirm.
 */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/**
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts the magic value, reports a namespace that is still attached,
 * finalises the obd_reference lu_ref and then frees the structure.
 *
 * NOTE(review): whatever follows the CERROR in the namespace branch is not
 * visible in this listing - confirm whether that case is fatal.
 *
 * \param[in] obd	device to free
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/**
 * Look up a registered obd_type by exact name.
 *
 * Walks the obd_types list under obd_types_lock and compares typ_name with
 * strcmp(). The lock is released both on a match and after an unsuccessful
 * walk.
 *
 * NOTE(review): the return statements are not visible in this listing;
 * callers in this file treat a non-NULL result as "found".
 *
 * \param[in] name	type name to search for
 */
99 struct obd_type *class_search_type(const char *name)
101 struct list_head *tmp;
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 list_for_each(tmp, &obd_types) {
106 type = list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
/**
 * Find an obd_type by name, loading its module on demand, and pin it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the type name is mapped to a module
 * name (LUSTRE_LWP_NAME maps to LUSTRE_OSP_NAME, names starting with
 * LUSTRE_MDS_NAME map to LUSTRE_MDT_NAME; the "obdfilter" mapping target is
 * not visible in this listing), request_module() is called, and on success
 * the type is searched for again. A failed load is reported on the console.
 *
 * Under type->obd_type_lock the owning module of typ_dt_ops is pinned with
 * try_module_get().
 *
 * NOTE(review): the result of try_module_get() is not used on the visible
 * line. The guard conditions, any refcount update and the return statement
 * are not visible here - confirm against the full source.
 *
 * \param[in] name	obd type name
 */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
134 if (!request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144 spin_lock(&type->obd_type_lock);
146 try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
/**
 * Release a type obtained with class_get_type().
 *
 * Under type->obd_type_lock, drops the module reference on the owner of
 * typ_dt_ops with module_put().
 *
 * NOTE(review): any refcount update done under the lock is not visible in
 * this listing - confirm.
 *
 * \param[in] type	type to release
 */
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
/**
 * kobject release callback for obd_type (installed through class_ktype).
 *
 * Recovers the obd_type that embeds \a kobj as typ_kobj and frees it.
 *
 * \param[in] kobj	embedded kobject being released
 */
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 OBD_FREE(type, sizeof(*type));
/* kobj_type for obd_type kobjects: attribute access goes through
 * lustre_sysfs_ops and the final kobject_put() frees the obd_type via
 * class_sysfs_release(). */
168 static struct kobj_type class_ktype = {
169 .sysfs_ops = &lustre_sysfs_ops,
170 .release = class_sysfs_release,
173 #ifdef HAVE_SERVER_SUPPORT
/**
 * Pre-create an obd_type that only carries sysfs/debugfs (and optionally
 * procfs) entries for \a name.
 *
 * Visible steps: look the name up in lustre_kset (ERR_PTR(-EEXIST) is
 * returned on a path whose condition is not visible), allocate the type
 * (ERR_PTR(-ENOMEM) on failure), register its kobject, create a debugfs
 * directory under debugfs_lustre_root (the kobject is put on failure),
 * mark the type with typ_sym_filter, and register a proc entry under
 * proc_lustre_root. A proc registration failure is logged and typ_procroot
 * is reset to NULL.
 *
 * NOTE(review): the guard around the proc registration (enable_proc is
 * presumably tested there) and the final return are not visible in this
 * listing - confirm.
 *
 * \param[in] name		type name used for all entries
 * \param[in] enable_proc	NOTE(review): use not visible here
 */
174 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
176 struct dentry *symlink;
177 struct obd_type *type;
178 struct kobject *kobj;
181 kobj = kset_find_obj(lustre_kset, name);
184 return ERR_PTR(-EEXIST);
187 OBD_ALLOC(type, sizeof(*type));
189 return ERR_PTR(-ENOMEM);
191 type->typ_kobj.kset = lustre_kset;
192 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
193 &lustre_kset->kobj, "%s", name);
197 symlink = debugfs_create_dir(name, debugfs_lustre_root);
198 if (IS_ERR_OR_NULL(symlink)) {
199 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
200 kobject_put(&type->typ_kobj);
203 type->typ_debugfs_entry = symlink;
204 type->typ_sym_filter = true;
207 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
209 if (IS_ERR(type->typ_procroot)) {
210 CERROR("%s: can't create compat proc entry: %d\n",
211 name, (int)PTR_ERR(type->typ_procroot));
212 type->typ_procroot = NULL;
218 EXPORT_SYMBOL(class_add_symlinks);
219 #endif /* HAVE_SERVER_SUPPORT */
/* Upper bound on the type name length asserted by class_register_type(). */
221 #define CLASS_MAX_NAME 1024
/**
 * Register a new obd type.
 *
 * Visible steps:
 *  - assert that \a name is shorter than CLASS_MAX_NAME;
 *  - bail out when the name is already registered (return value not
 *    visible in this listing);
 *  - with HAVE_SERVER_SUPPORT, look for an existing kobject of that name in
 *    lustre_kset and reuse the obd_type embedding it (conditions not
 *    visible); otherwise allocate a type and initialise its kobject;
 *  - allocate private copies of the ops tables and the name; any failure
 *    jumps to the failure path with -ENOMEM;
 *  - copy \a dt_ops, \a md_ops (optional) and the name;
 *  - register the proc root (only when \a enable_proc is set and none
 *    exists yet) and the debugfs entry, add the kobject, initialise \a ldt
 *    and finally link the type onto obd_types under obd_types_lock.
 *
 * The failure path removes the debugfs entry (or only clears the pointer
 * for a typ_sym_filter type), removes the proc subtree, frees the name and
 * ops copies and drops the kobject reference.
 *
 * NOTE(review): several guards, labels and return statements are missing
 * from this listing, so the exact ordering of error exits cannot be
 * confirmed from here.
 *
 * \param[in] dt_ops		device operations, copied
 * \param[in] md_ops		metadata operations, optional, copied
 * \param[in] enable_proc	register a proc root for the type
 * \param[in] vars		NOTE(review): use not visible here
 * \param[in] name		type name
 * \param[in] ldt		lu device type passed to lu_device_type_init()
 */
223 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
224 bool enable_proc, struct lprocfs_vars *vars,
225 const char *name, struct lu_device_type *ldt)
227 struct obd_type *type;
228 #ifdef HAVE_SERVER_SUPPORT
229 struct kobject *kobj;
230 #endif /* HAVE_SERVER_SUPPORT */
235 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
237 if (class_search_type(name)) {
238 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
242 #ifdef HAVE_SERVER_SUPPORT
243 kobj = kset_find_obj(lustre_kset, name);
245 type = container_of(kobj, struct obd_type, typ_kobj);
249 #endif /* HAVE_SERVER_SUPPORT */
251 OBD_ALLOC(type, sizeof(*type));
255 type->typ_kobj.kset = lustre_kset;
256 kobject_init(&type->typ_kobj, &class_ktype);
257 #ifdef HAVE_SERVER_SUPPORT
259 #endif /* HAVE_SERVER_SUPPORT */
260 OBD_ALLOC_PTR(type->typ_dt_ops);
261 OBD_ALLOC_PTR(type->typ_md_ops);
262 OBD_ALLOC(type->typ_name, strlen(name) + 1);
264 if (type->typ_dt_ops == NULL ||
265 type->typ_md_ops == NULL ||
266 type->typ_name == NULL)
267 GOTO (failed, rc = -ENOMEM);
269 *(type->typ_dt_ops) = *dt_ops;
270 /* md_ops is optional */
272 *(type->typ_md_ops) = *md_ops;
273 strcpy(type->typ_name, name);
274 spin_lock_init(&type->obd_type_lock);
276 #ifdef HAVE_SERVER_SUPPORT
277 if (type->typ_sym_filter)
280 #ifdef CONFIG_PROC_FS
281 if (enable_proc && !type->typ_procroot) {
282 type->typ_procroot = lprocfs_register(type->typ_name,
285 if (IS_ERR(type->typ_procroot)) {
286 rc = PTR_ERR(type->typ_procroot);
287 type->typ_procroot = NULL;
292 type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
294 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
295 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
297 type->typ_debugfs_entry = NULL;
301 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
304 #ifdef HAVE_SERVER_SUPPORT
309 rc = lu_device_type_init(ldt);
314 spin_lock(&obd_types_lock);
315 list_add(&type->typ_chain, &obd_types);
316 spin_unlock(&obd_types_lock);
321 #ifdef HAVE_SERVER_SUPPORT
322 if (type->typ_sym_filter)
323 type->typ_debugfs_entry = NULL;
325 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
326 ldebugfs_remove(&type->typ_debugfs_entry);
327 if (type->typ_name != NULL) {
328 #ifdef CONFIG_PROC_FS
329 if (type->typ_procroot != NULL)
330 remove_proc_subtree(type->typ_name, proc_lustre_root);
332 OBD_FREE(type->typ_name, strlen(name) + 1);
334 if (type->typ_md_ops != NULL)
335 OBD_FREE_PTR(type->typ_md_ops);
336 if (type->typ_dt_ops != NULL)
337 OBD_FREE_PTR(type->typ_dt_ops);
338 kobject_put(&type->typ_kobj);
342 EXPORT_SYMBOL(class_register_type);
/**
 * Unregister an obd type by name.
 *
 * An unknown name is reported with CERROR. When the type still has a
 * non-zero typ_refcnt the situation is logged and only the ops tables are
 * freed. Otherwise the proc subtree and debugfs entry are removed,
 * lu_device_type_fini() is called on typ_lu, the type is unlinked from
 * obd_types under obd_types_lock, the name and ops copies are freed and
 * the kobject reference is dropped.
 *
 * NOTE(review): the return statements and some guards are not visible in
 * this listing. In the refcount branch the visible code leaves the type on
 * obd_types after freeing its ops - confirm that later lookups cannot
 * dereference them.
 *
 * \param[in] name	type name to unregister
 */
344 int class_unregister_type(const char *name)
346 struct obd_type *type = class_search_type(name);
350 CERROR("unknown obd type\n");
354 if (type->typ_refcnt) {
355 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
356 /* This is a bad situation, let's make the best of it */
357 /* Remove ops, but leave the name for debugging */
358 OBD_FREE_PTR(type->typ_dt_ops);
359 OBD_FREE_PTR(type->typ_md_ops);
363 /* we do not use type->typ_procroot as for compatibility purposes
364 * other modules can share names (i.e. lod can use lov entry). so
365 * we can't reference pointer as it can get invalidated when another
366 * module removes the entry */
367 #ifdef CONFIG_PROC_FS
368 if (type->typ_procroot != NULL)
369 remove_proc_subtree(type->typ_name, proc_lustre_root);
371 #ifdef HAVE_SERVER_SUPPORT
372 if (type->typ_sym_filter)
373 type->typ_debugfs_entry = NULL;
375 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
376 ldebugfs_remove(&type->typ_debugfs_entry);
379 lu_device_type_fini(type->typ_lu);
381 spin_lock(&obd_types_lock);
382 list_del(&type->typ_chain);
383 spin_unlock(&obd_types_lock);
384 OBD_FREE(type->typ_name, strlen(name) + 1);
385 if (type->typ_dt_ops != NULL)
386 OBD_FREE_PTR(type->typ_dt_ops);
387 if (type->typ_md_ops != NULL)
388 OBD_FREE_PTR(type->typ_md_ops);
389 kobject_put(&type->typ_kobj);
392 } /* class_unregister_type */
393 EXPORT_SYMBOL(class_unregister_type);
396 * Create a new obd device.
398 * Allocate the new obd_device and initialize it.
400 * \param[in] type_name obd device type string.
401 * \param[in] name obd device name.
402 * \param[in] uuid obd device UUID
404 * \retval newdev pointer to created obd_device
405 * \retval ERR_PTR(errno) on error
/**
 * Allocate and initialise an obd_device of the given type.
 *
 * Visible behaviour: a \a name of MAX_OBD_NAME bytes or more yields
 * ERR_PTR(-EINVAL); a type that cannot be obtained yields ERR_PTR(-ENODEV);
 * an allocation failure drops the type reference and yields
 * ERR_PTR(-ENOMEM). On success the device gets obd_minor -1, initialised
 * locks, lists and wait queues, an obd_osfs_age 1000 seconds in the past,
 * and a reference count of 1 tracked as "newdev" in obd_reference.
 *
 * NOTE(review): the uuid is copied with strncpy(..., UUID_MAX), which does
 * not NUL-terminate a source of UUID_MAX bytes or more. Only the name
 * length check is visible in this listing - confirm the uuid length is
 * validated in the full source. The final return is also not visible.
 */
407 struct obd_device *class_newdev(const char *type_name, const char *name,
410 struct obd_device *newdev;
411 struct obd_type *type = NULL;
414 if (strlen(name) >= MAX_OBD_NAME) {
415 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
416 RETURN(ERR_PTR(-EINVAL));
419 type = class_get_type(type_name);
421 CERROR("OBD: unknown type: %s\n", type_name);
422 RETURN(ERR_PTR(-ENODEV));
425 newdev = obd_device_alloc();
426 if (newdev == NULL) {
427 class_put_type(type);
428 RETURN(ERR_PTR(-ENOMEM));
430 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
431 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
432 newdev->obd_type = type;
433 newdev->obd_minor = -1;
435 rwlock_init(&newdev->obd_pool_lock);
436 newdev->obd_pool_limit = 0;
437 newdev->obd_pool_slv = 0;
439 INIT_LIST_HEAD(&newdev->obd_exports);
440 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
441 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
442 INIT_LIST_HEAD(&newdev->obd_exports_timed);
443 INIT_LIST_HEAD(&newdev->obd_nid_stats);
444 spin_lock_init(&newdev->obd_nid_lock);
445 spin_lock_init(&newdev->obd_dev_lock);
446 mutex_init(&newdev->obd_dev_mutex);
447 spin_lock_init(&newdev->obd_osfs_lock);
448 /* newdev->obd_osfs_age must be set to a value in the distant
449 * past to guarantee a fresh statfs is fetched on mount. */
450 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
452 /* XXX belongs in setup not attach */
453 init_rwsem(&newdev->obd_observer_link_sem);
455 spin_lock_init(&newdev->obd_recovery_task_lock);
456 init_waitqueue_head(&newdev->obd_next_transno_waitq);
457 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
458 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
459 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
460 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
461 INIT_LIST_HEAD(&newdev->obd_evict_list);
462 INIT_LIST_HEAD(&newdev->obd_lwp_list);
464 llog_group_init(&newdev->obd_olg);
465 /* Detach drops this */
466 atomic_set(&newdev->obd_refcount, 1);
467 lu_ref_init(&newdev->obd_reference);
468 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
470 newdev->obd_conn_inprogress = 0;
472 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
474 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
475 newdev->obd_name, newdev);
483 * \param[in] obd obd_device to be freed
/**
 * Final teardown of an obd_device.
 *
 * Asserts that the magic is intact, that the device is either unregistered
 * (obd_minor == -1) or still owns its obd_devs[] slot, that the reference
 * count has reached zero and that a type is attached. A device marked
 * obd_stopping is passed to obd_cleanup() and a failure is logged. The
 * structure is then freed and the type reference taken by class_newdev()
 * is released.
 *
 * \param[in] obd	device to release
 */
487 void class_free_dev(struct obd_device *obd)
489 struct obd_type *obd_type = obd->obd_type;
491 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
492 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
493 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
494 "obd %p != obd_devs[%d] %p\n",
495 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
496 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
497 "obd_refcount should be 0, not %d\n",
498 atomic_read(&obd->obd_refcount));
499 LASSERT(obd_type != NULL);
501 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
502 obd->obd_name, obd->obd_type->typ_name);
504 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
505 obd->obd_name, obd->obd_uuid.uuid);
506 if (obd->obd_stopping) {
509 /* If we're not stopping, we were never set up */
510 err = obd_cleanup(obd);
512 CERROR("Cleanup %s returned %d\n",
516 obd_device_free(obd);
518 class_put_type(obd_type);
522 * Unregister obd device.
524 * Free the slot in obd_devs[] used by \a obd.
526 * \param[in] obd obd_device to be unregistered
/**
 * Remove \a obd from the obd_devs[] table.
 *
 * Under the obd_dev_lock write lock, a device with a non-negative
 * obd_minor must own its slot (asserted) and the slot is cleared.
 *
 * NOTE(review): any further update inside the branch (such as resetting
 * obd_minor) is not visible in this listing - confirm.
 */
530 void class_unregister_device(struct obd_device *obd)
532 write_lock(&obd_dev_lock);
533 if (obd->obd_minor >= 0) {
534 LASSERT(obd_devs[obd->obd_minor] == obd);
535 obd_devs[obd->obd_minor] = NULL;
538 write_unlock(&obd_dev_lock);
542 * Register obd device.
544 * Find a free slot in obd_devs[] and fill it with \a new_obd.
546 * \param[in] new_obd obd_device to be registered
549 * \retval -EEXIST device with this name is registered
550 * \retval -EOVERFLOW obd_devs[] is full
/**
 * Claim a slot in obd_devs[] for \a new_obd.
 *
 * Scans all slots under the obd_dev_lock write lock. When a device with
 * the same name is found, the visible code drops the lock and calls
 * obd_zombie_barrier() on one path (a retry, judging by the retried flag),
 * and on the other path logs that the device already exists and discards
 * any free slot chosen so far. The first free slot seen is remembered; on
 * success it is asserted to be empty, obd_minor is set and the slot is
 * filled. A full table is reported with CERROR.
 *
 * NOTE(review): the branch conditions, the retry jump and the return
 * values are not visible in this listing - confirm against the full
 * source.
 */
552 int class_register_device(struct obd_device *new_obd)
556 int new_obd_minor = 0;
557 bool minor_assign = false;
558 bool retried = false;
561 write_lock(&obd_dev_lock);
562 for (i = 0; i < class_devno_max(); i++) {
563 struct obd_device *obd = class_num2obd(i);
566 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
569 write_unlock(&obd_dev_lock);
571 /* the obd_device could be waited to be
572 * destroyed by the "obd_zombie_impexp_thread".
574 obd_zombie_barrier();
579 CERROR("%s: already exists, won't add\n",
581 /* in case we found a free slot before duplicate */
582 minor_assign = false;
586 if (!minor_assign && obd == NULL) {
593 new_obd->obd_minor = new_obd_minor;
594 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
595 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
596 obd_devs[new_obd_minor] = new_obd;
600 CERROR("%s: all %u/%u devices used, increase "
601 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
602 i, class_devno_max(), ret);
605 write_unlock(&obd_dev_lock);
/**
 * Scan the device slots for a device whose obd_name equals \a name.
 *
 * Takes no lock itself; the callers in this file hold obd_dev_lock around
 * the call. A match is asserted to carry the device magic and is then
 * checked for obd_attached.
 *
 * NOTE(review): the return values are not visible in this listing; callers
 * treat a negative result as "not found".
 */
610 static int class_name2dev_nolock(const char *name)
617 for (i = 0; i < class_devno_max(); i++) {
618 struct obd_device *obd = class_num2obd(i);
620 if (obd && strcmp(name, obd->obd_name) == 0) {
621 /* Make sure we finished attaching before we give
622 out any references */
623 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
624 if (obd->obd_attached) {
/**
 * Locked wrapper around class_name2dev_nolock(): performs the lookup with
 * the obd_dev_lock read lock held.
 *
 * NOTE(review): argument checks and the return statement are not visible
 * in this listing.
 */
634 int class_name2dev(const char *name)
641 read_lock(&obd_dev_lock);
642 i = class_name2dev_nolock(name);
643 read_unlock(&obd_dev_lock);
647 EXPORT_SYMBOL(class_name2dev);
/**
 * Look up a device by name and return its obd_device pointer.
 *
 * NOTE(review): the range check admits dev == class_devno_max() because it
 * uses ">" rather than ">=". class_num2obd() rejects that index with its
 * own "num < class_devno_max()" test, so this looks harmless, but ">="
 * would be the tighter bound. The statement taken when the check fails is
 * not visible in this listing.
 */
649 struct obd_device *class_name2obd(const char *name)
651 int dev = class_name2dev(name);
653 if (dev < 0 || dev > class_devno_max())
655 return class_num2obd(dev);
657 EXPORT_SYMBOL(class_name2obd);
/**
 * Scan the device slots for a device whose obd_uuid equals \a uuid.
 *
 * Takes no lock itself; the callers in this file hold obd_dev_lock around
 * the call. A match is asserted to carry the device magic.
 *
 * NOTE(review): the return values are not visible in this listing; callers
 * treat a negative result as "not found".
 */
659 int class_uuid2dev_nolock(struct obd_uuid *uuid)
663 for (i = 0; i < class_devno_max(); i++) {
664 struct obd_device *obd = class_num2obd(i);
666 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
667 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/**
 * Locked wrapper around class_uuid2dev_nolock(): performs the lookup with
 * the obd_dev_lock read lock held.
 *
 * NOTE(review): the return statement is not visible in this listing.
 */
675 int class_uuid2dev(struct obd_uuid *uuid)
679 read_lock(&obd_dev_lock);
680 i = class_uuid2dev_nolock(uuid);
681 read_unlock(&obd_dev_lock);
685 EXPORT_SYMBOL(class_uuid2dev);
/**
 * Look up a device by UUID and return its obd_device pointer.
 *
 * NOTE(review): the handling of a failed lookup (between the lookup and the
 * final return) is not visible in this listing.
 */
687 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
689 int dev = class_uuid2dev(uuid);
692 return class_num2obd(dev);
694 EXPORT_SYMBOL(class_uuid2obd);
697 * Get obd device from ::obd_devs[]
699 * \param num [in] array index
701 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
702 * otherwise return the obd device there.
/**
 * Return the device registered at index \a num.
 *
 * A device that is found is asserted to carry the device magic and to have
 * obd_minor equal to \a num.
 *
 * NOTE(review): only the upper bound of \a num is checked on the visible
 * line; the slot load and the return statements are not visible in this
 * listing. Confirm that callers never pass a negative index.
 */
704 struct obd_device *class_num2obd(int num)
706 struct obd_device *obd = NULL;
708 if (num < class_devno_max()) {
713 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
714 "%p obd_magic %08x != %08x\n",
715 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
716 LASSERTF(obd->obd_minor == num,
717 "%p obd_minor %0d != %0d\n",
718 obd, obd->obd_minor, num);
725 * Find obd in obd_devs[] by name or uuid.
727 * Increment obd's refcount if found.
729 * \param[in] str obd name or uuid
731 * \retval NULL if not found
732 * \retval target pointer to found obd_device
/**
 * Resolve \a str, interpreted as a UUID or as a device name, to a device
 * and take a reference on it.
 *
 * The string is converted to an obd_uuid, then with the obd_dev_lock read
 * lock held the UUID lookup and the name lookup are tried and the
 * resulting index is mapped through class_num2obd(). The reference is
 * taken with scope "find" and the current task as source.
 *
 * NOTE(review): the conditions that chain the two lookups and guard the
 * class_incref() call, and the final return, are not visible in this
 * listing.
 */
734 struct obd_device *class_dev_by_str(const char *str)
736 struct obd_device *target = NULL;
737 struct obd_uuid tgtuuid;
740 obd_str2uuid(&tgtuuid, str);
742 read_lock(&obd_dev_lock);
743 rc = class_uuid2dev_nolock(&tgtuuid);
745 rc = class_name2dev_nolock(str);
748 target = class_num2obd(rc);
751 class_incref(target, "find", current);
752 read_unlock(&obd_dev_lock);
756 EXPORT_SYMBOL(class_dev_by_str);
759 * Get obd devices count. Devices in any state are counted.
761 * \retval obd device count
/**
 * Count the devices present in the slot table, scanning with the
 * obd_dev_lock read lock held.
 *
 * NOTE(review): the loop condition is "index <= max_index", so
 * class_num2obd() is called once with index == class_devno_max(); its
 * visible bound check rejects that index, so "<" would be equivalent and
 * clearer. The counting statement and the return are not visible in this
 * listing.
 */
763 int get_devices_count(void)
765 int index, max_index = class_devno_max(), dev_count = 0;
767 read_lock(&obd_dev_lock);
768 for (index = 0; index <= max_index; index++) {
769 struct obd_device *obd = class_num2obd(index);
773 read_unlock(&obd_dev_lock);
777 EXPORT_SYMBOL(get_devices_count);
/**
 * Print one console line per device with the obd_dev_lock read lock held:
 * slot index, a status string, type name, device name, UUID and the
 * current reference count.
 *
 * The status is chosen from the obd_stopping, obd_set_up and obd_attached
 * flags, tested in that order; the status strings themselves are not
 * visible in this listing.
 */
779 void class_obd_list(void)
784 read_lock(&obd_dev_lock);
785 for (i = 0; i < class_devno_max(); i++) {
786 struct obd_device *obd = class_num2obd(i);
790 if (obd->obd_stopping)
792 else if (obd->obd_set_up)
794 else if (obd->obd_attached)
798 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
799 i, status, obd->obd_type->typ_name,
800 obd->obd_name, obd->obd_uuid.uuid,
801 atomic_read(&obd->obd_refcount));
803 read_unlock(&obd_dev_lock);
807 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
808 specified, then only the client with that uuid is returned,
809 otherwise any client connected to the tgt is returned. */
/*
 * The scan runs under the obd_dev_lock read lock. The type filter is a
 * prefix match: only the first strlen(typ_name) bytes of the device type
 * name are compared. The read lock is dropped before returning on a match.
 *
 * NOTE(review): no reference is taken on the device on the visible lines,
 * and the return statements are not visible in this listing.
 */
810 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
811 const char * typ_name,
812 struct obd_uuid *grp_uuid)
816 read_lock(&obd_dev_lock);
817 for (i = 0; i < class_devno_max(); i++) {
818 struct obd_device *obd = class_num2obd(i);
822 if ((strncmp(obd->obd_type->typ_name, typ_name,
823 strlen(typ_name)) == 0)) {
824 if (obd_uuid_equals(tgt_uuid,
825 &obd->u.cli.cl_target_uuid) &&
826 ((grp_uuid)? obd_uuid_equals(grp_uuid,
827 &obd->obd_uuid) : 1)) {
828 read_unlock(&obd_dev_lock);
833 read_unlock(&obd_dev_lock);
837 EXPORT_SYMBOL(class_find_client_obd);
839 /* Iterate over the obd_devs[] slots looking for devices whose obd_uuid
840 equals grp_uuid. Start searching at *next, and if a device is found, the
841 next index to look at is saved in *next. If next is NULL, then the first
842 matching device will always be returned. */
/*
 * A *next value outside [0, class_devno_max()) is not used as a start
 * index. The scan runs under the obd_dev_lock read lock, which is dropped
 * before returning on a match.
 *
 * NOTE(review): the start-index assignments and the return statements are
 * not visible in this listing.
 */
843 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
849 else if (*next >= 0 && *next < class_devno_max())
854 read_lock(&obd_dev_lock);
855 for (; i < class_devno_max(); i++) {
856 struct obd_device *obd = class_num2obd(i);
860 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
863 read_unlock(&obd_dev_lock);
867 read_unlock(&obd_dev_lock);
871 EXPORT_SYMBOL(class_devices_in_group);
874 * Notify that the sptlrpc log for \a fsname has changed, so that every
875 * relevant OBD adjusts its sptlrpc settings accordingly.
/**
 * Send KEY_SPTLRPC_CONF to every matching device.
 *
 * A device matches when it is set up and not stopping, its type is one of
 * mdc, osc, osp, lwp, mdt or ost, and the first \a namelen bytes of its
 * name equal \a fsname.
 *
 * NOTE(review): how rc2 is folded into the result and the return statement
 * are not visible in this listing.
 *
 * \param[in] fsname	filesystem name prefix to match
 * \param[in] namelen	number of bytes of \a fsname to compare, must be > 0
 */
877 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
879 struct obd_device *obd;
883 LASSERT(namelen > 0);
885 read_lock(&obd_dev_lock);
886 for (i = 0; i < class_devno_max(); i++) {
887 obd = class_num2obd(i);
889 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
892 /* only notify mdc, osc, osp, lwp, mdt, ost
893 * because only these have a -sptlrpc llog */
894 type = obd->obd_type->typ_name;
895 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
896 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
897 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
898 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
899 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
900 strcmp(type, LUSTRE_OST_NAME) != 0)
903 if (strncmp(obd->obd_name, fsname, namelen))
/* Pin the device so the table lock can be dropped across the call. */
906 class_incref(obd, __FUNCTION__, obd);
907 read_unlock(&obd_dev_lock);
908 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
909 sizeof(KEY_SPTLRPC_CONF),
910 KEY_SPTLRPC_CONF, 0, NULL, NULL);
912 class_decref(obd, __FUNCTION__, obd);
913 read_lock(&obd_dev_lock);
915 read_unlock(&obd_dev_lock);
918 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/**
 * Destroy the obd_device slab cache if it exists and reset the pointer, so
 * the function is safe to call when the cache was never created.
 */
920 void obd_cleanup_caches(void)
923 if (obd_device_cachep) {
924 kmem_cache_destroy(obd_device_cachep);
925 obd_device_cachep = NULL;
/**
 * Create the "ll_obd_dev_cache" slab cache used for obd_device objects.
 *
 * The cache must not exist yet (asserted). A creation failure jumps to the
 * exit path with -ENOMEM; obd_cleanup_caches() is called on a path whose
 * label is not visible in this listing.
 *
 * NOTE(review): the remaining kmem_cache_create() arguments and the return
 * statements are not visible here.
 */
931 int obd_init_caches(void)
936 LASSERT(obd_device_cachep == NULL);
937 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
938 sizeof(struct obd_device),
940 if (!obd_device_cachep)
941 GOTO(out, rc = -ENOMEM);
945 obd_cleanup_caches();
949 /* map connection to client */
/*
 * Translates a lustre_handle into the export registered under its cookie
 * via class_handle2object(). Two special cases are logged before the
 * lookup: a null handle, and a cookie of -1, which the inline comment
 * describes as a request for a new connection.
 *
 * NOTE(review): the guard conditions and return statements of the special
 * cases, and the final return, are not visible in this listing.
 */
950 struct obd_export *class_conn2export(struct lustre_handle *conn)
952 struct obd_export *export;
956 CDEBUG(D_CACHE, "looking for null handle\n");
960 if (conn->cookie == -1) { /* this means assign a new connection */
961 CDEBUG(D_CACHE, "want a new connection\n");
965 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
966 export = class_handle2object(conn->cookie, NULL);
969 EXPORT_SYMBOL(class_conn2export);
/**
 * Map an export to its obd_device.
 *
 * NOTE(review): the function body is not visible in this listing;
 * presumably it returns exp->exp_obd and tolerates a NULL \a exp - confirm.
 */
971 struct obd_device *class_exp2obd(struct obd_export *exp)
977 EXPORT_SYMBOL(class_exp2obd);
/**
 * Return the client import (u.cli.cl_import) of the device that owns
 * \a exp.
 *
 * NOTE(review): a check between the declaration and the return is not
 * visible in this listing - confirm how a NULL exp_obd is handled.
 */
979 struct obd_import *class_exp2cliimp(struct obd_export *exp)
981 struct obd_device *obd = exp->exp_obd;
984 return obd->u.cli.cl_import;
986 EXPORT_SYMBOL(class_exp2cliimp);
988 /* Export management functions */
/**
 * Free an export whose reference count has reached zero.
 *
 * Drops the connection reference if there is one, asserts that the reply,
 * replay and high-priority RPC lists are empty, calls obd_destroy_export(),
 * releases the "export" reference on the device (except for the self
 * export) and frees the structure through OBD_FREE_RCU.
 *
 * \param[in] exp	export to destroy
 */
989 static void class_export_destroy(struct obd_export *exp)
991 struct obd_device *obd = exp->exp_obd;
994 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
995 LASSERT(obd != NULL);
997 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
998 exp->exp_client_uuid.uuid, obd->obd_name);
1000 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
1001 if (exp->exp_connection)
1002 ptlrpc_put_connection_superhack(exp->exp_connection);
1004 LASSERT(list_empty(&exp->exp_outstanding_replies));
1005 LASSERT(list_empty(&exp->exp_uncommitted_replies));
1006 LASSERT(list_empty(&exp->exp_req_replay_queue));
1007 LASSERT(list_empty(&exp->exp_hp_rpcs));
1008 obd_destroy_export(exp);
1009 /* self export doesn't hold a reference to an obd, although it
1010 * exists until freeing of the obd */
1011 if (exp != obd->obd_self_export)
1012 class_decref(obd, "export", exp);
1014 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle-table addref callback for exports (see export_handle_ops): takes
 * an export reference through class_export_get(). */
1018 static void export_handle_addref(void *export)
1020 class_export_get(export);
/* Handle operations passed to class_handle_hash() when an export is
 * created; further members, if any, are not visible in this listing. */
1023 static struct portals_handle_ops export_handle_ops = {
1024 .hop_addref = export_handle_addref,
/**
 * Take a reference on \a exp and log the new count.
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably \a exp itself is returned - confirm.
 */
1028 struct obd_export *class_export_get(struct obd_export *exp)
1030 atomic_inc(&exp->exp_refcount);
1031 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1032 atomic_read(&exp->exp_refcount));
1035 EXPORT_SYMBOL(class_export_get);
/**
 * Drop a reference on \a exp.
 *
 * On the final put the nid statistics reference is released. The self
 * export of a device is destroyed synchronously and the device itself is
 * then freed with class_free_dev(); any other export must still be linked
 * on exp_obd_chain (asserted) and is handed to obd_zombie_export_add() for
 * deferred destruction.
 *
 * \param[in] exp	export to release, must not be NULL
 */
1037 void class_export_put(struct obd_export *exp)
1039 LASSERT(exp != NULL);
1040 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1041 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1042 atomic_read(&exp->exp_refcount) - 1);
1044 if (atomic_dec_and_test(&exp->exp_refcount)) {
1045 struct obd_device *obd = exp->exp_obd;
1047 CDEBUG(D_IOCTL, "final put %p/%s\n",
1048 exp, exp->exp_client_uuid.uuid);
1050 /* release nid stat reference */
1051 lprocfs_exp_cleanup(exp);
1053 if (exp == obd->obd_self_export) {
1054 /* self export should be destroyed without
1055 * zombie thread as it doesn't hold a
1056 * reference to obd and doesn't hold any
1058 class_export_destroy(exp);
1059 /* self export is destroyed, no class
1060 * references exist and it is safe to free
1062 class_free_dev(obd);
1064 LASSERT(!list_empty(&exp->exp_obd_chain));
1065 obd_zombie_export_add(exp);
1070 EXPORT_SYMBOL(class_export_put);
/* Work callback installed on exp_zombie_work by __class_new_export():
 * recovers the export from the work item and destroys it. */
1072 static void obd_zombie_exp_cull(struct work_struct *ws)
1074 struct obd_export *export;
1076 export = container_of(ws, struct obd_export, exp_zombie_work);
1077 class_export_destroy(export);
1080 /* Creates a new export, adds it to the hash table, and returns a
1081 * pointer to it. The refcount is 2: one for the hash reference, and
1082 * one for the pointer returned by this function. */
/*
 * Visible steps: allocate the export (ERR_PTR(-ENOMEM) on failure),
 * initialise counters, lists, locks and the zombie work item, register the
 * handle, copy the client UUID and call obd_init_export().
 *
 * When the client UUID differs from the device UUID, the export is added
 * to obd_uuid_hash; a stopping device or a missing hash yields -ENODEV and
 * a duplicate UUID yields -EALREADY. obd_stopping is checked again under
 * obd_dev_lock (-ESHUTDOWN, after removing the hash entry) before the
 * export is published.
 *
 * Two publication variants are visible: one takes an "export" reference on
 * the device and links the export on obd_exports and obd_exports_timed,
 * the other only initialises the two chain heads.
 * NOTE(review): the branch selecting between them is not visible in this
 * listing; presumably it tests is_self - confirm.
 *
 * The error exits unhash the handle, call obd_destroy_export() and free
 * the export. The return statements are not visible in this listing.
 */
1083 struct obd_export *__class_new_export(struct obd_device *obd,
1084 struct obd_uuid *cluuid, bool is_self)
1086 struct obd_export *export;
1087 struct cfs_hash *hash = NULL;
1091 OBD_ALLOC_PTR(export);
1093 return ERR_PTR(-ENOMEM);
1095 export->exp_conn_cnt = 0;
1096 export->exp_lock_hash = NULL;
1097 export->exp_flock_hash = NULL;
1098 /* 2 = class_handle_hash + last */
1099 atomic_set(&export->exp_refcount, 2);
1100 atomic_set(&export->exp_rpc_count, 0);
1101 atomic_set(&export->exp_cb_count, 0);
1102 atomic_set(&export->exp_locks_count, 0);
1103 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1104 INIT_LIST_HEAD(&export->exp_locks_list);
1105 spin_lock_init(&export->exp_locks_list_guard);
1107 atomic_set(&export->exp_replay_count, 0);
1108 export->exp_obd = obd;
1109 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1110 spin_lock_init(&export->exp_uncommitted_replies_lock);
1111 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1112 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1113 INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1114 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1115 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1116 class_handle_hash(&export->exp_handle, &export_handle_ops);
1117 export->exp_last_request_time = ktime_get_real_seconds();
1118 spin_lock_init(&export->exp_lock);
1119 spin_lock_init(&export->exp_rpc_lock);
1120 INIT_HLIST_NODE(&export->exp_uuid_hash);
1121 INIT_HLIST_NODE(&export->exp_nid_hash);
1122 INIT_HLIST_NODE(&export->exp_gen_hash);
1123 spin_lock_init(&export->exp_bl_list_lock);
1124 INIT_LIST_HEAD(&export->exp_bl_list);
1125 INIT_LIST_HEAD(&export->exp_stale_list);
1126 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1128 export->exp_sp_peer = LUSTRE_SP_ANY;
1129 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1130 export->exp_client_uuid = *cluuid;
1131 obd_init_export(export);
1133 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1134 spin_lock(&obd->obd_dev_lock);
1135 /* shouldn't happen, but might race */
1136 if (obd->obd_stopping)
1137 GOTO(exit_unlock, rc = -ENODEV);
1139 hash = cfs_hash_getref(obd->obd_uuid_hash);
1141 GOTO(exit_unlock, rc = -ENODEV);
1142 spin_unlock(&obd->obd_dev_lock);
1144 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1146 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1147 obd->obd_name, cluuid->uuid, rc);
1148 GOTO(exit_err, rc = -EALREADY);
1152 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1153 spin_lock(&obd->obd_dev_lock);
1154 if (obd->obd_stopping) {
1156 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1157 GOTO(exit_unlock, rc = -ESHUTDOWN);
1161 class_incref(obd, "export", export);
1162 list_add_tail(&export->exp_obd_chain_timed,
1163 &obd->obd_exports_timed);
1164 list_add(&export->exp_obd_chain, &obd->obd_exports);
1165 obd->obd_num_exports++;
1167 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1168 INIT_LIST_HEAD(&export->exp_obd_chain);
1170 spin_unlock(&obd->obd_dev_lock);
1172 cfs_hash_putref(hash);
1176 spin_unlock(&obd->obd_dev_lock);
1179 cfs_hash_putref(hash);
1180 class_handle_unhash(&export->exp_handle);
1181 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1182 obd_destroy_export(export);
1183 OBD_FREE_PTR(export);
/**
 * Create a regular (non-self) export on \a obd for client \a uuid; thin
 * wrapper around __class_new_export() with is_self = false.
 */
1187 struct obd_export *class_new_export(struct obd_device *obd,
1188 struct obd_uuid *uuid)
1190 return __class_new_export(obd, uuid, false);
1192 EXPORT_SYMBOL(class_new_export);
/**
 * Create the self export of \a obd; thin wrapper around
 * __class_new_export() with is_self = true.
 */
1194 struct obd_export *class_new_export_self(struct obd_device *obd,
1195 struct obd_uuid *uuid)
1197 return __class_new_export(obd, uuid, true);
/**
 * Unlink an export from its device so that it can be destroyed.
 *
 * The handle is unhashed first. The self export only drops a reference.
 * Any other export is, under obd_dev_lock, removed from the UUID hash (and
 * from the generation hash with HAVE_SERVER_SUPPORT), moved to
 * obd_unlinked_exports, taken off the timed list and subtracted from
 * obd_num_exports. Afterwards obd_stale_export_num is incremented and the
 * export is passed to obd_stale_export_put().
 *
 * NOTE(review): the early return after the self-export put is not visible
 * in this listing - confirm.
 */
1200 void class_unlink_export(struct obd_export *exp)
1202 class_handle_unhash(&exp->exp_handle);
1204 if (exp->exp_obd->obd_self_export == exp) {
1205 class_export_put(exp);
1209 spin_lock(&exp->exp_obd->obd_dev_lock);
1210 /* delete an uuid-export hashitem from hashtables */
1211 if (!hlist_unhashed(&exp->exp_uuid_hash))
1212 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1213 &exp->exp_client_uuid,
1214 &exp->exp_uuid_hash);
1216 #ifdef HAVE_SERVER_SUPPORT
1217 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1218 struct tg_export_data *ted = &exp->exp_target_data;
1219 struct cfs_hash *hash;
1221 /* Because obd_gen_hash will not be released until
1222 * class_cleanup(), so hash should never be NULL here */
1223 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1224 LASSERT(hash != NULL);
1225 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1226 &exp->exp_gen_hash);
1227 cfs_hash_putref(hash);
1229 #endif /* HAVE_SERVER_SUPPORT */
1231 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1232 list_del_init(&exp->exp_obd_chain_timed);
1233 exp->exp_obd->obd_num_exports--;
1234 spin_unlock(&exp->exp_obd->obd_dev_lock);
1235 atomic_inc(&obd_stale_export_num);
1237 /* A reference is kept by obd_stale_exports list */
1238 obd_stale_export_put(exp);
1240 EXPORT_SYMBOL(class_unlink_export);
1242 /* Import management functions */
/**
 * Release the resources of an import whose reference count is zero
 * (asserted).
 *
 * Drops the reference on imp_connection, then empties imp_conn_list,
 * dropping the connection reference of each entry and freeing it. The
 * security context must already be gone (imp_sec == NULL is asserted).
 * Finally the "import" reference on the owning device is released.
 *
 * NOTE(review): the statement that frees the import itself is not visible
 * in this listing.
 */
1243 static void obd_zombie_import_free(struct obd_import *imp)
1247 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1248 imp->imp_obd->obd_name);
1250 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1252 ptlrpc_put_connection_superhack(imp->imp_connection);
1254 while (!list_empty(&imp->imp_conn_list)) {
1255 struct obd_import_conn *imp_conn;
1257 imp_conn = list_entry(imp->imp_conn_list.next,
1258 struct obd_import_conn, oic_item);
1259 list_del_init(&imp_conn->oic_item);
1260 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1261 OBD_FREE(imp_conn, sizeof(*imp_conn));
1264 LASSERT(imp->imp_sec == NULL);
1265 class_decref(imp->imp_obd, "import", imp);
/**
 * Take a reference on \a import and log the new count together with the
 * owning device name.
 *
 * NOTE(review): the return statement is not visible in this listing;
 * presumably \a import itself is returned - confirm.
 */
1270 struct obd_import *class_import_get(struct obd_import *import)
1272 atomic_inc(&import->imp_refcount);
1273 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1274 atomic_read(&import->imp_refcount),
1275 import->imp_obd->obd_name);
1278 EXPORT_SYMBOL(class_import_get);
/**
 * Drop a reference on \a imp; the final put hands the import to
 * obd_zombie_import_add() for deferred destruction.
 *
 * NOTE(review): the trailing assertion reads imp->imp_refcount after the
 * import may already have been queued for destruction. Confirm that the
 * zombie work cannot free the import before that read.
 */
1280 void class_import_put(struct obd_import *imp)
1284 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1286 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1287 atomic_read(&imp->imp_refcount) - 1,
1288 imp->imp_obd->obd_name);
1290 if (atomic_dec_and_test(&imp->imp_refcount)) {
1291 CDEBUG(D_INFO, "final put import %p\n", imp);
1292 obd_zombie_import_add(imp);
1295 /* catch possible import put race */
1296 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1299 EXPORT_SYMBOL(class_import_put);
/**
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.
 *
 * NOTE(review): the flags argument of the per-portal at_init() call is not
 * visible in this listing.
 */
1301 static void init_imp_at(struct imp_at *at) {
1303 at_init(&at->iat_net_latency, 0, 0);
1304 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1305 /* max service estimates are tracked on the server side, so
1306 don't use the AT history here, just use the last reported
1307 val. (But keep hist for proc histogram, worst_ever) */
1308 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Work callback installed on imp_zombie_work by class_new_import():
 * recovers the import from the work item and frees it. */
1313 static void obd_zombie_imp_cull(struct work_struct *ws)
1315 struct obd_import *import;
1317 import = container_of(ws, struct obd_import, imp_zombie_work);
1318 obd_zombie_import_free(import);
/**
 * Allocate and initialise an import for \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with a reference count of 2,
 * holds an "import" reference on the device, uses LUSTRE_MSG_MAGIC_V2 as
 * its default message magic, and records in imp_sec_refpid the pid of the
 * child reaper of the current pid namespace (1 when there is none).
 *
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing.
 */
1321 struct obd_import *class_new_import(struct obd_device *obd)
1323 struct obd_import *imp;
1324 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1326 OBD_ALLOC(imp, sizeof(*imp));
1330 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1331 INIT_LIST_HEAD(&imp->imp_replay_list);
1332 INIT_LIST_HEAD(&imp->imp_sending_list);
1333 INIT_LIST_HEAD(&imp->imp_delayed_list);
1334 INIT_LIST_HEAD(&imp->imp_committed_list);
1335 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1336 imp->imp_known_replied_xid = 0;
1337 imp->imp_replay_cursor = &imp->imp_committed_list;
1338 spin_lock_init(&imp->imp_lock);
1339 imp->imp_last_success_conn = 0;
1340 imp->imp_state = LUSTRE_IMP_NEW;
1341 imp->imp_obd = class_incref(obd, "import", imp);
1342 mutex_init(&imp->imp_sec_mutex);
1343 init_waitqueue_head(&imp->imp_recovery_waitq);
1344 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1346 if (curr_pid_ns->child_reaper)
1347 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1349 imp->imp_sec_refpid = 1;
1351 atomic_set(&imp->imp_refcount, 2);
1352 atomic_set(&imp->imp_unregistering, 0);
1353 atomic_set(&imp->imp_inflight, 0);
1354 atomic_set(&imp->imp_replay_inflight, 0);
1355 atomic_set(&imp->imp_inval_count, 0);
1356 INIT_LIST_HEAD(&imp->imp_conn_list);
1357 init_imp_at(&imp->imp_at);
1359 /* the default magic is V2, will be used in connect RPC, and
1360 * then adjusted according to the flags in request/reply. */
1361 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1365 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down @import.
 *
 * Asserts that @import is neither NULL nor LP_POISON, bumps
 * imp_generation under imp_lock, then drops one reference with
 * class_import_put().
 */
1367 void class_destroy_import(struct obd_import *import)
1369 LASSERT(import != NULL);
1370 LASSERT(import != LP_POISON);
/* the generation counter is only modified with imp_lock held */
1372 spin_lock(&import->imp_lock);
1373 import->imp_generation++;
1374 spin_unlock(&import->imp_lock);
1375 class_import_put(import);
1377 EXPORT_SYMBOL(class_destroy_import);
1379 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Record one more reference from @lock to @exp (only built when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is set).
 *
 * Runs under exp->exp_locks_list_guard. A console warning is printed when
 * @lock already targets a different export. On the first reference
 * (counter was 0 before the increment) the lock is linked onto
 * exp->exp_locks_list and @exp is recorded as its target.
 */
1381 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1383 spin_lock(&exp->exp_locks_list_guard);
1385 LASSERT(lock->l_exp_refs_nr >= 0);
1387 if (lock->l_exp_refs_target != NULL &&
1388 lock->l_exp_refs_target != exp) {
1389 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1390 exp, lock, lock->l_exp_refs_target);
/* post-increment: the branch is taken only for the first reference */
1392 if ((lock->l_exp_refs_nr ++) == 0) {
1393 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1394 lock->l_exp_refs_target = exp;
1396 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1397 lock, exp, lock->l_exp_refs_nr);
1398 spin_unlock(&exp->exp_locks_list_guard);
1400 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Drop one reference from @lock to @exp; counterpart of
 * __class_export_add_lock_ref().
 *
 * Runs under exp->exp_locks_list_guard and asserts that at least one
 * reference is held. A console warning is printed when the lock's recorded
 * target is not @exp. When the counter reaches 0 the lock is unlinked from
 * the export's list and its target is cleared.
 */
1402 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1404 spin_lock(&exp->exp_locks_list_guard);
1405 LASSERT(lock->l_exp_refs_nr > 0);
1406 if (lock->l_exp_refs_target != exp) {
1407 LCONSOLE_WARN("lock %p, "
1408 "mismatching export pointers: %p, %p\n",
1409 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the branch is taken when the last reference goes away */
1411 if (-- lock->l_exp_refs_nr == 0) {
1412 list_del_init(&lock->l_exp_refs_link);
1413 lock->l_exp_refs_target = NULL;
1415 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1416 lock, exp, lock->l_exp_refs_nr);
1417 spin_unlock(&exp->exp_locks_list_guard);
1419 EXPORT_SYMBOL(__class_export_del_lock_ref);
/* A connection defines an export context in which preallocation can
 * be managed. This releases the export pointer reference, and returns
 * the export handle, so the export refcount is 1 when this function
 * returns. */
/*
 * Create a new export on @obd for client @cluuid and return its handle
 * cookie in @conn.
 *
 * All three arguments must be non-NULL (asserted). The export comes from
 * class_new_export(); the reference held through the local pointer is
 * dropped with class_export_put() once the cookie has been copied out.
 * NOTE(review): the condition guarding RETURN(PTR_ERR(export)) and the
 * final return are not visible in this copy of the file; presumably the
 * error path is taken when class_new_export() returns an error pointer.
 */
1426 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1427 struct obd_uuid *cluuid)
1429 struct obd_export *export;
1430 LASSERT(conn != NULL);
1431 LASSERT(obd != NULL);
1432 LASSERT(cluuid != NULL);
1435 export = class_new_export(obd, cluuid);
1437 RETURN(PTR_ERR(export));
/* hand the export back to the caller by cookie, not by pointer */
1439 conn->cookie = export->exp_handle.h_cookie;
1440 class_export_put(export);
1442 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1443 cluuid->uuid, conn->cookie);
1446 EXPORT_SYMBOL(class_connect);
1448 /* if export is involved in recovery then clean up related things */
/*
 * Two phases are visible:
 *  - under obd_recovery_task_lock, while the obd is recovering: clear
 *    exp_in_recovery and decrement obd_connected_clients for an export that
 *    was in recovery, and bump obd_stale_clients for exports that are not
 *    OBD_CONNECT_LIGHTWEIGHT;
 *  - under exp_lock: clear the request-replay and lock-replay flags and
 *    decrement the matching per-obd client counters.
 * NOTE(review): closing braces are missing from this copy of the file, so
 * the exact nesting of the obd_stale_clients update cannot be confirmed
 * here.
 */
1449 static void class_export_recovery_cleanup(struct obd_export *exp)
1451 struct obd_device *obd = exp->exp_obd;
1453 spin_lock(&obd->obd_recovery_task_lock);
1454 if (obd->obd_recovering) {
1455 if (exp->exp_in_recovery) {
/* exp_in_recovery is modified under the export's own lock */
1456 spin_lock(&exp->exp_lock);
1457 exp->exp_in_recovery = 0;
1458 spin_unlock(&exp->exp_lock);
1459 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1460 atomic_dec(&obd->obd_connected_clients);
1463 /* if called during recovery then should update
1464 * obd_stale_clients counter,
1465 * lightweight exports are not counted */
1466 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1467 exp->exp_obd->obd_stale_clients++;
1469 spin_unlock(&obd->obd_recovery_task_lock);
1471 spin_lock(&exp->exp_lock);
1472 /** Cleanup req replay fields */
1473 if (exp->exp_req_replay_needed) {
1474 exp->exp_req_replay_needed = 0;
1476 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1477 atomic_dec(&obd->obd_req_replay_clients);
1480 /** Cleanup lock replay data */
1481 if (exp->exp_lock_replay_needed) {
1482 exp->exp_lock_replay_needed = 0;
1484 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1485 atomic_dec(&obd->obd_lock_replay_clients);
1487 spin_unlock(&exp->exp_lock);
/* This function removes 1-3 references from the export:
 * 1 - for the export pointer passed in,
 * and, if a disconnect is really needed,
 * 2 - for removal from the hash,
 * 3 - in class_unlink_export().
 * The export pointer passed to this function may be destroyed. */
/*
 * Disconnect @export.
 *
 * Under exp_lock the export is marked exp_disconnected (remembering the
 * previous value) and removed from the obd's NID hash if it is hashed.
 * If it was already disconnected, control jumps to the no_disconn label so
 * the recovery cleanup and class_unlink_export() are skipped.
 * class_export_put() is the last visible call.
 * NOTE(review): the no_disconn label, the NULL-export return and the final
 * return value are not visible in this copy of the file.
 */
1496 int class_disconnect(struct obd_export *export)
1498 int already_disconnected;
1501 if (export == NULL) {
1502 CWARN("attempting to free NULL export %p\n", export);
1506 spin_lock(&export->exp_lock);
/* sample-and-set so that concurrent callers can tell who got here first */
1507 already_disconnected = export->exp_disconnected;
1508 export->exp_disconnected = 1;
1509 /* We hold references of export for uuid hash
1510 * and nid_hash and export link at least. So
1511 * it is safe to call cfs_hash_del in there. */
1512 if (!hlist_unhashed(&export->exp_nid_hash))
1513 cfs_hash_del(export->exp_obd->obd_nid_hash,
1514 &export->exp_connection->c_peer.nid,
1515 &export->exp_nid_hash);
1516 spin_unlock(&export->exp_lock);
1518 /* class_cleanup(), abort_recovery(), and class_fail_export()
1519 * all end up in here, and if any of them race we shouldn't
1520 * call extra class_export_puts(). */
1521 if (already_disconnected) {
1522 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1523 GOTO(no_disconn, already_disconnected);
1526 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1527 export->exp_handle.h_cookie);
1529 class_export_recovery_cleanup(export);
1530 class_unlink_export(export);
1532 class_export_put(export);
1535 EXPORT_SYMBOL(class_disconnect);
1537 /* Return non-zero for a fully connected export */
/*
 * "Fully connected" means exp_conn_cnt > 0 and exp_failed not set, both
 * sampled under exp_lock.
 * NOTE(review): the declaration of the result variable, any guard on @exp
 * and the return statement are not visible in this copy of the file.
 */
1538 int class_connected_export(struct obd_export *exp)
1543 spin_lock(&exp->exp_lock);
1544 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1545 spin_unlock(&exp->exp_lock);
1549 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on @list, storing @flags in each export's
 * exp_flags first.
 *
 * The loop always works on the first list entry and runs until the list is
 * empty. An export whose client uuid equals its obd's uuid is not
 * disconnected: it is only unlinked from the list and released. Every other
 * export is passed to obd_disconnect(); extra references are taken
 * beforehand so the export can still be logged afterwards.
 * NOTE(review): several short lines (list_entry() member argument, the
 * CDEBUG opening of the "don't discon" message, "continue", closing braces)
 * are not visible in this copy of the file.
 */
1551 static void class_disconnect_export_list(struct list_head *list,
1552 enum obd_option flags)
1555 struct obd_export *exp;
1558 /* It's possible that an export may disconnect itself, but
1559 * nothing else will be added to this list. */
1560 while (!list_empty(list)) {
1561 exp = list_entry(list->next, struct obd_export,
1563 /* need for safe call CDEBUG after obd_disconnect */
1564 class_export_get(exp);
1566 spin_lock(&exp->exp_lock);
1567 exp->exp_flags = flags;
1568 spin_unlock(&exp->exp_lock);
1570 if (obd_uuid_equals(&exp->exp_client_uuid,
1571 &exp->exp_obd->obd_uuid)) {
1573 "exp %p export uuid == obd uuid, don't discon\n",
1575 /* Need to delete this now so we don't end up pointing
1576 * to work_list later when this export is cleaned up. */
1577 list_del_init(&exp->exp_obd_chain);
1578 class_export_put(exp);
/* second reference: the "release one export reference anyway" comment
 * below says obd_disconnect() drops one */
1582 class_export_get(exp);
1583 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1584 "last request at %lld\n",
1585 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1586 exp, exp->exp_last_request_time);
1587 /* release one export reference anyway */
1588 rc = obd_disconnect(exp);
1590 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1591 obd_export_nid2str(exp), exp, rc);
1592 class_export_put(exp);
/*
 * Disconnect all exports of @obd.
 *
 * Under obd_dev_lock both obd_exports and obd_delayed_exports are spliced
 * onto a private work list (leaving the originals empty); the work list is
 * then processed without the lock by class_disconnect_export_list() using
 * the flags from exp_flags_from_obd(). A D_HA message records whether any
 * exports were found.
 */
1597 void class_disconnect_exports(struct obd_device *obd)
1599 struct list_head work_list;
1602 /* Move all of the exports from obd_exports to a work list, en masse. */
1603 INIT_LIST_HEAD(&work_list);
1604 spin_lock(&obd->obd_dev_lock);
1605 list_splice_init(&obd->obd_exports, &work_list);
1606 list_splice_init(&obd->obd_delayed_exports, &work_list);
1607 spin_unlock(&obd->obd_dev_lock);
1609 if (!list_empty(&work_list)) {
1610 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1611 "disconnecting them\n", obd->obd_minor, obd);
1612 class_disconnect_export_list(&work_list,
1613 exp_flags_from_obd(obd));
1615 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1616 obd->obd_minor, obd);
1619 EXPORT_SYMBOL(class_disconnect_exports);
/* Remove exports that have not completed recovery. */
/*
 * Evict the exports of @obd that @test_export does not vouch for.
 *
 * @test_export is called with exp_lock held; an export is left alone when
 * it has already failed or when @test_export returns non-zero. The
 * remaining exports are marked exp_failed, moved to a private work list
 * (all under obd_dev_lock) and finally disconnected with
 * OBD_OPT_ABORT_RECOV added to the obd's flags.
 *
 * The obd's self-export and exports with ted_lr_idx == -1 are examined
 * first; per the inline comments they are not to be evicted.
 * NOTE(review): the "continue" statements, the eviction counter updates
 * and the condition around LCONSOLE_WARN are not visible in this copy of
 * the file.
 */
1623 void class_disconnect_stale_exports(struct obd_device *obd,
1624 int (*test_export)(struct obd_export *))
1626 struct list_head work_list;
1627 struct obd_export *exp, *n;
1631 INIT_LIST_HEAD(&work_list);
1632 spin_lock(&obd->obd_dev_lock);
1633 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1635 /* don't count self-export as client */
1636 if (obd_uuid_equals(&exp->exp_client_uuid,
1637 &exp->exp_obd->obd_uuid))
1640 /* don't evict clients which have no slot in last_rcvd
1641 * (e.g. lightweight connection) */
1642 if (exp->exp_target_data.ted_lr_idx == -1)
1645 spin_lock(&exp->exp_lock);
1646 if (exp->exp_failed || test_export(exp)) {
1647 spin_unlock(&exp->exp_lock);
1650 exp->exp_failed = 1;
1651 spin_unlock(&exp->exp_lock);
/* safe while iterating: the loop uses the _safe list iterator */
1653 list_move(&exp->exp_obd_chain, &work_list);
1655 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1656 obd->obd_name, exp->exp_client_uuid.uuid,
1657 obd_export_nid2str(exp));
1658 print_export_data(exp, "EVICTING", 0, D_HA);
1660 spin_unlock(&obd->obd_dev_lock);
1663 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1664 obd->obd_name, evicted);
1666 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1667 OBD_OPT_ABORT_RECOV);
1670 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp_failed is sampled and set under exp_lock; when the export had
 * already failed only a D_HA message is logged. Otherwise the debug log is
 * dumped if obd_dump_on_timeout is set, two references are taken (see the
 * inline comments) and obd_disconnect() is called.
 * NOTE(review): the return in the already-failed branch and the if/else
 * around the CERROR/CDEBUG pair after obd_disconnect() are not visible in
 * this copy of the file.
 */
1672 void class_fail_export(struct obd_export *exp)
1674 int rc, already_failed;
1676 spin_lock(&exp->exp_lock);
1677 already_failed = exp->exp_failed;
1678 exp->exp_failed = 1;
1679 spin_unlock(&exp->exp_lock);
1681 if (already_failed) {
1682 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1683 exp, exp->exp_client_uuid.uuid);
1687 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1688 exp, exp->exp_client_uuid.uuid);
1690 if (obd_dump_on_timeout)
1691 libcfs_debug_dumplog();
1693 /* need for safe call CDEBUG after obd_disconnect */
1694 class_export_get(exp);
1696 /* Most callers into obd_disconnect are removing their own reference
1697 * (request, for example) in addition to the one from the hash table.
1698 * We don't have such a reference here, so make one. */
1699 class_export_get(exp);
1700 rc = obd_disconnect(exp);
1702 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1704 CDEBUG(D_HA, "disconnected export %p/%s\n",
1705 exp, exp->exp_client_uuid.uuid);
/* drops the reference taken above for logging after the disconnect */
1706 class_export_put(exp);
1708 EXPORT_SYMBOL(class_fail_export);
/*
 * Evict the export(s) of @obd connected from the NID given as string @nid.
 *
 * Returns exports_evicted; 0 is returned immediately when the obd is
 * already stopping. A reference on the obd's NID hash is held while it is
 * used. Each export found by cfs_hash_lookup() is failed with
 * class_fail_export() and the reference on it is then released.
 * NOTE(review): the loop construct around the lookup, the "break" on a
 * failed lookup and the increment of exports_evicted are not visible in
 * this copy of the file.
 */
1710 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1712 struct cfs_hash *nid_hash;
1713 struct obd_export *doomed_exp = NULL;
1714 int exports_evicted = 0;
1716 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1718 spin_lock(&obd->obd_dev_lock);
1719 /* umount has run already, so evict thread should leave
1720 * its task to umount thread now */
1721 if (obd->obd_stopping) {
1722 spin_unlock(&obd->obd_dev_lock);
1723 return exports_evicted;
/* pin the hash so it can be used after obd_dev_lock is dropped */
1725 nid_hash = obd->obd_nid_hash;
1726 cfs_hash_getref(nid_hash);
1727 spin_unlock(&obd->obd_dev_lock);
1730 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1731 if (doomed_exp == NULL)
1734 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1735 "nid %s found, wanted nid %s, requested nid %s\n",
1736 obd_export_nid2str(doomed_exp),
1737 libcfs_nid2str(nid_key), nid);
1738 LASSERTF(doomed_exp != obd->obd_self_export,
1739 "self-export is hashed by NID?\n");
1741 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1742 "request\n", obd->obd_name,
1743 obd_uuid2str(&doomed_exp->exp_client_uuid),
1744 obd_export_nid2str(doomed_exp));
1745 class_fail_export(doomed_exp);
1746 class_export_put(doomed_exp);
1749 cfs_hash_putref(nid_hash);
1751 if (!exports_evicted)
1752 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1753 obd->obd_name, nid);
1754 return exports_evicted;
1756 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd whose client uuid is the string @uuid.
 *
 * Returns exports_evicted; 0 is returned immediately when the obd is
 * stopping or when @uuid names the obd itself. A reference on the obd's
 * uuid hash is held while it is used. An export found by
 * cfs_hash_lookup() is failed with class_fail_export() and the reference
 * on it is then released.
 * NOTE(review): the else line between the two messages and the increment
 * of exports_evicted are not visible in this copy of the file.
 * NOTE(review): the CWARN text below misspells "administrative".
 */
1758 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1760 struct cfs_hash *uuid_hash;
1761 struct obd_export *doomed_exp = NULL;
1762 struct obd_uuid doomed_uuid;
1763 int exports_evicted = 0;
1765 spin_lock(&obd->obd_dev_lock);
1766 if (obd->obd_stopping) {
1767 spin_unlock(&obd->obd_dev_lock);
1768 return exports_evicted;
/* pin the hash so it can be used after obd_dev_lock is dropped */
1770 uuid_hash = obd->obd_uuid_hash;
1771 cfs_hash_getref(uuid_hash);
1772 spin_unlock(&obd->obd_dev_lock);
1774 obd_str2uuid(&doomed_uuid, uuid);
1775 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1776 CERROR("%s: can't evict myself\n", obd->obd_name);
1777 cfs_hash_putref(uuid_hash);
1778 return exports_evicted;
1781 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1783 if (doomed_exp == NULL) {
1784 CERROR("%s: can't disconnect %s: no exports found\n",
1785 obd->obd_name, uuid);
1787 CWARN("%s: evicting %s at adminstrative request\n",
1788 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1789 class_fail_export(doomed_exp);
1790 class_export_put(doomed_exp);
1793 cfs_hash_putref(uuid_hash);
1795 return exports_evicted;
1798 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional callback invoked by print_export_data() for each export when
 * lock dumping is requested; NULL means no hook is installed. Only defined
 * when LUSTRE_TRACKS_LOCK_EXP_REFS is set.
 */
1799 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1800 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Log a one-line summary of @exp at @debug_level.
 *
 * @status is a caller-supplied label printed after the obd name. The line
 * contains, in order: obd name, status, export pointer, client uuid, NID,
 * refcount, (rpc, callback, lock counts), the disconnected/delayed/failed
 * flags, the number of outstanding replies, the first reply pointer, "..."
 * when there are more than three replies, last committed transno and
 * whether the export is on a stale list.
 * When @locks is non-zero and a class_export_dump_hook is installed (only
 * with LUSTRE_TRACKS_LOCK_EXP_REFS) the hook is called as well.
 * NOTE(review): the body of the loop over exp_outstanding_replies and the
 * declaration of nreplies are not visible in this copy of the file.
 */
1803 static void print_export_data(struct obd_export *exp, const char *status,
1804 int locks, int debug_level)
1806 struct ptlrpc_reply_state *rs;
1807 struct ptlrpc_reply_state *first_reply = NULL;
/* the outstanding-replies list is walked under exp_lock */
1810 spin_lock(&exp->exp_lock);
1811 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1817 spin_unlock(&exp->exp_lock);
1819 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1820 "%p %s %llu stale:%d\n",
1821 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1822 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1823 atomic_read(&exp->exp_rpc_count),
1824 atomic_read(&exp->exp_cb_count),
1825 atomic_read(&exp->exp_locks_count),
1826 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1827 nreplies, first_reply, nreplies > 3 ? "..." : "",
1828 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1829 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1830 if (locks && class_export_dump_hook != NULL)
1831 class_export_dump_hook(exp);
/*
 * Log every export of @obd at @debug_level via print_export_data().
 *
 * The three per-obd lists are walked under obd_dev_lock and labelled
 * "ACTIVE" (obd_exports), "UNLINKED" (obd_unlinked_exports) and "DELAYED"
 * (obd_delayed_exports). @locks is passed through unchanged.
 */
1835 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1837 struct obd_export *exp;
1839 spin_lock(&obd->obd_dev_lock);
1840 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1841 print_export_data(exp, "ACTIVE", locks, debug_level);
1842 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1843 print_export_data(exp, "UNLINKED", locks, debug_level);
1844 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1845 print_export_data(exp, "DELAYED", locks, debug_level);
1846 spin_unlock(&obd->obd_dev_lock);
/*
 * Block until @obd has no unlinked exports left.
 *
 * obd_exports must already be empty (asserted). While
 * obd_unlinked_exports is non-empty the caller sleeps uninterruptibly for
 * "waited" seconds with obd_dev_lock released; when waited exceeds 5 and is
 * a power of two a console warning is printed and all exports are dumped.
 * NOTE(review): the declaration and update of "waited" are not visible in
 * this copy of the file.
 */
1849 void obd_exports_barrier(struct obd_device *obd)
1852 LASSERT(list_empty(&obd->obd_exports));
1853 spin_lock(&obd->obd_dev_lock);
1854 while (!list_empty(&obd->obd_unlinked_exports)) {
/* the lock is dropped across the sleep and retaken before re-testing */
1855 spin_unlock(&obd->obd_dev_lock);
1856 set_current_state(TASK_UNINTERRUPTIBLE);
1857 schedule_timeout(cfs_time_seconds(waited));
1858 if (waited > 5 && is_power_of_2(waited)) {
1859 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1860 "more than %d seconds. "
1861 "The obd refcount = %d. Is it stuck?\n",
1862 obd->obd_name, waited,
1863 atomic_read(&obd->obd_refcount));
1864 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1867 spin_lock(&obd->obd_dev_lock);
1869 spin_unlock(&obd->obd_dev_lock);
1871 EXPORT_SYMBOL(obd_exports_barrier);
/**
 * Add export to the obd_zombie workqueue for asynchronous destruction.
 */
/*
 * Hand @exp over for deferred destruction: decrement obd_stale_export_num,
 * unlink the export from whatever per-obd list it is on (it must be on one;
 * asserted) under obd_dev_lock, then queue exp_zombie_work on zombie_wq.
 */
1876 static void obd_zombie_export_add(struct obd_export *exp) {
1877 atomic_dec(&obd_stale_export_num);
1878 spin_lock(&exp->exp_obd->obd_dev_lock);
1879 LASSERT(!list_empty(&exp->exp_obd_chain));
1880 list_del_init(&exp->exp_obd_chain);
1881 spin_unlock(&exp->exp_obd->obd_dev_lock);
1883 queue_work(zombie_wq, &exp->exp_zombie_work);
/**
 * Add import to the obd_zombie workqueue for asynchronous destruction.
 */
/*
 * Queue @imp's imp_zombie_work on zombie_wq. The import must no longer
 * have a security context attached (imp_sec == NULL is asserted).
 */
1889 static void obd_zombie_import_add(struct obd_import *imp) {
1890 LASSERT(imp->imp_sec == NULL);
1892 queue_work(zombie_wq, &imp->imp_zombie_work);
/**
 * Wait until the obd_zombie import/export queues become empty.
 */
/* Flush zombie_wq, i.e. wait for the work queued on it so far to finish. */
1898 void obd_zombie_barrier(void)
1900 flush_workqueue(zombie_wq);
1902 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Detach and hand out the first export on the global obd_stale_exports
 * list.
 *
 * The list is manipulated under obd_stale_export_lock; exp stays NULL when
 * the list is empty.
 * NOTE(review): the condition around the CDEBUG and the return statement
 * are not visible in this copy of the file; presumably exp is returned.
 */
1905 struct obd_export *obd_stale_export_get(void)
1907 struct obd_export *exp = NULL;
1910 spin_lock(&obd_stale_export_lock);
1911 if (!list_empty(&obd_stale_exports)) {
1912 exp = list_entry(obd_stale_exports.next,
1913 struct obd_export, exp_stale_list);
1914 list_del_init(&exp->exp_stale_list);
1916 spin_unlock(&obd_stale_export_lock);
1919 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1920 atomic_read(&obd_stale_export_num));
1924 EXPORT_SYMBOL(obd_stale_export_get);
/*
 * Give back an export obtained from obd_stale_export_get().
 *
 * @exp must not be on the stale list (asserted). If it still has a lock
 * hash with a non-zero hs_count it is put back on obd_stale_exports,
 * otherwise the visible alternative is class_export_put().
 * Lock order on the re-queue path: exp_bl_list_lock (BH-disabling) first,
 * then obd_stale_export_lock.
 * NOTE(review): the else lines are not visible in this copy of the file,
 * so the exact branch structure should be confirmed against the full
 * source.
 */
1926 void obd_stale_export_put(struct obd_export *exp)
1930 LASSERT(list_empty(&exp->exp_stale_list));
1931 if (exp->exp_lock_hash &&
1932 atomic_read(&exp->exp_lock_hash->hs_count)) {
1933 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1934 atomic_read(&obd_stale_export_num));
1936 spin_lock_bh(&exp->exp_bl_list_lock);
1937 spin_lock(&obd_stale_export_lock);
1938 /* Add to the tail if there is no blocked locks,
1939 * to the head otherwise. */
1940 if (list_empty(&exp->exp_bl_list))
1941 list_add_tail(&exp->exp_stale_list,
1942 &obd_stale_exports);
1944 list_add(&exp->exp_stale_list,
1945 &obd_stale_exports);
1947 spin_unlock(&obd_stale_export_lock);
1948 spin_unlock_bh(&exp->exp_bl_list_lock);
1950 class_export_put(exp);
1954 EXPORT_SYMBOL(obd_stale_export_put);
/**
 * Adjust the position of the export in the stale list,
 * i.e. move it to the head of the list if needed.
 */
/*
 * Move @exp to the head of obd_stale_exports when it is currently on the
 * stale list and its exp_bl_list is non-empty; otherwise do nothing.
 * Locks are taken in the same order as in obd_stale_export_put():
 * exp_bl_list_lock (BH-disabling) first, then obd_stale_export_lock.
 */
1960 void obd_stale_export_adjust(struct obd_export *exp)
1962 LASSERT(exp != NULL);
1963 spin_lock_bh(&exp->exp_bl_list_lock);
1964 spin_lock(&obd_stale_export_lock);
1966 if (!list_empty(&exp->exp_stale_list) &&
1967 !list_empty(&exp->exp_bl_list))
1968 list_move(&exp->exp_stale_list, &obd_stale_exports);
1970 spin_unlock(&obd_stale_export_lock);
1971 spin_unlock_bh(&exp->exp_bl_list_lock);
1973 EXPORT_SYMBOL(obd_stale_export_adjust);
/**
 * Start the workqueue that destroys zombie imports/exports.
 */
/*
 * Create the "obd_zombid" workqueue (flags 0, max_active 0) and store it
 * in zombie_wq.
 * NOTE(review): the failure check and return statements are not visible in
 * this copy of the file.
 */
1978 int obd_zombie_impexp_init(void)
1980 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
/**
 * Stop the workqueue that destroys zombie imports/exports.
 */
/*
 * Destroy zombie_wq, then assert that no export is left on the global
 * obd_stale_exports list.
 */
1990 void obd_zombie_impexp_stop(void)
1992 destroy_workqueue(zombie_wq);
1993 LASSERT(list_empty(&obd_stale_exports));
1996 /***** Kernel-userspace comm helpers *******/
1998 /* Get length of entire message, including header */
/*
 * @payload_len is the payload size in bytes; the result adds
 * sizeof(struct kuc_hdr) to it.
 */
1999 int kuc_len(int payload_len)
2001 return sizeof(struct kuc_hdr) + payload_len;
2003 EXPORT_SYMBOL(kuc_len);
/* Get a pointer to kuc header, given a ptr to the payload
 * @param p Pointer to payload area
 * @returns Pointer to kuc header
 */
/*
 * The header is located immediately before the payload, so @p is stepped
 * back by one struct kuc_hdr. The header's magic is asserted to be
 * KUC_MAGIC.
 * NOTE(review): the return statement is not visible in this copy of the
 * file; presumably lh is returned.
 */
2009 struct kuc_hdr * kuc_ptr(void *p)
2011 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2012 LASSERT(lh->kuc_magic == KUC_MAGIC);
2015 EXPORT_SYMBOL(kuc_ptr);
/* Alloc space for a message, and fill in header
 * @return Pointer to payload area
 */
/*
 * @payload_len is the payload size in bytes; @transport and @type are
 * stored verbatim in the header's kuc_transport and kuc_msgtype fields,
 * and kuc_msglen is set to the total length from kuc_len().
 * Returns the address just past the header, or ERR_PTR(-ENOMEM).
 * NOTE(review): the allocation call and the check guarding the -ENOMEM
 * return are not visible in this copy of the file.
 */
2020 void *kuc_alloc(int payload_len, int transport, int type)
2023 int len = kuc_len(payload_len);
2027 return ERR_PTR(-ENOMEM);
2029 lh->kuc_magic = KUC_MAGIC;
2030 lh->kuc_transport = transport;
2031 lh->kuc_msgtype = type;
2032 lh->kuc_msglen = len;
/* the payload starts right after the header */
2034 return (void *)(lh + 1);
2036 EXPORT_SYMBOL(kuc_alloc);
2038 /* Takes pointer to payload area */
/*
 * Free a message whose payload pointer is @p: the header is located with
 * kuc_ptr() (which asserts the magic) and header plus payload are released
 * with OBD_FREE() using kuc_len(@payload_len) as the size, so @payload_len
 * has to be the value the message was sized with.
 */
2039 void kuc_free(void *p, int payload_len)
2041 struct kuc_hdr *lh = kuc_ptr(p);
2042 OBD_FREE(lh, kuc_len(payload_len));
2044 EXPORT_SYMBOL(kuc_free);
/*
 * One task waiting for a request slot; instances live on the waiter's stack
 * (see obd_get_request_slot()).
 * NOTE(review): obd_get_request_slot() also uses an orsw_signaled member
 * whose declaration is not visible in this copy of the file.
 */
2046 struct obd_request_slot_waiter {
/* link on client_obd::cl_flight_waiters */
2047 struct list_head orsw_entry;
/* the waiter sleeps here until it is granted a slot */
2048 wait_queue_head_t orsw_waitq;
/*
 * Wait condition for obd_get_request_slot(): true once @orsw has been
 * unlinked from the waiters list, which is what obd_put_request_slot() and
 * obd_set_max_rpcs_in_flight() do when they grant a slot. The list state is
 * sampled under cl_loi_list_lock.
 * NOTE(review): the declaration of avail and the return statement are not
 * visible in this copy of the file.
 */
2052 static bool obd_request_slot_avail(struct client_obd *cli,
2053 struct obd_request_slot_waiter *orsw)
2057 spin_lock(&cli->cl_loi_list_lock);
2058 avail = !!list_empty(&orsw->orsw_entry);
2059 spin_unlock(&cli->cl_loi_list_lock);
/**
 * For network flow control, the RPC sponsor needs to acquire a credit
 * before sending the RPC. The credits count for a connection is defined
 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
 * the subsequent RPC sponsors need to wait until others release their
 * credits, or the administrator increases the "cl_max_rpcs_in_flight".
 */
/*
 * Acquire one request slot on @cli, sleeping if none is free.
 *
 * Fast path: if cl_rpcs_in_flight is below cl_max_rpcs_in_flight the
 * counter is bumped under cl_loi_list_lock. Otherwise an on-stack waiter is
 * appended to cl_flight_waiters and the task sleeps (signal-interruptible)
 * until obd_request_slot_avail() is true.
 *
 * After the wait, under the lock and with the waiter not flagged as
 * signaled: a waiter that is already unlinked was granted a slot, which is
 * given back (cl_rpcs_in_flight--); a waiter still on the list is removed.
 * NOTE(review): the remainder of the wait condition, the test of rc after
 * the wait, the else lines and the return statements are not visible in
 * this copy of the file.
 */
2071 int obd_get_request_slot(struct client_obd *cli)
2073 struct obd_request_slot_waiter orsw;
2074 struct l_wait_info lwi;
2077 spin_lock(&cli->cl_loi_list_lock);
2078 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2079 cli->cl_rpcs_in_flight++;
2080 spin_unlock(&cli->cl_loi_list_lock);
/* no free slot: enqueue ourselves before dropping the lock */
2084 init_waitqueue_head(&orsw.orsw_waitq);
2085 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2086 orsw.orsw_signaled = false;
2087 spin_unlock(&cli->cl_loi_list_lock);
2089 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2090 rc = l_wait_event(orsw.orsw_waitq,
2091 obd_request_slot_avail(cli, &orsw) ||
2095 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2096 * freed but other (such as obd_put_request_slot) is using it. */
2097 spin_lock(&cli->cl_loi_list_lock);
2099 if (!orsw.orsw_signaled) {
2100 if (list_empty(&orsw.orsw_entry))
2101 cli->cl_rpcs_in_flight--;
2103 list_del(&orsw.orsw_entry);
2107 if (orsw.orsw_signaled) {
2108 LASSERT(list_empty(&orsw.orsw_entry));
2112 spin_unlock(&cli->cl_loi_list_lock);
2116 EXPORT_SYMBOL(obd_get_request_slot);
/*
 * Release one request slot on @cli.
 *
 * Under cl_loi_list_lock the in-flight counter is decremented; if a waiter
 * is queued and the counter is below the maximum, the first waiter is
 * unlinked, the slot is accounted to it straight away (counter incremented
 * again) and it is woken up.
 */
2118 void obd_put_request_slot(struct client_obd *cli)
2120 struct obd_request_slot_waiter *orsw;
2122 spin_lock(&cli->cl_loi_list_lock);
2123 cli->cl_rpcs_in_flight--;
2125 /* If there is free slot, wakeup the first waiter. */
2126 if (!list_empty(&cli->cl_flight_waiters) &&
2127 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2128 orsw = list_entry(cli->cl_flight_waiters.next,
2129 struct obd_request_slot_waiter, orsw_entry);
/* unlinking is what obd_request_slot_avail() tests for */
2130 list_del_init(&orsw->orsw_entry);
2131 cli->cl_rpcs_in_flight++;
2132 wake_up(&orsw->orsw_waitq);
2134 spin_unlock(&cli->cl_loi_list_lock);
2136 EXPORT_SYMBOL(obd_put_request_slot);
/* Return @cli's current cl_max_rpcs_in_flight; read without locking. */
2138 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2140 return cli->cl_max_rpcs_in_flight;
2142 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2144 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2146 struct obd_request_slot_waiter *orsw;
2153 if (max > OBD_MAX_RIF_MAX || max < 1)
2156 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2157 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2158 /* adjust max_mod_rpcs_in_flight to ensure it is always
2159 * strictly lower that max_rpcs_in_flight */
2161 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2162 "because it must be higher than "
2163 "max_mod_rpcs_in_flight value",
2164 cli->cl_import->imp_obd->obd_name);
2167 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2168 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2174 spin_lock(&cli->cl_loi_list_lock);
2175 old = cli->cl_max_rpcs_in_flight;
2176 cli->cl_max_rpcs_in_flight = max;
2177 client_adjust_max_dirty(cli);
2181 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2182 for (i = 0; i < diff; i++) {
2183 if (list_empty(&cli->cl_flight_waiters))
2186 orsw = list_entry(cli->cl_flight_waiters.next,
2187 struct obd_request_slot_waiter, orsw_entry);
2188 list_del_init(&orsw->orsw_entry);
2189 cli->cl_rpcs_in_flight++;
2190 wake_up(&orsw->orsw_waitq);
2192 spin_unlock(&cli->cl_loi_list_lock);
2196 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Return @cli's current cl_max_mod_rpcs_in_flight; read without locking. */
2198 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2200 return cli->cl_max_mod_rpcs_in_flight;
2202 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/*
 * Set the maximum number of modify RPCs in flight for @cli to @max.
 *
 * Visible constraints on @max: it must be in [1, OBD_MAX_RIF_MAX],
 * strictly below cl_max_rpcs_in_flight, and not above maxmodrpcs. When the
 * import's connect flags include OBD_CONNECT_MULTIMODRPCS, maxmodrpcs is
 * the ocd_maxmodrpcs value from the connect data.
 * The new limit is stored under cl_mod_rpcs_lock and waiters on
 * cl_mod_rpcs_waitq are woken if the limit went up.
 * NOTE(review): the return statements, the fallback value of maxmodrpcs
 * when the server lacks OBD_CONNECT_MULTIMODRPCS, and the trailing
 * arguments of the second CERROR are not visible in this copy of the file.
 */
2204 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2206 struct obd_connect_data *ocd;
2210 if (max > OBD_MAX_RIF_MAX || max < 1)
2213 /* cannot exceed or equal max_rpcs_in_flight */
2214 if (max >= cli->cl_max_rpcs_in_flight) {
2215 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2216 "higher or equal to max_rpcs_in_flight value (%u)\n",
2217 cli->cl_import->imp_obd->obd_name,
2218 max, cli->cl_max_rpcs_in_flight);
2222 /* cannot exceed max modify RPCs in flight supported by the server */
2223 ocd = &cli->cl_import->imp_connect_data;
2224 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2225 maxmodrpcs = ocd->ocd_maxmodrpcs;
2228 if (max > maxmodrpcs) {
2229 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2230 "higher than max_mod_rpcs_per_client value (%hu) "
2231 "returned by the server at connection\n",
2232 cli->cl_import->imp_obd->obd_name,
2237 spin_lock(&cli->cl_mod_rpcs_lock);
2239 prev = cli->cl_max_mod_rpcs_in_flight;
2240 cli->cl_max_mod_rpcs_in_flight = max;
2242 /* wakeup waiters if limit has been increased */
2243 if (cli->cl_max_mod_rpcs_in_flight > prev)
2244 wake_up(&cli->cl_mod_rpcs_waitq);
2246 spin_unlock(&cli->cl_mod_rpcs_lock);
2250 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2252 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2253 struct seq_file *seq)
2255 unsigned long mod_tot = 0, mod_cum;
2256 struct timespec64 now;
2259 ktime_get_real_ts64(&now);
2261 spin_lock(&cli->cl_mod_rpcs_lock);
2263 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2264 (s64)now.tv_sec, now.tv_nsec);
2265 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2266 cli->cl_mod_rpcs_in_flight);
2268 seq_printf(seq, "\n\t\t\tmodify\n");
2269 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2271 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2274 for (i = 0; i < OBD_HIST_MAX; i++) {
2275 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2277 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2278 i, mod, pct(mod, mod_tot),
2279 pct(mod_cum, mod_tot));
2280 if (mod_cum == mod_tot)
2284 spin_unlock(&cli->cl_mod_rpcs_lock);
2288 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
/* The number of modify RPCs sent in parallel is limited
 * because the server has a finite number of slots per client to
 * store request result and ensure reply reconstruction when needed.
 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
 * that takes into account server limit and cl_max_rpcs_in_flight
 * value.
 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
 * one close request is allowed above the maximum.
 */
/*
 * Tell whether a modify-RPC slot is currently available on @cli.
 *
 * Reads cl_mod_rpcs_in_flight, cl_max_mod_rpcs_in_flight and
 * cl_close_rpcs_in_flight without taking any lock itself; both visible
 * callers invoke it with cl_mod_rpcs_lock held.
 * NOTE(review): the second parameter line (close_req), the declaration of
 * avail, the return statement and the closing delimiter of the comment
 * inside the body are not visible in this copy of the file.
 */
2299 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2304 /* A slot is available if
2305 * - number of modify RPCs in flight is less than the max
2306 * - it's a close RPC and no other close request is in flight
2308 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2309 (close_req && cli->cl_close_rpcs_in_flight == 0);
/*
 * Locking wrapper around obd_mod_rpc_slot_avail_locked(): takes
 * cl_mod_rpcs_lock around the check. Used as the wait condition in
 * obd_get_mod_rpc_slot().
 * NOTE(review): the second parameter line, the declaration of avail and
 * the return statement are not visible in this copy of the file.
 */
2314 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2319 spin_lock(&cli->cl_mod_rpcs_lock);
2320 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2321 spin_unlock(&cli->cl_mod_rpcs_lock);
/*
 * Tell whether the operation described by intent @it can go without a
 * modify-RPC slot.
 *
 * The visible part of the condition matches IT_GETATTR, IT_LOOKUP,
 * IT_READDIR, and IT_LAYOUT without MDS_FMODE_WRITE in it_flags.
 * NOTE(review): the first line of the condition (presumably a NULL check
 * on @it) and the return statements are not visible in this copy of the
 * file.
 */
2325 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2328 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2329 it->it_op == IT_READDIR ||
2330 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2335 /* Get a modify RPC slot from the obd client @cli according
2336 * to the kind of operation @opc that is going to be sent
2337 * and the intent @it of the operation if it applies.
2338 * If the maximum number of modify RPCs in flight is reached
2339 * the thread is put to sleep.
2340 * Returns the tag to be set in the request message. Tag 0
2341 * is reserved for non-modifying requests.
2343 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2344 struct lookup_intent *it)
2346 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2347 bool close_req = false;
2350 /* read-only metadata RPCs don't consume a slot on MDT
2351 * for reply reconstruction
2353 if (obd_skip_mod_rpc_slot(it))
2356 if (opc == MDS_CLOSE)
2360 spin_lock(&cli->cl_mod_rpcs_lock);
2361 max = cli->cl_max_mod_rpcs_in_flight;
2362 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2363 /* there is a slot available */
2364 cli->cl_mod_rpcs_in_flight++;
2366 cli->cl_close_rpcs_in_flight++;
2367 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2368 cli->cl_mod_rpcs_in_flight);
2369 /* find a free tag */
2370 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2372 LASSERT(i < OBD_MAX_RIF_MAX);
2373 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2374 spin_unlock(&cli->cl_mod_rpcs_lock);
2375 /* tag 0 is reserved for non-modify RPCs */
2378 spin_unlock(&cli->cl_mod_rpcs_lock);
2380 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2381 "opc %u, max %hu\n",
2382 cli->cl_import->imp_obd->obd_name, opc, max);
2384 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2385 obd_mod_rpc_slot_avail(cli, close_req),
2389 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2391 /* Put a modify RPC slot from the obd client @cli according
2392 * to the kind of operation @opc that has been sent and the
2393 * intent @it of the operation if it applies.
2395 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2396 struct lookup_intent *it, __u16 tag)
2398 bool close_req = false;
2400 if (obd_skip_mod_rpc_slot(it))
2403 if (opc == MDS_CLOSE)
2406 spin_lock(&cli->cl_mod_rpcs_lock);
2407 cli->cl_mod_rpcs_in_flight--;
2409 cli->cl_close_rpcs_in_flight--;
2410 /* release the tag in the bitmap */
2411 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2412 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2413 spin_unlock(&cli->cl_mod_rpcs_lock);
2414 wake_up(&cli->cl_mod_rpcs_waitq);
2416 EXPORT_SYMBOL(obd_put_mod_rpc_slot);