4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* Registry of obd types; the obd_types list is walked and modified under
 * obd_types_lock. */
49 static DEFINE_SPINLOCK(obd_types_lock);
50 static LIST_HEAD(obd_types);
/* Device table indexed by minor number; slots are written under
 * obd_dev_lock. */
51 DEFINE_RWLOCK(obd_dev_lock);
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache that obd_device_alloc()/obd_device_free() draw from. */
54 static struct kmem_cache *obd_device_cachep;
/* NOTE(review): presumably the workqueue that runs the export/import
 * zombie work items; its users are not visible in this part of the file. */
56 static struct workqueue_struct *zombie_wq;
/* Forward declarations for helpers defined later in the file. */
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping: list, its lock, and a counter that
 * class_unlink_export() increments. */
63 static LIST_HEAD(obd_stale_exports);
64 static DEFINE_SPINLOCK(obd_stale_export_lock);
65 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Function pointer called by the export/import destroy paths to release a
 * ptlrpc connection; exported so another module can install it. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep with GFP_NOFS and stamp it
 * with OBD_DEVICE_MAGIC.
 * NOTE(review): the NULL check and the return statement sit on lines not
 * visible in this listing; class_newdev() treats a NULL result as -ENOMEM.
 */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts that the magic is intact, reports a namespace that is still
 * attached, finalizes the obd_reference lu_ref tracker, then frees the
 * slab object.
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A leftover namespace means cleanup was incomplete. */
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd type by exact name.
 * Walks obd_types under obd_types_lock and compares typ_name with strcmp();
 * the lock is released both on a match and after an unsuccessful scan.
 */
99 struct obd_type *class_search_type(const char *name)
101 struct list_head *tmp;
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 list_for_each(tmp, &obd_types) {
106 type = list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
/*
 * Find an obd type by name and pin its module.
 * With HAVE_MODULE_LOADING_SUPPORT the name is mapped to a module name
 * (LWP -> OSP module, names starting with LUSTRE_MDS_NAME -> MDT module),
 * request_module() is called, and the type is searched for again.
 * NOTE(review): the return value of try_module_get() is ignored, so a type
 * whose module is being unloaded is still handed out — consider checking it.
 */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
/* Prefix match: any name beginning with LUSTRE_MDS_NAME selects MDT. */
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
/* request_module() returns 0 on success. */
134 if (!request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* Module reference is taken under the per-type lock. */
144 spin_lock(&type->obd_type_lock);
146 try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): releases the module reference held on
 * typ_dt_ops->o_owner while holding the per-type obd_type_lock.
 */
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
/* kobject release callback used by class_ktype: frees the kobject itself,
 * matching the OBD_ALLOC() in class_setup_tunables(). */
161 static void class_sysfs_release(struct kobject *kobj)
163 OBD_FREE(kobj, sizeof(*kobj));
/* kobj_type for the per-type tunables kobject: Lustre sysfs ops plus
 * class_sysfs_release() as the release hook. */
166 static struct kobj_type class_ktype = {
167 .sysfs_ops = &lustre_sysfs_ops,
168 .release = class_sysfs_release,
/*
 * Create a kobject named \a name under lustre_kset to hold a type's
 * tunables.  With HAVE_SERVER_SUPPORT an existing object of that name is
 * looked up first through kset_find_obj().
 * Returns ERR_PTR(-ENOMEM) when the kobject cannot be allocated.
 * NOTE(review): what happens on a kset_find_obj() hit and on kobject_add()
 * failure is on lines not visible in this listing — confirm.
 */
171 struct kobject *class_setup_tunables(const char *name)
173 struct kobject *kobj;
176 #ifdef HAVE_SERVER_SUPPORT
177 kobj = kset_find_obj(lustre_kset, name);
181 OBD_ALLOC(kobj, sizeof(*kobj));
183 return ERR_PTR(-ENOMEM);
185 kobj->kset = lustre_kset;
186 kobject_init(kobj, &class_ktype);
187 rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
194 EXPORT_SYMBOL(class_setup_tunables);
/* Upper bound on the length of a type name accepted by
 * class_register_type(). */
196 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 * Copies \a dt_ops (and \a md_ops) into freshly allocated tables, creates
 * the procfs/debugfs/sysfs entries for the type, initializes \a ldt via
 * lu_device_type_init(), and finally links the type onto obd_types.
 * The tail of the function is the failure path, which undoes the debugfs
 * and procfs registration and frees the allocations.
 */
198 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
199 bool enable_proc, struct lprocfs_vars *vars,
200 const char *name, struct lu_device_type *ldt)
202 struct obd_type *type;
203 #ifdef HAVE_SERVER_SUPPORT
205 #endif /* HAVE_SERVER_SUPPORT */
/* The name must be NUL-terminated within CLASS_MAX_NAME bytes. */
210 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
212 if (class_search_type(name)) {
213 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
218 OBD_ALLOC(type, sizeof(*type));
/* All three allocations are attempted first and checked together. */
222 OBD_ALLOC_PTR(type->typ_dt_ops);
223 OBD_ALLOC_PTR(type->typ_md_ops);
224 OBD_ALLOC(type->typ_name, strlen(name) + 1);
226 if (type->typ_dt_ops == NULL ||
227 type->typ_md_ops == NULL ||
228 type->typ_name == NULL)
231 *(type->typ_dt_ops) = *dt_ops;
232 /* md_ops is optional */
234 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized strlen(name) + 1 above, so strcpy() fits. */
235 strcpy(type->typ_name, name);
236 spin_lock_init(&type->obd_type_lock);
238 #ifdef CONFIG_PROC_FS
240 type->typ_procroot = lprocfs_register(type->typ_name,
243 if (IS_ERR(type->typ_procroot)) {
244 rc = PTR_ERR(type->typ_procroot);
245 type->typ_procroot = NULL;
250 #ifdef HAVE_SERVER_SUPPORT
/* If a debugfs entry with this name already exists, only mark the type with
 * typ_sym_filter; that flag later stops this type from removing the entry. */
252 dname.len = strlen(dname.name);
253 dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
255 type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
256 if (type->typ_debugfs_entry) {
257 dput(type->typ_debugfs_entry);
258 type->typ_sym_filter = true;
261 #endif /* HAVE_SERVER_SUPPORT */
263 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
266 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
267 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
269 type->typ_debugfs_entry = NULL;
272 #ifdef HAVE_SERVER_SUPPORT
275 type->typ_kobj = class_setup_tunables(type->typ_name);
276 if (IS_ERR(type->typ_kobj))
277 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
281 rc = lu_device_type_init(ldt);
283 kobject_put(type->typ_kobj);
/* Publish the fully initialized type. */
288 spin_lock(&obd_types_lock);
289 list_add(&type->typ_chain, &obd_types);
290 spin_unlock(&obd_types_lock);
295 #ifdef HAVE_SERVER_SUPPORT
/* Failure path: an entry found by d_lookup() is not ours to remove. */
296 if (type->typ_sym_filter)
297 type->typ_debugfs_entry = NULL;
299 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
300 ldebugfs_remove(&type->typ_debugfs_entry);
301 if (type->typ_name != NULL) {
302 #ifdef CONFIG_PROC_FS
303 if (type->typ_procroot != NULL)
304 remove_proc_subtree(type->typ_name, proc_lustre_root);
306 OBD_FREE(type->typ_name, strlen(name) + 1);
308 if (type->typ_md_ops != NULL)
309 OBD_FREE_PTR(type->typ_md_ops);
310 if (type->typ_dt_ops != NULL)
311 OBD_FREE_PTR(type->typ_dt_ops);
312 OBD_FREE(type, sizeof(*type));
315 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 * Removes the type's sysfs, procfs and debugfs entries, finalizes its
 * lu_device_type, unlinks it from obd_types and frees it.
 * NOTE(review): when typ_refcnt is non-zero the ops tables are freed but the
 * name is deliberately kept; if the function returns at that point (the
 * line is not visible here) the type stays on obd_types with stale
 * typ_dt_ops/typ_md_ops pointers — confirm nothing dereferences them later.
 */
317 int class_unregister_type(const char *name)
319 struct obd_type *type = class_search_type(name);
323 CERROR("unknown obd type\n");
327 if (type->typ_refcnt) {
328 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
329 /* This is a bad situation, let's make the best of it */
330 /* Remove ops, but leave the name for debugging */
331 OBD_FREE_PTR(type->typ_dt_ops);
332 OBD_FREE_PTR(type->typ_md_ops);
336 kobject_put(type->typ_kobj);
338 /* we do not use type->typ_procroot as for compatibility purposes
339 * other modules can share names (i.e. lod can use lov entry). so
340 * we can't reference pointer as it can get invalided when another
341 * module removes the entry */
342 #ifdef CONFIG_PROC_FS
343 if (type->typ_procroot != NULL)
344 remove_proc_subtree(type->typ_name, proc_lustre_root);
345 if (type->typ_procsym != NULL)
346 lprocfs_remove(&type->typ_procsym);
348 #ifdef HAVE_SERVER_SUPPORT
/* A debugfs entry flagged by typ_sym_filter is not removed by this type. */
349 if (type->typ_sym_filter)
350 type->typ_debugfs_entry = NULL;
352 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
353 ldebugfs_remove(&type->typ_debugfs_entry);
356 lu_device_type_fini(type->typ_lu);
358 spin_lock(&obd_types_lock);
359 list_del(&type->typ_chain);
360 spin_unlock(&obd_types_lock);
/* name matched typ_name via strcmp(), so the lengths agree. */
361 OBD_FREE(type->typ_name, strlen(name) + 1);
362 if (type->typ_dt_ops != NULL)
363 OBD_FREE_PTR(type->typ_dt_ops);
364 if (type->typ_md_ops != NULL)
365 OBD_FREE_PTR(type->typ_md_ops);
366 OBD_FREE(type, sizeof(*type));
368 } /* class_unregister_type */
369 EXPORT_SYMBOL(class_unregister_type);
372 * Create a new obd device.
374 * Allocate the new obd_device and initialize it.
376 * \param[in] type_name obd device type string.
377 * \param[in] name obd device name.
378 * \param[in] uuid obd device UUID
380 * \retval newdev pointer to created obd_device
381 * \retval ERR_PTR(errno) on error
383 struct obd_device *class_newdev(const char *type_name, const char *name,
386 struct obd_device *newdev;
387 struct obd_type *type = NULL;
390 if (strlen(name) >= MAX_OBD_NAME) {
391 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
392 RETURN(ERR_PTR(-EINVAL));
395 type = class_get_type(type_name);
397 CERROR("OBD: unknown type: %s\n", type_name);
398 RETURN(ERR_PTR(-ENODEV));
401 newdev = obd_device_alloc();
402 if (newdev == NULL) {
403 class_put_type(type);
404 RETURN(ERR_PTR(-ENOMEM));
406 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
407 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
408 newdev->obd_type = type;
409 newdev->obd_minor = -1;
411 rwlock_init(&newdev->obd_pool_lock);
412 newdev->obd_pool_limit = 0;
413 newdev->obd_pool_slv = 0;
415 INIT_LIST_HEAD(&newdev->obd_exports);
416 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
417 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
418 INIT_LIST_HEAD(&newdev->obd_exports_timed);
419 INIT_LIST_HEAD(&newdev->obd_nid_stats);
420 spin_lock_init(&newdev->obd_nid_lock);
421 spin_lock_init(&newdev->obd_dev_lock);
422 mutex_init(&newdev->obd_dev_mutex);
423 spin_lock_init(&newdev->obd_osfs_lock);
424 /* newdev->obd_osfs_age must be set to a value in the distant
425 * past to guarantee a fresh statfs is fetched on mount. */
426 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
428 /* XXX belongs in setup not attach */
429 init_rwsem(&newdev->obd_observer_link_sem);
431 spin_lock_init(&newdev->obd_recovery_task_lock);
432 init_waitqueue_head(&newdev->obd_next_transno_waitq);
433 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
434 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
435 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
436 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
437 INIT_LIST_HEAD(&newdev->obd_evict_list);
438 INIT_LIST_HEAD(&newdev->obd_lwp_list);
440 llog_group_init(&newdev->obd_olg);
441 /* Detach drops this */
442 atomic_set(&newdev->obd_refcount, 1);
443 lu_ref_init(&newdev->obd_reference);
444 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
446 newdev->obd_conn_inprogress = 0;
448 strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
450 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
451 newdev->obd_name, newdev);
/*
 * Final teardown of an obd_device whose refcount has reached zero.
 * Asserts the magic, the obd_devs[] slot consistency and a zero refcount,
 * runs obd_cleanup() when the device is marked obd_stopping, frees the
 * device and then drops the type reference saved in obd_type.
 */
459 * \param[in] obd obd_device to be freed
463 void class_free_dev(struct obd_device *obd)
/* Saved up front because obd is freed before the type is put. */
465 struct obd_type *obd_type = obd->obd_type;
467 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
468 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
469 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
470 "obd %p != obd_devs[%d] %p\n",
471 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
472 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
473 "obd_refcount should be 0, not %d\n",
474 atomic_read(&obd->obd_refcount));
475 LASSERT(obd_type != NULL);
477 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
478 obd->obd_name, obd->obd_type->typ_name);
480 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
481 obd->obd_name, obd->obd_uuid.uuid);
482 if (obd->obd_stopping) {
485 /* If we're not stopping, we were never set up */
486 err = obd_cleanup(obd);
488 CERROR("Cleanup %s returned %d\n",
492 obd_device_free(obd);
494 class_put_type(obd_type);
498 * Unregister obd device.
500 * Free slot in obd_dev[] used by \a obd.
502 * \param[in] new_obd obd_device to be unregistered
506 void class_unregister_device(struct obd_device *obd)
/* Slot updates are serialized by the obd_dev_lock write lock. */
508 write_lock(&obd_dev_lock);
/* A negative minor means the device never occupied a slot. */
509 if (obd->obd_minor >= 0) {
510 LASSERT(obd_devs[obd->obd_minor] == obd);
511 obd_devs[obd->obd_minor] = NULL;
514 write_unlock(&obd_dev_lock);
518 * Register obd device.
520 * Find free slot in obd_devs[], fills it with \a new_obd.
522 * \param[in] new_obd obd_device to be registered
525 * \retval -EEXIST device with this name is registered
526 * \retval -EOVERFLOW obd_devs[] is full
528 int class_register_device(struct obd_device *new_obd)
532 int new_obd_minor = 0;
533 bool minor_assign = false;
534 bool retried = false;
/* One pass under the write lock both looks for a duplicate name and
 * remembers a free slot. */
537 write_lock(&obd_dev_lock);
538 for (i = 0; i < class_devno_max(); i++) {
539 struct obd_device *obd = class_num2obd(i);
542 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
/* The lock is dropped before obd_zombie_barrier() is called; the "retried"
 * flag presumably limits this to one retry (the test is not visible here). */
545 write_unlock(&obd_dev_lock);
547 /* the obd_device could be waited to be
548 * destroyed by the "obd_zombie_impexp_thread".
550 obd_zombie_barrier();
555 CERROR("%s: already exists, won't add\n",
557 /* in case we found a free slot before duplicate */
558 minor_assign = false;
562 if (!minor_assign && obd == NULL) {
569 new_obd->obd_minor = new_obd_minor;
570 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
571 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
572 obd_devs[new_obd_minor] = new_obd;
576 CERROR("%s: all %u/%u devices used, increase "
577 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
578 i, class_devno_max(), ret);
581 write_unlock(&obd_dev_lock);
/*
 * Scan obd_devs[] for a device whose obd_name equals \a name.
 * Takes no lock itself; class_name2dev() and class_dev_by_str() call it
 * with obd_dev_lock read-held.  Only devices with obd_attached set are
 * considered a match.
 */
586 static int class_name2dev_nolock(const char *name)
593 for (i = 0; i < class_devno_max(); i++) {
594 struct obd_device *obd = class_num2obd(i);
596 if (obd && strcmp(name, obd->obd_name) == 0) {
597 /* Make sure we finished attaching before we give
598 out any references */
599 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
600 if (obd->obd_attached) {
/* Locked wrapper: runs class_name2dev_nolock() with obd_dev_lock
 * read-held. */
610 int class_name2dev(const char *name)
617 read_lock(&obd_dev_lock);
618 i = class_name2dev_nolock(name);
619 read_unlock(&obd_dev_lock);
623 EXPORT_SYMBOL(class_name2dev);
625 struct obd_device *class_name2obd(const char *name)
627 int dev = class_name2dev(name);
629 if (dev < 0 || dev > class_devno_max())
631 return class_num2obd(dev);
633 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] for a device whose obd_uuid equals \a uuid.
 * Takes no lock itself; class_uuid2dev() and class_dev_by_str() call it
 * with obd_dev_lock read-held.
 */
635 int class_uuid2dev_nolock(struct obd_uuid *uuid)
639 for (i = 0; i < class_devno_max(); i++) {
640 struct obd_device *obd = class_num2obd(i);
642 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
643 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/* Locked wrapper: runs class_uuid2dev_nolock() with obd_dev_lock
 * read-held. */
651 int class_uuid2dev(struct obd_uuid *uuid)
655 read_lock(&obd_dev_lock);
656 i = class_uuid2dev_nolock(uuid);
657 read_unlock(&obd_dev_lock);
661 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up an obd device by UUID: resolves the index with class_uuid2dev()
 * and fetches the device with class_num2obd().
 * NOTE(review): the handling of a failed lookup is on lines not visible in
 * this listing.
 */
663 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
665 int dev = class_uuid2dev(uuid);
668 return class_num2obd(dev);
670 EXPORT_SYMBOL(class_uuid2obd);
673 * Get obd device from ::obd_devs[]
675 * \param num [in] array index
677 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
678 * otherwise return the obd device there.
680 struct obd_device *class_num2obd(int num)
682 struct obd_device *obd = NULL;
684 if (num < class_devno_max()) {
689 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
690 "%p obd_magic %08x != %08x\n",
691 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
692 LASSERTF(obd->obd_minor == num,
693 "%p obd_minor %0d != %0d\n",
694 obd, obd->obd_minor, num);
701 * Find obd in obd_dev[] by name or uuid.
703 * Increment obd's refcount if found.
705 * \param[in] str obd name or uuid
707 * \retval NULL if not found
708 * \retval target pointer to found obd_device
710 struct obd_device *class_dev_by_str(const char *str)
712 struct obd_device *target = NULL;
713 struct obd_uuid tgtuuid;
716 obd_str2uuid(&tgtuuid, str);
/* Both lookups and the class_incref() happen under the obd_dev_lock read
 * lock; the lookup is by UUID first, then by name. */
718 read_lock(&obd_dev_lock);
719 rc = class_uuid2dev_nolock(&tgtuuid);
721 rc = class_name2dev_nolock(str);
724 target = class_num2obd(rc);
727 class_incref(target, "find", current);
728 read_unlock(&obd_dev_lock);
732 EXPORT_SYMBOL(class_dev_by_str);
735 * Get obd devices count. Device in any
737 * \retval obd device count
739 int get_devices_count(void)
741 int index, max_index = class_devno_max(), dev_count = 0;
743 read_lock(&obd_dev_lock);
744 for (index = 0; index <= max_index; index++) {
745 struct obd_device *obd = class_num2obd(index);
749 read_unlock(&obd_dev_lock);
753 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in obd_devs[]: index, a status string
 * chosen from the obd_stopping / obd_set_up / obd_attached flags (tested
 * in that order), type name, device name, UUID and current refcount.
 * Runs under the obd_dev_lock read lock.
 */
755 void class_obd_list(void)
760 read_lock(&obd_dev_lock);
761 for (i = 0; i < class_devno_max(); i++) {
762 struct obd_device *obd = class_num2obd(i);
766 if (obd->obd_stopping)
768 else if (obd->obd_set_up)
770 else if (obd->obd_attached)
774 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
775 i, status, obd->obd_type->typ_name,
776 obd->obd_name, obd->obd_uuid.uuid,
777 atomic_read(&obd->obd_refcount));
779 read_unlock(&obd_dev_lock);
783 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
784 specified, then only the client with that uuid is returned,
785 otherwise any client connected to the tgt is returned. */
786 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
787 const char * typ_name,
788 struct obd_uuid *grp_uuid)
792 read_lock(&obd_dev_lock);
793 for (i = 0; i < class_devno_max(); i++) {
794 struct obd_device *obd = class_num2obd(i);
/* Prefix comparison: only the first strlen(typ_name) bytes of the device's
 * type name are compared. */
798 if ((strncmp(obd->obd_type->typ_name, typ_name,
799 strlen(typ_name)) == 0)) {
800 if (obd_uuid_equals(tgt_uuid,
801 &obd->u.cli.cl_target_uuid) &&
802 ((grp_uuid)? obd_uuid_equals(grp_uuid,
803 &obd->obd_uuid) : 1)) {
/* Match: the lock is released here; no class_incref() is visible on this
 * path. */
804 read_unlock(&obd_dev_lock);
809 read_unlock(&obd_dev_lock);
813 EXPORT_SYMBOL(class_find_client_obd);
815 /* Iterate the obd_device list looking devices have grp_uuid. Start
816 searching at *next, and if a device is found, the next index to look
817 at is saved in *next. If next is NULL, then the first matching device
818 will always be returned. */
819 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* *next is only used as a start index when it lies inside
 * [0, class_devno_max()). */
825 else if (*next >= 0 && *next < class_devno_max())
830 read_lock(&obd_dev_lock);
831 for (; i < class_devno_max(); i++) {
832 struct obd_device *obd = class_num2obd(i);
836 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
839 read_unlock(&obd_dev_lock);
843 read_unlock(&obd_dev_lock);
847 EXPORT_SYMBOL(class_devices_in_group);
850 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
851 * adjust sptlrpc settings accordingly.
853 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
855 struct obd_device *obd;
859 LASSERT(namelen > 0);
861 read_lock(&obd_dev_lock);
862 for (i = 0; i < class_devno_max(); i++) {
863 obd = class_num2obd(i);
/* Only devices that are set up and not stopping are considered. */
865 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
868 /* only notify mdc, osc, osp, lwp, mdt, ost
869 * because only these have a -sptlrpc llog */
870 type = obd->obd_type->typ_name;
871 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
872 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
873 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
874 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
875 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
876 strcmp(type, LUSTRE_OST_NAME) != 0)
/* The device name must start with the first namelen bytes of fsname. */
879 if (strncmp(obd->obd_name, fsname, namelen))
/* A reference is taken on the device, then obd_dev_lock is dropped around
 * obd_set_info_async() and re-taken afterwards. */
882 class_incref(obd, __FUNCTION__, obd);
883 read_unlock(&obd_dev_lock);
884 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
885 sizeof(KEY_SPTLRPC_CONF),
886 KEY_SPTLRPC_CONF, 0, NULL, NULL);
888 class_decref(obd, __FUNCTION__, obd);
889 read_lock(&obd_dev_lock);
891 read_unlock(&obd_dev_lock);
894 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/* Destroy the obd_device slab cache if it exists and clear the pointer, so
 * a second call does nothing. */
896 void obd_cleanup_caches(void)
899 if (obd_device_cachep) {
900 kmem_cache_destroy(obd_device_cachep);
901 obd_device_cachep = NULL;
/*
 * Create the "ll_obd_dev_cache" slab cache sized for struct obd_device.
 * Asserts the cache does not already exist; on allocation failure sets
 * rc = -ENOMEM, and obd_cleanup_caches() is called on the error path.
 */
907 int obd_init_caches(void)
912 LASSERT(obd_device_cachep == NULL);
913 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
914 sizeof(struct obd_device),
916 if (!obd_device_cachep)
917 GOTO(out, rc = -ENOMEM);
921 obd_cleanup_caches();
925 /* map connection to client */
/*
 * Resolves \a conn to an export via class_handle2object() on conn->cookie.
 * A NULL handle and the special cookie value -1 are logged separately
 * before the lookup; what is returned on those two paths is on lines not
 * visible in this listing.
 * NOTE(review): class_conn2obd() calls class_export_put() on the result, so
 * the lookup presumably returns a referenced export — confirm.
 */
926 struct obd_export *class_conn2export(struct lustre_handle *conn)
928 struct obd_export *export;
932 CDEBUG(D_CACHE, "looking for null handle\n");
936 if (conn->cookie == -1) { /* this means assign a new connection */
937 CDEBUG(D_CACHE, "want a new connection\n");
941 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
942 export = class_handle2object(conn->cookie, NULL);
945 EXPORT_SYMBOL(class_conn2export);
/* Map an export to an obd_device.
 * NOTE(review): the body is not visible in this listing; presumably it
 * returns exp->exp_obd (or NULL for a NULL export) — confirm. */
947 struct obd_device *class_exp2obd(struct obd_export *exp)
953 EXPORT_SYMBOL(class_exp2obd);
/*
 * Map a connection handle to the obd_device of its export.
 * The export obtained from class_conn2export() is only used to read
 * exp_obd and is released again with class_export_put(); no reference on
 * the device is taken in the visible lines.
 */
955 struct obd_device *class_conn2obd(struct lustre_handle *conn)
957 struct obd_export *export;
958 export = class_conn2export(conn);
960 struct obd_device *obd = export->exp_obd;
961 class_export_put(export);
/* Return the client import (u.cli.cl_import) of the device behind \a exp.
 * NOTE(review): any NULL check on the device is on a line not visible in
 * this listing. */
967 struct obd_import *class_exp2cliimp(struct obd_export *exp)
969 struct obd_device *obd = exp->exp_obd;
972 return obd->u.cli.cl_import;
974 EXPORT_SYMBOL(class_exp2cliimp);
/* Return the client import (u.cli.cl_import) of the device that \a conn
 * resolves to through class_conn2obd().
 * NOTE(review): any NULL check on the device is on a line not visible in
 * this listing. */
976 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
978 struct obd_device *obd = class_conn2obd(conn);
981 return obd->u.cli.cl_import;
984 /* Export management functions */
/*
 * Destroy an export whose refcount has reached zero.
 * Releases the ptlrpc connection if there is one, asserts that the reply,
 * replay and high-priority RPC lists are empty, calls obd_destroy_export(),
 * drops the device reference held by non-self exports, and frees the export
 * through OBD_FREE_RCU().
 */
985 static void class_export_destroy(struct obd_export *exp)
987 struct obd_device *obd = exp->exp_obd;
990 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
991 LASSERT(obd != NULL);
993 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
994 exp->exp_client_uuid.uuid, obd->obd_name);
996 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
997 if (exp->exp_connection)
998 ptlrpc_put_connection_superhack(exp->exp_connection);
1000 LASSERT(list_empty(&exp->exp_outstanding_replies));
1001 LASSERT(list_empty(&exp->exp_uncommitted_replies));
1002 LASSERT(list_empty(&exp->exp_req_replay_queue));
1003 LASSERT(list_empty(&exp->exp_hp_rpcs));
1004 obd_destroy_export(exp);
1005 /* self export doesn't hold a reference to an obd, although it
1006 * exists until freeing of the obd */
1007 if (exp != obd->obd_self_export)
1008 class_decref(obd, "export", exp);
1010 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* portals_handle addref hook for exports: takes an export reference via
 * class_export_get(). */
1014 static void export_handle_addref(void *export)
1016 class_export_get(export);
/* Handle operations that __class_new_export() passes to
 * class_handle_hash(). */
1019 static struct portals_handle_ops export_handle_ops = {
1020 .hop_addref = export_handle_addref,
/* Take one reference on \a exp (atomic increment of exp_refcount) and log
 * the new count. */
1024 struct obd_export *class_export_get(struct obd_export *exp)
1026 atomic_inc(&exp->exp_refcount);
1027 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1028 atomic_read(&exp->exp_refcount));
1031 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.
 * On the final put the nid-stat reference is released, then either:
 *  - the self export is destroyed synchronously, followed by
 *    class_free_dev() on its device, or
 *  - any other export is handed to obd_zombie_export_add().
 */
1033 void class_export_put(struct obd_export *exp)
1035 LASSERT(exp != NULL);
1036 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1037 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1038 atomic_read(&exp->exp_refcount) - 1);
1040 if (atomic_dec_and_test(&exp->exp_refcount)) {
/* Cached before exp is destroyed; class_free_dev(obd) runs after that. */
1041 struct obd_device *obd = exp->exp_obd;
1043 CDEBUG(D_IOCTL, "final put %p/%s\n",
1044 exp, exp->exp_client_uuid.uuid);
1046 /* release nid stat refererence */
1047 lprocfs_exp_cleanup(exp);
1049 if (exp == obd->obd_self_export) {
1050 /* self export should be destroyed without
1051 * zombie thread as it doesn't hold a
1052 * reference to obd and doesn't hold any
1054 class_export_destroy(exp);
1055 /* self export is destroyed, no class
1056 * references exist and it is safe to free
1058 class_free_dev(obd);
1060 LASSERT(!list_empty(&exp->exp_obd_chain));
1061 obd_zombie_export_add(exp);
1066 EXPORT_SYMBOL(class_export_put);
/* Work handler installed by INIT_WORK() in __class_new_export(): recovers
 * the export from its embedded exp_zombie_work and destroys it. */
1068 static void obd_zombie_exp_cull(struct work_struct *ws)
1070 struct obd_export *export;
1072 export = container_of(ws, struct obd_export, exp_zombie_work);
1073 class_export_destroy(export);
1076 /* Creates a new export, adds it to the hash table, and returns a
1077 * pointer to it. The refcount is 2: one for the hash reference, and
1078 * one for the pointer returned by this function. */
/*
 * \a is_self selects self-export creation; class_new_export() passes false
 * and class_new_export_self() passes true.
 * Errors seen in the visible lines: ERR_PTR(-ENOMEM) on allocation failure,
 * and rc values -ENODEV, -EALREADY and -ESHUTDOWN on the exit paths.
 */
1079 struct obd_export *__class_new_export(struct obd_device *obd,
1080 struct obd_uuid *cluuid, bool is_self)
1082 struct obd_export *export;
1083 struct cfs_hash *hash = NULL;
1087 OBD_ALLOC_PTR(export);
1089 return ERR_PTR(-ENOMEM);
1091 export->exp_conn_cnt = 0;
1092 export->exp_lock_hash = NULL;
1093 export->exp_flock_hash = NULL;
1094 /* 2 = class_handle_hash + last */
1095 atomic_set(&export->exp_refcount, 2);
1096 atomic_set(&export->exp_rpc_count, 0);
1097 atomic_set(&export->exp_cb_count, 0);
1098 atomic_set(&export->exp_locks_count, 0);
1099 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1100 INIT_LIST_HEAD(&export->exp_locks_list);
1101 spin_lock_init(&export->exp_locks_list_guard);
1103 atomic_set(&export->exp_replay_count, 0);
1104 export->exp_obd = obd;
1105 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1106 spin_lock_init(&export->exp_uncommitted_replies_lock);
1107 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1108 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1109 INIT_LIST_HEAD(&export->exp_handle.h_link);
1110 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1111 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* Registers the export's handle with export_handle_ops. */
1112 class_handle_hash(&export->exp_handle, &export_handle_ops);
1113 export->exp_last_request_time = ktime_get_real_seconds();
1114 spin_lock_init(&export->exp_lock);
1115 spin_lock_init(&export->exp_rpc_lock);
1116 INIT_HLIST_NODE(&export->exp_uuid_hash);
1117 INIT_HLIST_NODE(&export->exp_nid_hash);
1118 INIT_HLIST_NODE(&export->exp_gen_hash);
1119 spin_lock_init(&export->exp_bl_list_lock);
1120 INIT_LIST_HEAD(&export->exp_bl_list);
1121 INIT_LIST_HEAD(&export->exp_stale_list);
1122 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1124 export->exp_sp_peer = LUSTRE_SP_ANY;
1125 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1126 export->exp_client_uuid = *cluuid;
1127 obd_init_export(export);
/* Only an export whose client UUID differs from the device's own UUID is
 * added to the per-device UUID hash. */
1129 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1130 spin_lock(&obd->obd_dev_lock);
1131 /* shouldn't happen, but might race */
1132 if (obd->obd_stopping)
1133 GOTO(exit_unlock, rc = -ENODEV);
1135 hash = cfs_hash_getref(obd->obd_uuid_hash);
1137 GOTO(exit_unlock, rc = -ENODEV);
1138 spin_unlock(&obd->obd_dev_lock);
1140 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1142 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1143 obd->obd_name, cluuid->uuid, rc);
1144 GOTO(exit_err, rc = -EALREADY);
1148 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* obd_stopping is checked again under the lock because the lock was
 * dropped around the hash insertion. */
1149 spin_lock(&obd->obd_dev_lock);
1150 if (obd->obd_stopping) {
1152 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1153 GOTO(exit_unlock, rc = -ESHUTDOWN);
/* One branch takes a device reference and links the export onto the device
 * lists; the other only initializes the list heads.
 * NOTE(review): the branch condition is not visible here (presumably
 * !is_self — confirm). */
1157 class_incref(obd, "export", export);
1158 list_add_tail(&export->exp_obd_chain_timed,
1159 &obd->obd_exports_timed);
1160 list_add(&export->exp_obd_chain, &obd->obd_exports);
1161 obd->obd_num_exports++;
1163 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1164 INIT_LIST_HEAD(&export->exp_obd_chain);
1166 spin_unlock(&obd->obd_dev_lock);
1168 cfs_hash_putref(hash);
/* Error exits: unlock, drop the hash reference, unhash the handle and free
 * the half-built export. */
1172 spin_unlock(&obd->obd_dev_lock);
1175 cfs_hash_putref(hash);
1176 class_handle_unhash(&export->exp_handle);
1177 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1178 obd_destroy_export(export);
1179 OBD_FREE_PTR(export);
/* Create a regular (non-self) export: __class_new_export() with
 * is_self = false. */
1183 struct obd_export *class_new_export(struct obd_device *obd,
1184 struct obd_uuid *uuid)
1186 return __class_new_export(obd, uuid, false);
1188 EXPORT_SYMBOL(class_new_export);
/* Create a device's self export: __class_new_export() with
 * is_self = true. */
1190 struct obd_export *class_new_export_self(struct obd_device *obd,
1191 struct obd_uuid *uuid)
1193 return __class_new_export(obd, uuid, true);
/*
 * Unlink an export from its device.
 * The handle is unhashed first.  The self export is then simply put.
 * Any other export is removed from the UUID hash (and the generation hash
 * with HAVE_SERVER_SUPPORT), moved to obd_unlinked_exports, taken off the
 * timed list, and counted in obd_stale_export_num before
 * obd_stale_export_put() is called.
 */
1196 void class_unlink_export(struct obd_export *exp)
1198 class_handle_unhash(&exp->exp_handle);
1200 if (exp->exp_obd->obd_self_export == exp) {
1201 class_export_put(exp);
1205 spin_lock(&exp->exp_obd->obd_dev_lock);
1206 /* delete an uuid-export hashitem from hashtables */
1207 if (!hlist_unhashed(&exp->exp_uuid_hash))
1208 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1209 &exp->exp_client_uuid,
1210 &exp->exp_uuid_hash);
1212 #ifdef HAVE_SERVER_SUPPORT
1213 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1214 struct tg_export_data *ted = &exp->exp_target_data;
1215 struct cfs_hash *hash;
1217 /* Because obd_gen_hash will not be released until
1218 * class_cleanup(), so hash should never be NULL here */
1219 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1220 LASSERT(hash != NULL);
1221 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1222 &exp->exp_gen_hash);
1223 cfs_hash_putref(hash);
1225 #endif /* HAVE_SERVER_SUPPORT */
1227 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1228 list_del_init(&exp->exp_obd_chain_timed);
1229 exp->exp_obd->obd_num_exports--;
1230 spin_unlock(&exp->exp_obd->obd_dev_lock);
1231 atomic_inc(&obd_stale_export_num);
1233 /* A reference is kept by obd_stale_exports list */
1234 obd_stale_export_put(exp);
1236 EXPORT_SYMBOL(class_unlink_export);
1238 /* Import management functions */
/*
 * Destroy an import whose refcount has reached zero.
 * Releases the import's main connection, then every obd_import_conn on
 * imp_conn_list (connection put and entry freed), asserts that no security
 * context is attached, drops the device reference taken in
 * class_new_import(), and frees the import through OBD_FREE_RCU().
 */
1239 static void class_import_destroy(struct obd_import *imp)
1243 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1244 imp->imp_obd->obd_name);
1246 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1248 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Drain the list one entry at a time from the head. */
1250 while (!list_empty(&imp->imp_conn_list)) {
1251 struct obd_import_conn *imp_conn;
1253 imp_conn = list_entry(imp->imp_conn_list.next,
1254 struct obd_import_conn, oic_item);
1255 list_del_init(&imp_conn->oic_item);
1256 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1257 OBD_FREE(imp_conn, sizeof(*imp_conn));
1260 LASSERT(imp->imp_sec == NULL);
1261 class_decref(imp->imp_obd, "import", imp);
1262 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* portals_handle addref hook for imports: takes an import reference via
 * class_import_get(). */
1266 static void import_handle_addref(void *import)
1268 class_import_get(import);
/* Handle operations that class_new_import() passes to
 * class_handle_hash(). */
1271 static struct portals_handle_ops import_handle_ops = {
1272 .hop_addref = import_handle_addref,
/* Take one reference on \a import (atomic increment of imp_refcount) and
 * log the new count together with the owning device name. */
1276 struct obd_import *class_import_get(struct obd_import *import)
1278 atomic_inc(&import->imp_refcount);
1279 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1280 atomic_read(&import->imp_refcount),
1281 import->imp_obd->obd_name);
1284 EXPORT_SYMBOL(class_import_get);
1286 void class_import_put(struct obd_import *imp)
1290 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1292 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1293 atomic_read(&imp->imp_refcount) - 1,
1294 imp->imp_obd->obd_name);
1296 if (atomic_dec_and_test(&imp->imp_refcount)) {
1297 CDEBUG(D_INFO, "final put import %p\n", imp);
1298 obd_zombie_import_add(imp);
1301 /* catch possible import put race */
1302 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1305 EXPORT_SYMBOL(class_import_put);
/* Initialize an import's adaptive-timeout state: network latency starts at
 * 0 and every per-portal service estimate at INITIAL_CONNECT_TIMEOUT. */
1307 static void init_imp_at(struct imp_at *at) {
1309 at_init(&at->iat_net_latency, 0, 0);
1310 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1311 /* max service estimates are tracked on the server side, so
1312 don't use the AT history here, just use the last reported
1313 val. (But keep hist for proc histogram, worst_ever) */
1314 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Work handler installed by INIT_WORK() in class_new_import(): recovers the
 * import from its embedded imp_zombie_work and destroys it. */
1319 static void obd_zombie_imp_cull(struct work_struct *ws)
1321 struct obd_import *import;
1323 import = container_of(ws, struct obd_import, imp_zombie_work);
1324 class_import_destroy(import);
/*
 * Allocate and initialize a new import for \a obd.
 * The import takes a device reference (dropped in class_import_destroy()),
 * starts in state LUSTRE_IMP_NEW with a refcount of 2, is registered in
 * the handle hash, and defaults to LUSTRE_MSG_MAGIC_V2.
 * NOTE(review): the allocation-failure check and the return statement are
 * on lines not visible in this listing.
 */
1327 struct obd_import *class_new_import(struct obd_device *obd)
1329 struct obd_import *imp;
1330 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1332 OBD_ALLOC(imp, sizeof(*imp));
1336 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1337 INIT_LIST_HEAD(&imp->imp_replay_list);
1338 INIT_LIST_HEAD(&imp->imp_sending_list);
1339 INIT_LIST_HEAD(&imp->imp_delayed_list);
1340 INIT_LIST_HEAD(&imp->imp_committed_list);
1341 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1342 imp->imp_known_replied_xid = 0;
/* The replay cursor starts at the head of the committed list. */
1343 imp->imp_replay_cursor = &imp->imp_committed_list;
1344 spin_lock_init(&imp->imp_lock);
1345 imp->imp_last_success_conn = 0;
1346 imp->imp_state = LUSTRE_IMP_NEW;
1347 imp->imp_obd = class_incref(obd, "import", imp);
1348 mutex_init(&imp->imp_sec_mutex);
1349 init_waitqueue_head(&imp->imp_recovery_waitq);
1350 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
/* Reference pid: the pid namespace's child reaper if it has one,
 * otherwise 1. */
1352 if (curr_pid_ns->child_reaper)
1353 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1355 imp->imp_sec_refpid = 1;
1357 atomic_set(&imp->imp_refcount, 2);
1358 atomic_set(&imp->imp_unregistering, 0);
1359 atomic_set(&imp->imp_inflight, 0);
1360 atomic_set(&imp->imp_replay_inflight, 0);
1361 atomic_set(&imp->imp_inval_count, 0);
1362 INIT_LIST_HEAD(&imp->imp_conn_list);
1363 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1364 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1365 init_imp_at(&imp->imp_at);
1367 /* the default magic is V2, will be used in connect RPC, and
1368 * then adjusted according to the flags in request/reply. */
1369 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1373 EXPORT_SYMBOL(class_new_import);
/**
 * Start tearing down @import.
 *
 * The import must be neither NULL nor LP_POISON.  Its handle is removed with
 * class_handle_unhash(), imp_generation is bumped under imp_lock, and one
 * reference is dropped with class_import_put().
 */
1375 void class_destroy_import(struct obd_import *import)
1377 LASSERT(import != NULL);
1378 LASSERT(import != LP_POISON);
1380 class_handle_unhash(&import->imp_handle);
1382 spin_lock(&import->imp_lock);
1383 import->imp_generation++;
1384 spin_unlock(&import->imp_lock);
1385 class_import_put(import);
1387 EXPORT_SYMBOL(class_destroy_import);
1389 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Record one more reference from @lock to @exp, all under
 * exp->exp_locks_list_guard.  A console warning is emitted when the lock
 * already targets a different export.  On the first reference (counter going
 * from 0 to 1) the lock is linked onto exp->exp_locks_list and @exp becomes
 * its recorded target.
 */
1391 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1393 spin_lock(&exp->exp_locks_list_guard);
1395 LASSERT(lock->l_exp_refs_nr >= 0);
1397 if (lock->l_exp_refs_target != NULL &&
1398 lock->l_exp_refs_target != exp) {
1399 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1400 exp, lock, lock->l_exp_refs_target);
/* post-increment: the test sees the count as it was before this reference */
1402 if ((lock->l_exp_refs_nr ++) == 0) {
1403 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1404 lock->l_exp_refs_target = exp;
1406 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1407 lock, exp, lock->l_exp_refs_nr);
1408 spin_unlock(&exp->exp_locks_list_guard);
1410 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Drop one reference from @lock to @exp, all under
 * exp->exp_locks_list_guard.  The counter must be positive on entry.  A
 * console warning is emitted when the lock's recorded target is not @exp.
 * When the counter reaches 0 the lock is unlinked from the export's list and
 * its recorded target is cleared.
 */
1412 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1414 spin_lock(&exp->exp_locks_list_guard);
1415 LASSERT(lock->l_exp_refs_nr > 0);
1416 if (lock->l_exp_refs_target != exp) {
1417 LCONSOLE_WARN("lock %p, "
1418 "mismatching export pointers: %p, %p\n",
1419 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the test sees the count after this reference is gone */
1421 if (-- lock->l_exp_refs_nr == 0) {
1422 list_del_init(&lock->l_exp_refs_link);
1423 lock->l_exp_refs_target = NULL;
1425 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1426 lock, exp, lock->l_exp_refs_nr);
1427 spin_unlock(&exp->exp_locks_list_guard);
1429 EXPORT_SYMBOL(__class_export_del_lock_ref);
/*
 * class_connect(): create a new export on @obd for the client @cluuid with
 * class_new_export(), publish its handle cookie through @conn and release
 * the export pointer reference with class_export_put().  @conn, @obd and
 * @cluuid must all be non-NULL.  On the error path the PTR_ERR() of the
 * export is returned.
 */
1432 /* A connection defines an export context in which preallocation can
1433 be managed. This releases the export pointer reference, and returns
1434 the export handle, so the export refcount is 1 when this function
1436 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1437 struct obd_uuid *cluuid)
1439 struct obd_export *export;
1440 LASSERT(conn != NULL);
1441 LASSERT(obd != NULL);
1442 LASSERT(cluuid != NULL);
1445 export = class_new_export(obd, cluuid);
1447 RETURN(PTR_ERR(export));
1449 conn->cookie = export->exp_handle.h_cookie;
1450 class_export_put(export);
1452 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1453 cluuid->uuid, conn->cookie);
1456 EXPORT_SYMBOL(class_connect);
/*
 * Undo the recovery bookkeeping that @exp contributes to its obd device.
 *
 * The recovery counters are adjusted under obd_recovery_task_lock; the
 * per-export replay flags are cleared under exp_lock and the matching
 * obd_req_replay_clients / obd_lock_replay_clients counters are decremented.
 */
1458 /* if export is involved in recovery then clean up related things */
1459 static void class_export_recovery_cleanup(struct obd_export *exp)
1461 struct obd_device *obd = exp->exp_obd;
1463 spin_lock(&obd->obd_recovery_task_lock);
1464 if (obd->obd_recovering) {
1465 if (exp->exp_in_recovery) {
/* exp_lock nests inside obd_recovery_task_lock here */
1466 spin_lock(&exp->exp_lock);
1467 exp->exp_in_recovery = 0;
1468 spin_unlock(&exp->exp_lock);
1469 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1470 atomic_dec(&obd->obd_connected_clients);
1473 /* if called during recovery then should update
1474 * obd_stale_clients counter,
1475 * lightweight exports are not counted */
1476 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1477 exp->exp_obd->obd_stale_clients++;
1479 spin_unlock(&obd->obd_recovery_task_lock);
1481 spin_lock(&exp->exp_lock);
1482 /** Cleanup req replay fields */
1483 if (exp->exp_req_replay_needed) {
1484 exp->exp_req_replay_needed = 0;
/* the counter must still be non-zero before this export is subtracted */
1486 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1487 atomic_dec(&obd->obd_req_replay_clients);
1490 /** Cleanup lock replay data */
1491 if (exp->exp_lock_replay_needed) {
1492 exp->exp_lock_replay_needed = 0;
1494 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1495 atomic_dec(&obd->obd_lock_replay_clients);
1497 spin_unlock(&exp->exp_lock);
/**
 * Disconnect @export from its obd device.
 *
 * exp_disconnected is set under exp_lock and the export is removed from the
 * device's obd_nid_hash if it is still hashed there.  Only the first caller
 * to set exp_disconnected goes on to class_export_recovery_cleanup() and
 * class_unlink_export(); a caller that finds the flag already set jumps to
 * the no_disconn exit.  A NULL @export is reported with CWARN.
 */
1500 /* This function removes 1-3 references from the export:
1501 * 1 - for export pointer passed
1502 * and if disconnect really need
1503 * 2 - removing from hash
1504 * 3 - in client_unlink_export
1505 * The export pointer passed to this function can be destroyed */
1506 int class_disconnect(struct obd_export *export)
1508 int already_disconnected;
1511 if (export == NULL) {
1512 CWARN("attempting to free NULL export %p\n", export);
1516 spin_lock(&export->exp_lock);
/* sample the old flag and set the new one in the same critical section */
1517 already_disconnected = export->exp_disconnected;
1518 export->exp_disconnected = 1;
1519 /* We hold references of export for uuid hash
1520 * and nid_hash and export link at least. So
1521 * it is safe to call cfs_hash_del in there. */
1522 if (!hlist_unhashed(&export->exp_nid_hash))
1523 cfs_hash_del(export->exp_obd->obd_nid_hash,
1524 &export->exp_connection->c_peer.nid,
1525 &export->exp_nid_hash);
1526 spin_unlock(&export->exp_lock);
1528 /* class_cleanup(), abort_recovery(), and class_fail_export()
1529 * all end up in here, and if any of them race we shouldn't
1530 * call extra class_export_puts(). */
1531 if (already_disconnected) {
1532 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1533 GOTO(no_disconn, already_disconnected);
1536 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1537 export->exp_handle.h_cookie);
1539 class_export_recovery_cleanup(export);
1540 class_unlink_export(export);
/* drops the reference that came in with the export pointer */
1542 class_export_put(export);
1545 EXPORT_SYMBOL(class_disconnect);
/*
 * The result is sampled under exp_lock: the export counts as connected when
 * exp_conn_cnt is positive and exp_failed is not set.
 */
1547 /* Return non-zero for a fully connected export */
1548 int class_connected_export(struct obd_export *exp)
1553 spin_lock(&exp->exp_lock);
1554 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1555 spin_unlock(&exp->exp_lock);
1559 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export found on @list until the list is empty.
 *
 * Each export has its exp_flags overwritten with @flags under exp_lock.  An
 * export whose client uuid equals the uuid of its own obd device is not
 * disconnected: it is only taken off the list and its reference is dropped.
 * Every other export is passed to obd_disconnect().
 */
1561 static void class_disconnect_export_list(struct list_head *list,
1562 enum obd_option flags)
1565 struct obd_export *exp;
1568 /* It's possible that an export may disconnect itself, but
1569 * nothing else will be added to this list. */
1570 while (!list_empty(list)) {
1571 exp = list_entry(list->next, struct obd_export,
1573 /* need for safe call CDEBUG after obd_disconnect */
1574 class_export_get(exp);
1576 spin_lock(&exp->exp_lock);
1577 exp->exp_flags = flags;
1578 spin_unlock(&exp->exp_lock);
1580 if (obd_uuid_equals(&exp->exp_client_uuid,
1581 &exp->exp_obd->obd_uuid)) {
1583 "exp %p export uuid == obd uuid, don't discon\n",
1585 /* Need to delete this now so we don't end up pointing
1586 * to work_list later when this export is cleaned up. */
1587 list_del_init(&exp->exp_obd_chain);
1588 class_export_put(exp);
/* second reference: this is the one obd_disconnect() is expected to consume */
1592 class_export_get(exp);
1593 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1594 "last request at %lld\n",
1595 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1596 exp, exp->exp_last_request_time);
1597 /* release one export reference anyway */
1598 rc = obd_disconnect(exp);
1600 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1601 obd_export_nid2str(exp), exp, rc);
/* drop the reference taken at the top of the iteration */
1602 class_export_put(exp);
/**
 * Disconnect all exports of @obd.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a private list
 * under obd_dev_lock; the private list is then processed without the lock
 * by class_disconnect_export_list() using the flags from
 * exp_flags_from_obd().
 */
1607 void class_disconnect_exports(struct obd_device *obd)
1609 struct list_head work_list;
1612 /* Move all of the exports from obd_exports to a work list, en masse. */
1613 INIT_LIST_HEAD(&work_list);
1614 spin_lock(&obd->obd_dev_lock);
1615 list_splice_init(&obd->obd_exports, &work_list);
1616 list_splice_init(&obd->obd_delayed_exports, &work_list);
1617 spin_unlock(&obd->obd_dev_lock);
1619 if (!list_empty(&work_list)) {
1620 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1621 "disconnecting them\n", obd->obd_minor, obd);
1622 class_disconnect_export_list(&work_list,
1623 exp_flags_from_obd(obd));
1625 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1626 obd->obd_minor, obd);
1629 EXPORT_SYMBOL(class_disconnect_exports);
/*
 * class_disconnect_stale_exports(): walk obd->obd_exports under
 * obd_dev_lock and collect exports onto a private work list, marking each
 * collected export exp_failed.  The self-export and exports without a
 * last_rcvd slot (ted_lr_idx == -1) are never collected.  The collected
 * exports are then disconnected with the device flags plus
 * OBD_OPT_ABORT_RECOV.
 *
 * @test_export is invoked with both obd_dev_lock and exp_lock held, so it
 * must not sleep.
 */
1631 /* Remove exports that have not completed recovery.
1633 void class_disconnect_stale_exports(struct obd_device *obd,
1634 int (*test_export)(struct obd_export *))
1636 struct list_head work_list;
1637 struct obd_export *exp, *n;
1641 INIT_LIST_HEAD(&work_list);
1642 spin_lock(&obd->obd_dev_lock);
1643 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1645 /* don't count self-export as client */
1646 if (obd_uuid_equals(&exp->exp_client_uuid,
1647 &exp->exp_obd->obd_uuid))
1650 /* don't evict clients which have no slot in last_rcvd
1651 * (e.g. lightweight connection) */
1652 if (exp->exp_target_data.ted_lr_idx == -1)
1655 spin_lock(&exp->exp_lock);
/* already failed, or accepted by the caller's predicate: leave it alone */
1656 if (exp->exp_failed || test_export(exp)) {
1657 spin_unlock(&exp->exp_lock);
1660 exp->exp_failed = 1;
1661 spin_unlock(&exp->exp_lock);
1663 list_move(&exp->exp_obd_chain, &work_list);
1665 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1666 obd->obd_name, exp->exp_client_uuid.uuid,
1667 obd_export_nid2str(exp));
1668 print_export_data(exp, "EVICTING", 0, D_HA);
1670 spin_unlock(&obd->obd_dev_lock);
1673 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1674 obd->obd_name, evicted);
1676 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1677 OBD_OPT_ABORT_RECOV);
1680 EXPORT_SYMBOL(class_disconnect_stale_exports);
/**
 * Mark @exp as failed and disconnect it.
 *
 * exp_failed is set under exp_lock; a caller that finds it already set only
 * logs and skips the disconnect.  Otherwise the debug log is dumped when
 * obd_dump_on_timeout is set, two references are taken, and the export is
 * passed to obd_disconnect().
 */
1682 void class_fail_export(struct obd_export *exp)
1684 int rc, already_failed;
1686 spin_lock(&exp->exp_lock);
/* sample the old flag and set the new one in the same critical section */
1687 already_failed = exp->exp_failed;
1688 exp->exp_failed = 1;
1689 spin_unlock(&exp->exp_lock);
1691 if (already_failed) {
1692 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1693 exp, exp->exp_client_uuid.uuid);
1697 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1698 exp, exp->exp_client_uuid.uuid);
1700 if (obd_dump_on_timeout)
1701 libcfs_debug_dumplog();
1703 /* need for safe call CDEBUG after obd_disconnect */
1704 class_export_get(exp);
1706 /* Most callers into obd_disconnect are removing their own reference
1707 * (request, for example) in addition to the one from the hash table.
1708 * We don't have such a reference here, so make one. */
1709 class_export_get(exp);
1710 rc = obd_disconnect(exp);
1712 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1714 CDEBUG(D_HA, "disconnected export %p/%s\n",
1715 exp, exp->exp_client_uuid.uuid);
/* pairs with the first class_export_get() above */
1716 class_export_put(exp);
1718 EXPORT_SYMBOL(class_fail_export);
/**
 * Evict from @obd the export(s) hashed under the NID given as the string
 * @nid.
 *
 * Nothing is evicted when the device is already stopping.  A reference on
 * the device's NID hash is held for the duration of the lookup.  Each export
 * found is failed with class_fail_export() and the lookup reference on it is
 * dropped.
 *
 * Returns exports_evicted.
 * NOTE(review): presumably the number of exports evicted - confirm.
 */
1720 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1722 struct cfs_hash *nid_hash;
1723 struct obd_export *doomed_exp = NULL;
1724 int exports_evicted = 0;
1726 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1728 spin_lock(&obd->obd_dev_lock);
1729 /* umount has run already, so evict thread should leave
1730 * its task to umount thread now */
1731 if (obd->obd_stopping) {
1732 spin_unlock(&obd->obd_dev_lock);
1733 return exports_evicted;
1735 nid_hash = obd->obd_nid_hash;
/* pin the hash table so it can be used after obd_dev_lock is dropped */
1736 cfs_hash_getref(nid_hash);
1737 spin_unlock(&obd->obd_dev_lock);
1740 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1741 if (doomed_exp == NULL)
1744 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1745 "nid %s found, wanted nid %s, requested nid %s\n",
1746 obd_export_nid2str(doomed_exp),
1747 libcfs_nid2str(nid_key), nid);
1748 LASSERTF(doomed_exp != obd->obd_self_export,
1749 "self-export is hashed by NID?\n");
1751 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1752 "request\n", obd->obd_name,
1753 obd_uuid2str(&doomed_exp->exp_client_uuid),
1754 obd_export_nid2str(doomed_exp));
1755 class_fail_export(doomed_exp);
1756 class_export_put(doomed_exp);
1759 cfs_hash_putref(nid_hash);
1761 if (!exports_evicted)
1762 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1763 obd->obd_name, nid);
1764 return exports_evicted;
1766 EXPORT_SYMBOL(obd_export_evict_by_nid);
/**
 * Evict from @obd the export whose client uuid is the string @uuid.
 *
 * Nothing is evicted when the device is already stopping or when @uuid is
 * the device's own uuid.  A reference on the device's uuid hash is held for
 * the duration of the lookup.  An export that is found is failed with
 * class_fail_export() and the lookup reference on it is dropped.
 *
 * Returns exports_evicted.
 * NOTE(review): presumably the number of exports evicted - confirm.
 */
1768 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1770 struct cfs_hash *uuid_hash;
1771 struct obd_export *doomed_exp = NULL;
1772 struct obd_uuid doomed_uuid;
1773 int exports_evicted = 0;
1775 spin_lock(&obd->obd_dev_lock);
1776 if (obd->obd_stopping) {
1777 spin_unlock(&obd->obd_dev_lock);
1778 return exports_evicted;
1780 uuid_hash = obd->obd_uuid_hash;
/* pin the hash table so it can be used after obd_dev_lock is dropped */
1781 cfs_hash_getref(uuid_hash);
1782 spin_unlock(&obd->obd_dev_lock);
1784 obd_str2uuid(&doomed_uuid, uuid);
1785 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1786 CERROR("%s: can't evict myself\n", obd->obd_name);
1787 cfs_hash_putref(uuid_hash);
1788 return exports_evicted;
1791 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1793 if (doomed_exp == NULL) {
1794 CERROR("%s: can't disconnect %s: no exports found\n",
1795 obd->obd_name, uuid);
/* NOTE(review): "adminstrative" in the message below is misspelled */
1797 CWARN("%s: evicting %s at adminstrative request\n",
1798 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1799 class_fail_export(doomed_exp);
1800 class_export_put(doomed_exp);
1803 cfs_hash_putref(uuid_hash);
1805 return exports_evicted;
1808 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional callback, NULL by default.  print_export_data() invokes it for an
 * export when lock dumping was requested and the pointer is non-NULL.
 */
1809 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1810 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Emit one debug line describing @exp at @debug_level, labelled with
 * @status.  The line carries the obd name, client uuid and NID, the
 * reference/rpc/callback/lock counters, the disconnected/delayed/failed
 * flags, information about outstanding replies (walked under exp_lock), the
 * last committed transno and whether the export sits on a stale list.
 * When @locks is non-zero and class_export_dump_hook is set, the hook is
 * called as well (only if LUSTRE_TRACKS_LOCK_EXP_REFS is enabled).
 */
1813 static void print_export_data(struct obd_export *exp, const char *status,
1814 int locks, int debug_level)
1816 struct ptlrpc_reply_state *rs;
1817 struct ptlrpc_reply_state *first_reply = NULL;
1820 spin_lock(&exp->exp_lock);
1821 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1827 spin_unlock(&exp->exp_lock);
/* the counters below are read without exp_lock, so they are a snapshot only */
1829 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1830 "%p %s %llu stale:%d\n",
1831 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1832 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1833 atomic_read(&exp->exp_rpc_count),
1834 atomic_read(&exp->exp_cb_count),
1835 atomic_read(&exp->exp_locks_count),
1836 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1837 nreplies, first_reply, nreplies > 3 ? "..." : "",
1838 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1839 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1840 if (locks && class_export_dump_hook != NULL)
1841 class_export_dump_hook(exp);
/**
 * Print every export of @obd through print_export_data(), holding
 * obd_dev_lock for the whole walk.  Exports on obd_exports are labelled
 * "ACTIVE", those on obd_unlinked_exports "UNLINKED" and those on
 * obd_delayed_exports "DELAYED".  @locks and @debug_level are passed through
 * unchanged.
 */
1845 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1847 struct obd_export *exp;
1849 spin_lock(&obd->obd_dev_lock);
1850 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1851 print_export_data(exp, "ACTIVE", locks, debug_level);
1852 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1853 print_export_data(exp, "UNLINKED", locks, debug_level);
1854 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1855 print_export_data(exp, "DELAYED", locks, debug_level);
1856 spin_unlock(&obd->obd_dev_lock);
/**
 * Wait until @obd has no unlinked exports left.
 *
 * obd_exports must already be empty.  While obd_unlinked_exports is
 * non-empty the function sleeps uninterruptibly, with obd_dev_lock dropped,
 * for cfs_time_seconds(waited) and rechecks the list under the lock.  Once
 * waited exceeds 5 and is a power of two, a console warning is printed and
 * the exports are dumped.
 */
1859 void obd_exports_barrier(struct obd_device *obd)
1862 LASSERT(list_empty(&obd->obd_exports));
1863 spin_lock(&obd->obd_dev_lock);
1864 while (!list_empty(&obd->obd_unlinked_exports)) {
/* never sleep with the spinlock held */
1865 spin_unlock(&obd->obd_dev_lock);
1866 set_current_state(TASK_UNINTERRUPTIBLE);
1867 schedule_timeout(cfs_time_seconds(waited));
1868 if (waited > 5 && is_power_of_2(waited)) {
1869 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1870 "more than %d seconds. "
1871 "The obd refcount = %d. Is it stuck?\n",
1872 obd->obd_name, waited,
1873 atomic_read(&obd->obd_refcount));
1874 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1877 spin_lock(&obd->obd_dev_lock);
1879 spin_unlock(&obd->obd_dev_lock);
1881 EXPORT_SYMBOL(obd_exports_barrier);
1884 * Add export to the obd_zombie workqueue so that it gets processed.
/*
 * Decrement obd_stale_export_num, unlink @exp from the obd list it is
 * chained on (under obd_dev_lock; it must currently be on one) and queue
 * its exp_zombie_work on zombie_wq.
 */
1886 static void obd_zombie_export_add(struct obd_export *exp) {
1887 atomic_dec(&obd_stale_export_num);
1888 spin_lock(&exp->exp_obd->obd_dev_lock);
1889 LASSERT(!list_empty(&exp->exp_obd_chain));
1890 list_del_init(&exp->exp_obd_chain);
1891 spin_unlock(&exp->exp_obd->obd_dev_lock);
1893 queue_work(zombie_wq, &exp->exp_zombie_work);
1897 * Add import to the obd_zombie workqueue so that it gets destroyed.
/*
 * Queue imp_zombie_work of @imp on zombie_wq.  The import must no longer
 * have an imp_sec attached.
 */
1899 static void obd_zombie_import_add(struct obd_import *imp) {
1900 LASSERT(imp->imp_sec == NULL);
1902 queue_work(zombie_wq, &imp->imp_zombie_work);
1906 * wait until the obd_zombie import/export work queue becomes empty
/* Flush zombie_wq, i.e. wait for the work already queued on it to finish. */
1908 void obd_zombie_barrier(void)
1910 flush_workqueue(zombie_wq);
1912 EXPORT_SYMBOL(obd_zombie_barrier);
/**
 * Take the first export off the global obd_stale_exports list.
 *
 * The list is inspected and modified under obd_stale_export_lock.  exp
 * stays NULL when the list is empty.
 * NOTE(review): presumably exp is what gets returned - confirm.
 */
1915 struct obd_export *obd_stale_export_get(void)
1917 struct obd_export *exp = NULL;
1920 spin_lock(&obd_stale_export_lock);
1921 if (!list_empty(&obd_stale_exports)) {
1922 exp = list_entry(obd_stale_exports.next,
1923 struct obd_export, exp_stale_list);
1924 list_del_init(&exp->exp_stale_list);
1926 spin_unlock(&obd_stale_export_lock);
1929 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1930 atomic_read(&obd_stale_export_num));
1934 EXPORT_SYMBOL(obd_stale_export_get);
/**
 * Give back an export obtained from the stale list.
 *
 * @exp must not be on the stale list on entry.  If it has a lock hash with
 * a non-zero hs_count it is re-queued on obd_stale_exports.
 * NOTE(review): presumably class_export_put() is only reached when the
 * export is not re-queued - confirm.
 */
1936 void obd_stale_export_put(struct obd_export *exp)
1940 LASSERT(list_empty(&exp->exp_stale_list));
1941 if (exp->exp_lock_hash &&
1942 atomic_read(&exp->exp_lock_hash->hs_count)) {
1943 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1944 atomic_read(&obd_stale_export_num));
/* lock order: exp_bl_list_lock (bh-safe) first, then obd_stale_export_lock */
1946 spin_lock_bh(&exp->exp_bl_list_lock);
1947 spin_lock(&obd_stale_export_lock);
1948 /* Add to the tail if there is no blocked locks,
1949 * to the head otherwise. */
1950 if (list_empty(&exp->exp_bl_list))
1951 list_add_tail(&exp->exp_stale_list,
1952 &obd_stale_exports);
1954 list_add(&exp->exp_stale_list,
1955 &obd_stale_exports);
1957 spin_unlock(&obd_stale_export_lock);
1958 spin_unlock_bh(&exp->exp_bl_list_lock);
1960 class_export_put(exp);
1964 EXPORT_SYMBOL(obd_stale_export_put);
1967 * Adjust the position of the export in the stale list,
1968 * i.e. move it to the head of the list if needed.
/*
 * If @exp is currently on the stale list and its exp_bl_list is not empty,
 * move it to the head of obd_stale_exports.  Both exp_bl_list_lock
 * (bh-safe) and obd_stale_export_lock are held around the check and move.
 */
1970 void obd_stale_export_adjust(struct obd_export *exp)
1972 LASSERT(exp != NULL);
1973 spin_lock_bh(&exp->exp_bl_list_lock);
1974 spin_lock(&obd_stale_export_lock);
1976 if (!list_empty(&exp->exp_stale_list) &&
1977 !list_empty(&exp->exp_bl_list))
1978 list_move(&exp->exp_stale_list, &obd_stale_exports);
1980 spin_unlock(&obd_stale_export_lock);
1981 spin_unlock_bh(&exp->exp_bl_list_lock);
1983 EXPORT_SYMBOL(obd_stale_export_adjust);
1986 * create the workqueue that destroys zombie imports/exports
/*
 * Create zombie_wq with alloc_workqueue() under the name "obd_zombid".
 * NOTE(review): the handling of an allocation failure and the return value
 * are not shown here - confirm.
 */
1988 int obd_zombie_impexp_init(void)
1990 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1998 * destroy the workqueue that handles zombie imports/exports
/*
 * Destroy zombie_wq.  The global obd_stale_exports list must be empty once
 * the workqueue is gone.
 */
2000 void obd_zombie_impexp_stop(void)
2002 destroy_workqueue(zombie_wq);
2003 LASSERT(list_empty(&obd_stale_exports));
2006 /***** Kernel-userspace comm helpers *******/
/*
 * @payload_len: size of the payload in bytes.
 * Returns sizeof(struct kuc_hdr) + @payload_len, converted to int.
 */
2008 /* Get length of entire message, including header */
2009 int kuc_len(int payload_len)
2011 return sizeof(struct kuc_hdr) + payload_len;
2013 EXPORT_SYMBOL(kuc_len);
/*
 * kuc_ptr(): the header is taken to sit immediately before the payload, so
 * the header pointer is obtained by stepping back one struct kuc_hdr from
 * @p.  The kuc_magic field of that header must equal KUC_MAGIC.
 */
2015 /* Get a pointer to kuc header, given a ptr to the payload
2016 * @param p Pointer to payload area
2017 * @returns Pointer to kuc header
2019 struct kuc_hdr * kuc_ptr(void *p)
2021 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2022 LASSERT(lh->kuc_magic == KUC_MAGIC);
2025 EXPORT_SYMBOL(kuc_ptr);
/*
 * kuc_alloc(): the message length is kuc_len(@payload_len).  The header is
 * filled with KUC_MAGIC, @transport, @type and that total length, and the
 * address just past the header is returned.  ERR_PTR(-ENOMEM) is returned on
 * the failure path.
 */
2027 /* Alloc space for a message, and fill in header
2028 * @return Pointer to payload area
2030 void *kuc_alloc(int payload_len, int transport, int type)
2033 int len = kuc_len(payload_len);
2037 return ERR_PTR(-ENOMEM);
2039 lh->kuc_magic = KUC_MAGIC;
2040 lh->kuc_transport = transport;
2041 lh->kuc_msgtype = type;
2042 lh->kuc_msglen = len;
2044 return (void *)(lh + 1);
2046 EXPORT_SYMBOL(kuc_alloc);
/*
 * Free a message given its payload pointer @p.  The header is located with
 * kuc_ptr() and kuc_len(@payload_len) bytes are released with OBD_FREE(),
 * so @payload_len has to be the value the message was sized with.
 */
2048 /* Takes pointer to payload area */
2049 void kuc_free(void *p, int payload_len)
2051 struct kuc_hdr *lh = kuc_ptr(p);
2052 OBD_FREE(lh, kuc_len(payload_len));
2054 EXPORT_SYMBOL(kuc_free);
/*
 * One sleeper waiting for an RPC slot.  obd_get_request_slot() keeps an
 * instance on its stack, links orsw_entry onto cli->cl_flight_waiters and
 * sleeps on orsw_waitq until a slot is handed over.
 */
2056 struct obd_request_slot_waiter {
2057 struct list_head orsw_entry;
2058 wait_queue_head_t orsw_waitq;
/*
 * Wait condition for obd_get_request_slot(): under cl_loi_list_lock, avail
 * is set when @orsw is no longer linked on a list (its orsw_entry is empty).
 */
2062 static bool obd_request_slot_avail(struct client_obd *cli,
2063 struct obd_request_slot_waiter *orsw)
2067 spin_lock(&cli->cl_loi_list_lock);
2068 avail = !!list_empty(&orsw->orsw_entry);
2069 spin_unlock(&cli->cl_loi_list_lock);
2075 * For network flow control, the RPC sponsor needs to acquire a credit
2076 * before sending the RPC. The credits count for a connection is defined
2077 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
2078 * the subsequent RPC sponsors need to wait until others released their
2079 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
/*
 * Acquire one RPC slot on @cli.
 *
 * Fast path: when cl_rpcs_in_flight is below cl_max_rpcs_in_flight the
 * counter is incremented under cl_loi_list_lock.  Otherwise an on-stack
 * waiter is appended to cl_flight_waiters and the caller sleeps
 * interruptibly until the waiter has been unlinked.  After the wait the
 * lock is retaken to settle the waiter's state.
 */
2081 int obd_get_request_slot(struct client_obd *cli)
2083 struct obd_request_slot_waiter orsw;
2084 struct l_wait_info lwi;
2087 spin_lock(&cli->cl_loi_list_lock);
2088 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2089 cli->cl_rpcs_in_flight++;
2090 spin_unlock(&cli->cl_loi_list_lock);
/* slow path: queue ourselves at the tail so that waiters are served in order */
2094 init_waitqueue_head(&orsw.orsw_waitq);
2095 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2096 orsw.orsw_signaled = false;
2097 spin_unlock(&cli->cl_loi_list_lock);
2099 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2100 rc = l_wait_event(orsw.orsw_waitq,
2101 obd_request_slot_avail(cli, &orsw) ||
2105 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2106 * freed but other (such as obd_put_request_slot) is using it. */
2107 spin_lock(&cli->cl_loi_list_lock);
/*
 * An empty orsw_entry means a waker already unlinked us and counted a slot
 * on our behalf; if we are still linked we remove ourselves instead.
 */
2109 if (!orsw.orsw_signaled) {
2110 if (list_empty(&orsw.orsw_entry))
2111 cli->cl_rpcs_in_flight--;
2113 list_del(&orsw.orsw_entry);
2117 if (orsw.orsw_signaled) {
2118 LASSERT(list_empty(&orsw.orsw_entry));
2122 spin_unlock(&cli->cl_loi_list_lock);
2126 EXPORT_SYMBOL(obd_get_request_slot);
/*
 * Release one RPC slot on @cli.  Under cl_loi_list_lock the in-flight count
 * is decremented; if there are waiters and the count is below the maximum,
 * the first waiter is unlinked, the slot is counted again on its behalf and
 * it is woken up.
 */
2128 void obd_put_request_slot(struct client_obd *cli)
2130 struct obd_request_slot_waiter *orsw;
2132 spin_lock(&cli->cl_loi_list_lock);
2133 cli->cl_rpcs_in_flight--;
2135 /* If there is free slot, wakeup the first waiter. */
2136 if (!list_empty(&cli->cl_flight_waiters) &&
2137 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2138 orsw = list_entry(cli->cl_flight_waiters.next,
2139 struct obd_request_slot_waiter, orsw_entry);
/* list_del_init leaves orsw_entry empty, which is what the sleeper tests */
2140 list_del_init(&orsw->orsw_entry);
2141 cli->cl_rpcs_in_flight++;
2142 wake_up(&orsw->orsw_waitq);
2144 spin_unlock(&cli->cl_loi_list_lock);
2146 EXPORT_SYMBOL(obd_put_request_slot);
/* Return cli->cl_max_rpcs_in_flight; the field is read without locking. */
2148 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2150 return cli->cl_max_rpcs_in_flight;
2152 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * Change the maximum number of RPCs in flight on @cli to @max.
 *
 * @max must lie in [1, OBD_MAX_RIF_MAX].  For a device of type
 * LUSTRE_MDC_NAME, when @max is not above cl_max_mod_rpcs_in_flight that
 * limit is first lowered to @max - 1.  The new maximum is stored under
 * cl_loi_list_lock, client_adjust_max_dirty() is called, and queued waiters
 * are handed slots and woken, one per unit of increase, until the waiter
 * list runs out.
 */
2154 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2156 struct obd_request_slot_waiter *orsw;
2163 if (max > OBD_MAX_RIF_MAX || max < 1)
2166 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2167 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2168 /* adjust max_mod_rpcs_in_flight to ensure it is always
2169 * strictly lower than max_rpcs_in_flight */
/* NOTE(review): the message below has no trailing newline */
2171 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2172 "because it must be higher than "
2173 "max_mod_rpcs_in_flight value",
2174 cli->cl_import->imp_obd->obd_name);
2177 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2178 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2184 spin_lock(&cli->cl_loi_list_lock);
2185 old = cli->cl_max_rpcs_in_flight;
2186 cli->cl_max_rpcs_in_flight = max;
2187 client_adjust_max_dirty(cli);
2191 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2192 for (i = 0; i < diff; i++) {
2193 if (list_empty(&cli->cl_flight_waiters))
2196 orsw = list_entry(cli->cl_flight_waiters.next,
2197 struct obd_request_slot_waiter, orsw_entry);
/* same hand-over as in obd_put_request_slot(): unlink, count, wake */
2198 list_del_init(&orsw->orsw_entry);
2199 cli->cl_rpcs_in_flight++;
2200 wake_up(&orsw->orsw_waitq);
2202 spin_unlock(&cli->cl_loi_list_lock);
2206 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Return cli->cl_max_mod_rpcs_in_flight; the field is read without locking. */
2208 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2210 return cli->cl_max_mod_rpcs_in_flight;
2212 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/*
 * Change the maximum number of modify RPCs in flight on @cli to @max.
 *
 * @max must lie in [1, OBD_MAX_RIF_MAX], must be strictly below
 * cl_max_rpcs_in_flight, and must not exceed maxmodrpcs, which is taken
 * from the import's connect data (ocd_maxmodrpcs) when the server set
 * OBD_CONNECT_MULTIMODRPCS.  The new value is stored under
 * cl_mod_rpcs_lock and sleepers on cl_mod_rpcs_waitq are woken when the
 * limit grew.
 */
2214 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2216 struct obd_connect_data *ocd;
2220 if (max > OBD_MAX_RIF_MAX || max < 1)
2223 /* cannot exceed or equal max_rpcs_in_flight */
2224 if (max >= cli->cl_max_rpcs_in_flight) {
2225 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2226 "higher or equal to max_rpcs_in_flight value (%u)\n",
2227 cli->cl_import->imp_obd->obd_name,
2228 max, cli->cl_max_rpcs_in_flight);
2232 /* cannot exceed max modify RPCs in flight supported by the server */
2233 ocd = &cli->cl_import->imp_connect_data;
2234 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2235 maxmodrpcs = ocd->ocd_maxmodrpcs;
2238 if (max > maxmodrpcs) {
2239 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2240 "higher than max_mod_rpcs_per_client value (%hu) "
2241 "returned by the server at connection\n",
2242 cli->cl_import->imp_obd->obd_name,
2247 spin_lock(&cli->cl_mod_rpcs_lock);
2249 prev = cli->cl_max_mod_rpcs_in_flight;
2250 cli->cl_max_mod_rpcs_in_flight = max;
2252 /* wakeup waiters if limit has been increased */
2253 if (cli->cl_max_mod_rpcs_in_flight > prev)
2254 wake_up(&cli->cl_mod_rpcs_waitq);
2256 spin_unlock(&cli->cl_mod_rpcs_lock);
2260 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/*
 * Integer percentage of @a relative to @b; evaluates to 0 when @b is 0.
 * Every use of an argument is parenthesised so that compound expressions
 * such as pct(x + y, total - skipped) keep their meaning after expansion.
 * @b is expanded twice, so arguments must be free of side effects.
 */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
/*
 * Print the modify-RPC statistics of @cli into @seq.
 *
 * Under cl_mod_rpcs_lock the function emits a wall-clock snapshot time, the
 * current cl_mod_rpcs_in_flight value and then one row per bucket of
 * cl_mod_rpcs_hist (up to OBD_HIST_MAX rows) with the bucket count, its
 * percentage of the total and the cumulative percentage.
 */
2264 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2265 struct seq_file *seq)
2267 unsigned long mod_tot = 0, mod_cum;
2268 struct timespec64 now;
2271 ktime_get_real_ts64(&now);
2273 spin_lock(&cli->cl_mod_rpcs_lock);
/*
 * NOTE(review): %9lu pads tv_nsec with spaces rather than zeros, and %llu is
 * paired with an s64 argument - confirm this is the intended output format.
 */
2275 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2276 (s64)now.tv_sec, now.tv_nsec);
2277 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2278 cli->cl_mod_rpcs_in_flight);
2280 seq_printf(seq, "\n\t\t\tmodify\n");
2281 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2283 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2286 for (i = 0; i < OBD_HIST_MAX; i++) {
2287 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2289 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2290 i, mod, pct(mod, mod_tot),
2291 pct(mod_cum, mod_tot));
/* every sample has been accounted for once the running sum hits the total */
2292 if (mod_cum == mod_tot)
2296 spin_unlock(&cli->cl_mod_rpcs_lock);
2300 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
/*
 * obd_mod_rpc_slot_avail_locked(): a slot counts as available when
 * cl_mod_rpcs_in_flight is below cl_max_mod_rpcs_in_flight, or when the
 * request is a close and cl_close_rpcs_in_flight is 0.
 * NOTE(review): the _locked suffix suggests cl_mod_rpcs_lock must be held by
 * the caller, as obd_mod_rpc_slot_avail() does - confirm.
 */
2304 /* The number of modify RPCs sent in parallel is limited
2305 * because the server has a finite number of slots per client to
2306 * store request result and ensure reply reconstruction when needed.
2307 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2308 * that takes into account server limit and cl_max_rpcs_in_flight
2310 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2311 * one close request is allowed above the maximum.
2313 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2318 /* A slot is available if
2319 * - number of modify RPCs in flight is less than the max
2320 * - it's a close RPC and no other close request is in flight
2322 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2323 (close_req && cli->cl_close_rpcs_in_flight == 0);
/*
 * Locking wrapper: evaluates obd_mod_rpc_slot_avail_locked() with
 * cl_mod_rpcs_lock held.  Used as the wait condition in
 * obd_get_mod_rpc_slot().
 */
2328 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2333 spin_lock(&cli->cl_mod_rpcs_lock);
2334 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2335 spin_unlock(&cli->cl_mod_rpcs_lock);
/*
 * Tell whether the intent @it is one that does not need a modify RPC slot.
 * The intents tested are IT_GETATTR, IT_LOOKUP, IT_READDIR, and IT_LAYOUT
 * without MDS_FMODE_WRITE in it_flags.
 */
2339 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2342 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2343 it->it_op == IT_READDIR ||
2344 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
/*
 * obd_get_mod_rpc_slot(): intents accepted by obd_skip_mod_rpc_slot() do not
 * take a slot.  Otherwise, under cl_mod_rpcs_lock, an available slot is
 * accounted in cl_mod_rpcs_in_flight (and cl_close_rpcs_in_flight for a
 * close), tallied in cl_mod_rpcs_hist, and a free bit is claimed in
 * cl_mod_tag_bitmap.  When no slot is available the caller sleeps on
 * cl_mod_rpcs_waitq.
 */
2349 /* Get a modify RPC slot from the obd client @cli according
2350 * to the kind of operation @opc that is going to be sent
2351 * and the intent @it of the operation if it applies.
2352 * If the maximum number of modify RPCs in flight is reached
2353 * the thread is put to sleep.
2354 * Returns the tag to be set in the request message. Tag 0
2355 * is reserved for non-modifying requests.
2357 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2358 struct lookup_intent *it)
2360 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2361 bool close_req = false;
2364 /* read-only metadata RPCs don't consume a slot on MDT
2365 * for reply reconstruction
2367 if (obd_skip_mod_rpc_slot(it))
2370 if (opc == MDS_CLOSE)
2374 spin_lock(&cli->cl_mod_rpcs_lock);
2375 max = cli->cl_max_mod_rpcs_in_flight;
2376 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2377 /* there is a slot available */
2378 cli->cl_mod_rpcs_in_flight++;
2380 cli->cl_close_rpcs_in_flight++;
2381 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2382 cli->cl_mod_rpcs_in_flight);
2383 /* find a free tag */
2384 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2386 LASSERT(i < OBD_MAX_RIF_MAX);
/* the set operation lives inside LASSERT: the bit found above must be clear */
2387 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2388 spin_unlock(&cli->cl_mod_rpcs_lock);
2389 /* tag 0 is reserved for non-modify RPCs */
2392 spin_unlock(&cli->cl_mod_rpcs_lock);
2394 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2395 "opc %u, max %hu\n",
2396 cli->cl_import->imp_obd->obd_name, opc, max);
/* sleep until obd_mod_rpc_slot_avail() reports a free slot */
2398 l_wait_event(cli->cl_mod_rpcs_waitq,
2399 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2402 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
/*
 * obd_put_mod_rpc_slot(): intents accepted by obd_skip_mod_rpc_slot() took
 * no slot and release none.  Otherwise, under cl_mod_rpcs_lock, the
 * in-flight counters are decremented and bit @tag - 1 is cleared in
 * cl_mod_tag_bitmap; sleepers on cl_mod_rpcs_waitq are then woken.
 */
2404 /* Put a modify RPC slot from the obd client @cli according
2405 * to the kind of operation @opc that has been sent and the
2406 * intent @it of the operation if it applies.
2408 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2409 struct lookup_intent *it, __u16 tag)
2411 bool close_req = false;
2413 if (obd_skip_mod_rpc_slot(it))
2416 if (opc == MDS_CLOSE)
2419 spin_lock(&cli->cl_mod_rpcs_lock);
2420 cli->cl_mod_rpcs_in_flight--;
2422 cli->cl_close_rpcs_in_flight--;
2423 /* release the tag in the bitmap */
/*
 * NOTE(review): @tag is a __u16, so for tag == 0 the expression tag - 1 is
 * negative and still satisfies the bound check below - confirm that callers
 * never pass tag 0 on this path.
 */
2424 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2425 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2426 spin_unlock(&cli->cl_mod_rpcs_lock);
2427 wake_up(&cli->cl_mod_rpcs_waitq);
2429 EXPORT_SYMBOL(obd_put_mod_rpc_slot);