4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* Taken around every traversal or update of the obd_types list in this file. */
49 static DEFINE_SPINLOCK(obd_types_lock);
/* Registered obd types, linked through obd_type::typ_chain. */
50 static LIST_HEAD(obd_types);
/* Reader/writer lock taken around the obd_devs[] scans and updates below. */
51 DEFINE_RWLOCK(obd_dev_lock);
/* Device table; a device's obd_minor is its index in this array. */
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache that obd_device structures are allocated from and freed to. */
54 static struct kmem_cache *obd_device_cachep;
/* NOTE(review): no user is visible in this part of the file; presumably the
 * queue that runs the exp_zombie_work / imp_zombie_work items - confirm. */
56 static struct workqueue_struct *zombie_wq;
/* Forward declarations of helpers defined outside this part of the file. */
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping; class_unlink_export() increments the counter. */
63 static LIST_HEAD(obd_stale_exports);
64 static DEFINE_SPINLOCK(obd_stale_export_lock);
65 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Function pointer used in this file to drop ptlrpc connection references;
 * it is assigned outside this part of the file. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts that the structure still carries OBD_DEVICE_MAGIC, reports via
 * CERROR when obd_namespace is still set, finalizes the obd_reference
 * lu_ref tracker and frees the structure to the slab cache.
 *
 * NOTE(review): the embedded line numbering has gaps (87, 93-94), so short
 * lines are absent from this listing; check the full file before relying
 * on this summary.
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
101 struct list_head *tmp;
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 list_for_each(tmp, &obd_types) {
106 type = list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
/*
 * Find an obd type by name, optionally loading its module first.
 *
 * Starts from class_search_type(name).  Under HAVE_MODULE_LOADING_SUPPORT a
 * module name is derived from \a name (LUSTRE_LWP_NAME is mapped to
 * LUSTRE_OSP_NAME, names starting with LUSTRE_MDS_NAME to LUSTRE_MDT_NAME,
 * and "obdfilter" is also special-cased), request_module() is called and,
 * when it returns 0, the search is repeated.  Under the type's
 * obd_type_lock, try_module_get() is called on typ_dt_ops->o_owner; its
 * result is not checked.
 *
 * NOTE(review): short lines are missing from this listing (the embedded
 * numbering has gaps), including the conditions guarding these steps, the
 * replacement name for "obdfilter" and the statement at line 145; confirm
 * against the full file.
 */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
134 if (!request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144 spin_lock(&type->obd_type_lock);
146 try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): under the type's obd_type_lock,
 * module_put() the owner of typ_dt_ops.
 *
 * NOTE(review): original lines 154 and 156 are absent from this listing;
 * presumably a sanity check and a typ_refcnt decrement - confirm against
 * the full file.
 */
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
161 static void class_sysfs_release(struct kobject *kobj)
163 OBD_FREE(kobj, sizeof(*kobj));
166 static struct kobj_type class_ktype = {
167 .sysfs_ops = &lustre_sysfs_ops,
168 .release = class_sysfs_release,
/*
 * Create the sysfs kobject called \a name under lustre_kset.
 *
 * With HAVE_SERVER_SUPPORT, kset_find_obj() is consulted first for an
 * entry of that name.  A kobject is allocated with OBD_ALLOC(), attached
 * to lustre_kset, initialised with class_ktype and added beneath
 * lustre_kset->kobj.  ERR_PTR(-ENOMEM) is returned on the visible
 * allocation-failure path.
 *
 * NOTE(review): the handling of the kset_find_obj() and kobject_add()
 * results and the final return sit on lines missing from this listing;
 * confirm against the full file.
 */
171 struct kobject *class_setup_tunables(const char *name)
173 struct kobject *kobj;
176 #ifdef HAVE_SERVER_SUPPORT
177 kobj = kset_find_obj(lustre_kset, name);
181 OBD_ALLOC(kobj, sizeof(*kobj));
183 return ERR_PTR(-ENOMEM);
185 kobj->kset = lustre_kset;
186 kobject_init(kobj, &class_ktype);
187 rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
194 EXPORT_SYMBOL(class_setup_tunables);
196 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type.
 *
 * Visible steps, in order: assert that \a name is shorter than
 * CLASS_MAX_NAME; detect a name class_search_type() already knows;
 * allocate the obd_type plus private copies of the ops tables and of the
 * name; create the procfs (CONFIG_PROC_FS) and debugfs directories; create
 * the sysfs kobject through class_setup_tunables(); run
 * lu_device_type_init() on \a ldt; link the type into obd_types under
 * obd_types_lock.  The tail of the function is the error path that undoes
 * the earlier steps and frees the type.
 *
 * \param[in] dt_ops	device operations, copied into the new type
 * \param[in] md_ops	metadata operations, optional, copied into the type
 * \param[in] name	type name, copied into the new type
 * \param[in] ldt	lu device type passed to lu_device_type_init()
 *
 * NOTE(review): enable_proc and vars are only used on lines missing from
 * this listing, as are most conditions, labels and return statements
 * (the embedded numbering has gaps); confirm against the full file.
 */
198 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
199 bool enable_proc, struct lprocfs_vars *vars,
200 const char *name, struct lu_device_type *ldt)
202 struct obd_type *type;
203 #ifdef HAVE_SERVER_SUPPORT
205 #endif /* HAVE_SERVER_SUPPORT */
210 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
212 if (class_search_type(name)) {
213 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
218 OBD_ALLOC(type, sizeof(*type));
/* All three allocations are attempted first and then checked together. */
222 OBD_ALLOC_PTR(type->typ_dt_ops);
223 OBD_ALLOC_PTR(type->typ_md_ops);
224 OBD_ALLOC(type->typ_name, strlen(name) + 1);
226 if (type->typ_dt_ops == NULL ||
227 type->typ_md_ops == NULL ||
228 type->typ_name == NULL)
231 *(type->typ_dt_ops) = *dt_ops;
232 /* md_ops is optional */
234 *(type->typ_md_ops) = *md_ops;
235 strcpy(type->typ_name, name);
236 spin_lock_init(&type->obd_type_lock);
238 #ifdef CONFIG_PROC_FS
240 type->typ_procroot = lprocfs_register(type->typ_name,
243 if (IS_ERR(type->typ_procroot)) {
244 rc = PTR_ERR(type->typ_procroot);
245 type->typ_procroot = NULL;
250 #ifdef HAVE_SERVER_SUPPORT
/* If a debugfs entry of this name already exists under
 * debugfs_lustre_root, drop the lookup reference and mark the type with
 * typ_sym_filter. */
252 dname.len = strlen(dname.name);
253 dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
255 type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
256 if (type->typ_debugfs_entry) {
257 dput(type->typ_debugfs_entry);
258 type->typ_sym_filter = true;
261 #endif /* HAVE_SERVER_SUPPORT */
263 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
266 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
267 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
269 type->typ_debugfs_entry = NULL;
272 #ifdef HAVE_SERVER_SUPPORT
275 type->typ_kobj = class_setup_tunables(type->typ_name);
276 if (IS_ERR(type->typ_kobj))
277 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
281 rc = lu_device_type_init(ldt);
283 kobject_put(type->typ_kobj);
/* Publish the fully set up type. */
288 spin_lock(&obd_types_lock);
289 list_add(&type->typ_chain, &obd_types);
290 spin_unlock(&obd_types_lock);
/* Error path: release whatever was set up above, then the type itself. */
295 #ifdef HAVE_SERVER_SUPPORT
296 if (type->typ_sym_filter)
297 type->typ_debugfs_entry = NULL;
299 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
300 ldebugfs_remove(&type->typ_debugfs_entry);
301 if (type->typ_name != NULL) {
302 #ifdef CONFIG_PROC_FS
303 if (type->typ_procroot != NULL)
304 remove_proc_subtree(type->typ_name, proc_lustre_root);
306 OBD_FREE(type->typ_name, strlen(name) + 1);
308 if (type->typ_md_ops != NULL)
309 OBD_FREE_PTR(type->typ_md_ops);
310 if (type->typ_dt_ops != NULL)
311 OBD_FREE_PTR(type->typ_dt_ops);
312 OBD_FREE(type, sizeof(*type));
315 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * The type is looked up with class_search_type(); an unknown type is
 * reported via CERROR.  If typ_refcnt is still non-zero the ops tables are
 * freed but, per the inline comment, the name is kept for debugging.
 * Otherwise the sysfs kobject reference is dropped, the procfs and debugfs
 * entries are removed, lu_device_type_fini() is run on typ_lu, the type is
 * unlinked from obd_types under obd_types_lock and its memory is freed.
 *
 * NOTE(review): the conditions, return statements and some preprocessor
 * lines are missing from this listing (the embedded numbering has gaps);
 * confirm the exact control flow and return codes against the full file.
 */
317 int class_unregister_type(const char *name)
319 struct obd_type *type = class_search_type(name);
323 CERROR("unknown obd type\n");
327 if (type->typ_refcnt) {
328 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
329 /* This is a bad situation, let's make the best of it */
330 /* Remove ops, but leave the name for debugging */
331 OBD_FREE_PTR(type->typ_dt_ops);
332 OBD_FREE_PTR(type->typ_md_ops);
336 kobject_put(type->typ_kobj);
338 /* we do not use type->typ_procroot as for compatibility purposes
339 * other modules can share names (i.e. lod can use lov entry). so
340 * we can't reference pointer as it can get invalided when another
341 * module removes the entry */
342 #ifdef CONFIG_PROC_FS
343 if (type->typ_procroot != NULL)
344 remove_proc_subtree(type->typ_name, proc_lustre_root);
345 if (type->typ_procsym != NULL)
346 lprocfs_remove(&type->typ_procsym);
348 #ifdef HAVE_SERVER_SUPPORT
349 if (type->typ_sym_filter)
350 type->typ_debugfs_entry = NULL;
352 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
353 ldebugfs_remove(&type->typ_debugfs_entry);
356 lu_device_type_fini(type->typ_lu);
358 spin_lock(&obd_types_lock);
359 list_del(&type->typ_chain);
360 spin_unlock(&obd_types_lock);
361 OBD_FREE(type->typ_name, strlen(name) + 1);
362 if (type->typ_dt_ops != NULL)
363 OBD_FREE_PTR(type->typ_dt_ops);
364 if (type->typ_md_ops != NULL)
365 OBD_FREE_PTR(type->typ_md_ops);
366 OBD_FREE(type, sizeof(*type));
368 } /* class_unregister_type */
369 EXPORT_SYMBOL(class_unregister_type);
372 * Create a new obd device.
374 * Allocate the new obd_device and initialize it.
376 * \param[in] type_name obd device type string.
377 * \param[in] name obd device name.
378 * \param[in] uuid obd device UUID
380 * \retval newdev pointer to created obd_device
381 * \retval ERR_PTR(errno) on error
383 struct obd_device *class_newdev(const char *type_name, const char *name,
386 struct obd_device *newdev;
387 struct obd_type *type = NULL;
390 if (strlen(name) >= MAX_OBD_NAME) {
391 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
392 RETURN(ERR_PTR(-EINVAL));
395 type = class_get_type(type_name);
397 CERROR("OBD: unknown type: %s\n", type_name);
398 RETURN(ERR_PTR(-ENODEV));
401 newdev = obd_device_alloc();
402 if (newdev == NULL) {
403 class_put_type(type);
404 RETURN(ERR_PTR(-ENOMEM));
406 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
407 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
408 newdev->obd_type = type;
409 newdev->obd_minor = -1;
411 rwlock_init(&newdev->obd_pool_lock);
412 newdev->obd_pool_limit = 0;
413 newdev->obd_pool_slv = 0;
415 INIT_LIST_HEAD(&newdev->obd_exports);
416 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
417 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
418 INIT_LIST_HEAD(&newdev->obd_exports_timed);
419 INIT_LIST_HEAD(&newdev->obd_nid_stats);
420 spin_lock_init(&newdev->obd_nid_lock);
421 spin_lock_init(&newdev->obd_dev_lock);
422 mutex_init(&newdev->obd_dev_mutex);
423 spin_lock_init(&newdev->obd_osfs_lock);
424 /* newdev->obd_osfs_age must be set to a value in the distant
425 * past to guarantee a fresh statfs is fetched on mount. */
426 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
428 /* XXX belongs in setup not attach */
429 init_rwsem(&newdev->obd_observer_link_sem);
431 spin_lock_init(&newdev->obd_recovery_task_lock);
432 init_waitqueue_head(&newdev->obd_next_transno_waitq);
433 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
434 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
435 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
436 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
437 INIT_LIST_HEAD(&newdev->obd_evict_list);
438 INIT_LIST_HEAD(&newdev->obd_lwp_list);
440 llog_group_init(&newdev->obd_olg);
441 /* Detach drops this */
442 atomic_set(&newdev->obd_refcount, 1);
443 lu_ref_init(&newdev->obd_reference);
444 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
446 newdev->obd_conn_inprogress = 0;
448 strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
450 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
451 newdev->obd_name, newdev);
459 * \param[in] obd obd_device to be freed
463 void class_free_dev(struct obd_device *obd)
465 struct obd_type *obd_type = obd->obd_type;
467 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
468 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
469 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
470 "obd %p != obd_devs[%d] %p\n",
471 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
472 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
473 "obd_refcount should be 0, not %d\n",
474 atomic_read(&obd->obd_refcount));
475 LASSERT(obd_type != NULL);
477 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
478 obd->obd_name, obd->obd_type->typ_name);
480 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
481 obd->obd_name, obd->obd_uuid.uuid);
482 if (obd->obd_stopping) {
485 /* If we're not stopping, we were never set up */
486 err = obd_cleanup(obd);
488 CERROR("Cleanup %s returned %d\n",
492 obd_device_free(obd);
494 class_put_type(obd_type);
498 * Unregister obd device.
500 * Free slot in obd_dev[] used by \a obd.
502 * \param[in] new_obd obd_device to be unregistered
506 void class_unregister_device(struct obd_device *obd)
508 write_lock(&obd_dev_lock);
509 if (obd->obd_minor >= 0) {
510 LASSERT(obd_devs[obd->obd_minor] == obd);
511 obd_devs[obd->obd_minor] = NULL;
514 write_unlock(&obd_dev_lock);
/**
 * Register obd device.
 *
 * Finds a free slot in obd_devs[] and fills it with \a new_obd.
 *
 * \param[in] new_obd obd_device to be registered
 *
 * \retval -EEXIST    a device with this name is already registered
 * \retval -EOVERFLOW obd_devs[] is full
 */
/*
 * Claim a slot in obd_devs[] for new_obd.
 *
 * Under the obd_dev_lock write lock every index below class_devno_max() is
 * examined.  A device with the same obd_name as new_obd is a duplicate:
 * the visible code can drop the lock and call obd_zombie_barrier(), since
 * the old device may still be awaiting destruction, and otherwise reports
 * "already exists" via CERROR and clears minor_assign.  An empty slot is
 * noted while none has been assigned yet.  On success the chosen index is
 * stored in new_obd->obd_minor and the slot is pointed at new_obd; a
 * CERROR about all devices being used covers the table-full case.
 *
 * NOTE(review): the retry/goto flow around obd_zombie_barrier(), the
 * assignments to the return code, several conditions and the final return
 * are on lines missing from this listing (the embedded numbering has
 * gaps); confirm against the full file.
 */
528 int class_register_device(struct obd_device *new_obd)
532 int new_obd_minor = 0;
533 bool minor_assign = false;
534 bool retried = false;
537 write_lock(&obd_dev_lock);
538 for (i = 0; i < class_devno_max(); i++) {
539 struct obd_device *obd = class_num2obd(i);
542 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
545 write_unlock(&obd_dev_lock);
547 /* the obd_device could be waited to be
548 * destroyed by the "obd_zombie_impexp_thread".
550 obd_zombie_barrier();
555 CERROR("%s: already exists, won't add\n",
557 /* in case we found a free slot before duplicate */
558 minor_assign = false;
562 if (!minor_assign && obd == NULL) {
569 new_obd->obd_minor = new_obd_minor;
570 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
571 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
572 obd_devs[new_obd_minor] = new_obd;
576 CERROR("%s: all %u/%u devices used, increase "
577 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
578 i, class_devno_max(), ret);
581 write_unlock(&obd_dev_lock);
/*
 * Scan obd_devs[] for a device whose obd_name equals name.  No locking is
 * done here; class_name2dev() and class_dev_by_str() call this with the
 * obd_dev_lock read lock held.  Per the inline comment, a match is only
 * given out once the device has obd_attached set.
 *
 * NOTE(review): the declarations, the return statements and whatever
 * follows the obd_attached test are on lines missing from this listing;
 * callers treat a negative result as "not found" - confirm the exact
 * values against the full file.
 */
586 static int class_name2dev_nolock(const char *name)
593 for (i = 0; i < class_devno_max(); i++) {
594 struct obd_device *obd = class_num2obd(i);
596 if (obd && strcmp(name, obd->obd_name) == 0) {
597 /* Make sure we finished attaching before we give
598 out any references */
599 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
600 if (obd->obd_attached) {
/*
 * Locked wrapper: runs class_name2dev_nolock(name) under the obd_dev_lock
 * read lock.
 *
 * NOTE(review): original lines 611-616 and 620-622 are missing from this
 * listing, so any argument check before the lock and the return statement
 * are not visible here; confirm against the full file.
 */
610 int class_name2dev(const char *name)
617 read_lock(&obd_dev_lock);
618 i = class_name2dev_nolock(name);
619 read_unlock(&obd_dev_lock);
623 EXPORT_SYMBOL(class_name2dev);
625 struct obd_device *class_name2obd(const char *name)
627 int dev = class_name2dev(name);
629 if (dev < 0 || dev > class_devno_max())
631 return class_num2obd(dev);
633 EXPORT_SYMBOL(class_name2obd);
635 int class_uuid2dev_nolock(struct obd_uuid *uuid)
639 for (i = 0; i < class_devno_max(); i++) {
640 struct obd_device *obd = class_num2obd(i);
642 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
643 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
651 int class_uuid2dev(struct obd_uuid *uuid)
655 read_lock(&obd_dev_lock);
656 i = class_uuid2dev_nolock(uuid);
657 read_unlock(&obd_dev_lock);
661 EXPORT_SYMBOL(class_uuid2dev);
663 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
665 int dev = class_uuid2dev(uuid);
668 return class_num2obd(dev);
670 EXPORT_SYMBOL(class_uuid2obd);
673 * Get obd device from ::obd_devs[]
675 * \param num [in] array index
677 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
678 * otherwise return the obd device there.
680 struct obd_device *class_num2obd(int num)
682 struct obd_device *obd = NULL;
684 if (num < class_devno_max()) {
689 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
690 "%p obd_magic %08x != %08x\n",
691 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
692 LASSERTF(obd->obd_minor == num,
693 "%p obd_minor %0d != %0d\n",
694 obd, obd->obd_minor, num);
701 * Find obd in obd_dev[] by name or uuid.
703 * Increment obd's refcount if found.
705 * \param[in] str obd name or uuid
707 * \retval NULL if not found
708 * \retval target pointer to found obd_device
710 struct obd_device *class_dev_by_str(const char *str)
712 struct obd_device *target = NULL;
713 struct obd_uuid tgtuuid;
716 obd_str2uuid(&tgtuuid, str);
718 read_lock(&obd_dev_lock);
719 rc = class_uuid2dev_nolock(&tgtuuid);
721 rc = class_name2dev_nolock(str);
724 target = class_num2obd(rc);
727 class_incref(target, "find", current);
728 read_unlock(&obd_dev_lock);
732 EXPORT_SYMBOL(class_dev_by_str);
735 * Get obd devices count. Device in any
737 * \retval obd device count
739 int get_devices_count(void)
741 int index, max_index = class_devno_max(), dev_count = 0;
743 read_lock(&obd_dev_lock);
744 for (index = 0; index <= max_index; index++) {
745 struct obd_device *obd = class_num2obd(index);
749 read_unlock(&obd_dev_lock);
753 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line (LCONSOLE at D_CONFIG) per device in obd_devs[]:
 * index, a status string selected by testing obd_stopping, obd_set_up and
 * obd_attached in that order, type name, device name, uuid and the current
 * obd_refcount.  The scan runs under the obd_dev_lock read lock.
 *
 * NOTE(review): the status string assignments, the handling of empty
 * slots and the declarations are on lines missing from this listing;
 * confirm against the full file.
 */
755 void class_obd_list(void)
760 read_lock(&obd_dev_lock);
761 for (i = 0; i < class_devno_max(); i++) {
762 struct obd_device *obd = class_num2obd(i);
766 if (obd->obd_stopping)
768 else if (obd->obd_set_up)
770 else if (obd->obd_attached)
774 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
775 i, status, obd->obd_type->typ_name,
776 obd->obd_name, obd->obd_uuid.uuid,
777 atomic_read(&obd->obd_refcount));
779 read_unlock(&obd_dev_lock);
783 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
784 specified, then only the client with that uuid is returned,
785 otherwise any client connected to the tgt is returned. */
786 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
787 const char * typ_name,
788 struct obd_uuid *grp_uuid)
792 read_lock(&obd_dev_lock);
793 for (i = 0; i < class_devno_max(); i++) {
794 struct obd_device *obd = class_num2obd(i);
798 if ((strncmp(obd->obd_type->typ_name, typ_name,
799 strlen(typ_name)) == 0)) {
800 if (obd_uuid_equals(tgt_uuid,
801 &obd->u.cli.cl_target_uuid) &&
802 ((grp_uuid)? obd_uuid_equals(grp_uuid,
803 &obd->obd_uuid) : 1)) {
804 read_unlock(&obd_dev_lock);
809 read_unlock(&obd_dev_lock);
813 EXPORT_SYMBOL(class_find_client_obd);
815 /* Iterate the obd_device list looking devices have grp_uuid. Start
816 searching at *next, and if a device is found, the next index to look
817 at is saved in *next. If next is NULL, then the first matching device
818 will always be returned. */
819 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
825 else if (*next >= 0 && *next < class_devno_max())
830 read_lock(&obd_dev_lock);
831 for (; i < class_devno_max(); i++) {
832 struct obd_device *obd = class_num2obd(i);
836 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
839 read_unlock(&obd_dev_lock);
843 read_unlock(&obd_dev_lock);
847 EXPORT_SYMBOL(class_devices_in_group);
/**
 * Notify every relevant OBD that the sptlrpc log for \a fsname has changed,
 * so that each one can adjust its sptlrpc settings accordingly.
 */
/*
 * Walks obd_devs[] under the obd_dev_lock read lock and filters devices:
 * empty slots, devices that are not set up and devices that are stopping;
 * types other than mdc, osc, osp, lwp, mdt and ost; and devices whose
 * obd_name does not begin with the first namelen bytes of fsname.  For a
 * device that passes, a reference is taken, the lock is dropped,
 * obd_set_info_async() is called on the device's self export with
 * KEY_SPTLRPC_CONF, and then the reference is released and the lock is
 * taken again before the scan continues.
 *
 * NOTE(review): the statements after each filter test, the declarations,
 * the way rc2 is folded into the result and the return statement are on
 * lines missing from this listing; confirm against the full file.
 */
853 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
855 struct obd_device *obd;
859 LASSERT(namelen > 0);
861 read_lock(&obd_dev_lock);
862 for (i = 0; i < class_devno_max(); i++) {
863 obd = class_num2obd(i);
865 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
868 /* only notify mdc, osc, osp, lwp, mdt, ost
869 * because only these have a -sptlrpc llog */
870 type = obd->obd_type->typ_name;
871 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
872 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
873 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
874 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
875 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
876 strcmp(type, LUSTRE_OST_NAME) != 0)
879 if (strncmp(obd->obd_name, fsname, namelen))
/* Pin the device so it survives while the lock is dropped for the call. */
882 class_incref(obd, __FUNCTION__, obd);
883 read_unlock(&obd_dev_lock);
884 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
885 sizeof(KEY_SPTLRPC_CONF),
886 KEY_SPTLRPC_CONF, 0, NULL, NULL);
888 class_decref(obd, __FUNCTION__, obd);
889 read_lock(&obd_dev_lock);
891 read_unlock(&obd_dev_lock);
894 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
896 void obd_cleanup_caches(void)
899 if (obd_device_cachep) {
900 kmem_cache_destroy(obd_device_cachep);
901 obd_device_cachep = NULL;
/*
 * Create the "ll_obd_dev_cache" slab cache, sized for struct obd_device,
 * and store it in obd_device_cachep, which must be NULL on entry (LASSERT).
 * When creation fails, rc is set to -ENOMEM through GOTO(out, ...); the
 * visible obd_cleanup_caches() call belongs to the tail of the function.
 *
 * NOTE(review): the remaining kmem_cache_create() arguments, the success
 * return, the "out" label and the final return are on lines missing from
 * this listing; confirm against the full file.
 */
907 int obd_init_caches(void)
912 LASSERT(obd_device_cachep == NULL);
913 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
914 sizeof(struct obd_device),
916 if (!obd_device_cachep)
917 GOTO(out, rc = -ENOMEM);
921 obd_cleanup_caches();
925 /* map connection to client */
926 struct obd_export *class_conn2export(struct lustre_handle *conn)
928 struct obd_export *export;
932 CDEBUG(D_CACHE, "looking for null handle\n");
936 if (conn->cookie == -1) { /* this means assign a new connection */
937 CDEBUG(D_CACHE, "want a new connection\n");
941 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
942 export = class_handle2object(conn->cookie, NULL);
945 EXPORT_SYMBOL(class_conn2export);
/*
 * NOTE(review): the whole body of this function (original lines 948-952)
 * is missing from this listing.  Judging only by the name and signature it
 * presumably maps an export to its obd device - confirm against the full
 * file before relying on that.
 */
947 struct obd_device *class_exp2obd(struct obd_export *exp)
953 EXPORT_SYMBOL(class_exp2obd);
955 struct obd_import *class_exp2cliimp(struct obd_export *exp)
957 struct obd_device *obd = exp->exp_obd;
960 return obd->u.cli.cl_import;
962 EXPORT_SYMBOL(class_exp2cliimp);
964 /* Export management functions */
965 static void class_export_destroy(struct obd_export *exp)
967 struct obd_device *obd = exp->exp_obd;
970 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
971 LASSERT(obd != NULL);
973 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
974 exp->exp_client_uuid.uuid, obd->obd_name);
976 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
977 if (exp->exp_connection)
978 ptlrpc_put_connection_superhack(exp->exp_connection);
980 LASSERT(list_empty(&exp->exp_outstanding_replies));
981 LASSERT(list_empty(&exp->exp_uncommitted_replies));
982 LASSERT(list_empty(&exp->exp_req_replay_queue));
983 LASSERT(list_empty(&exp->exp_hp_rpcs));
984 obd_destroy_export(exp);
985 /* self export doesn't hold a reference to an obd, although it
986 * exists until freeing of the obd */
987 if (exp != obd->obd_self_export)
988 class_decref(obd, "export", exp);
990 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* portals_handle addref hook: the opaque object is an obd_export. */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
/*
 * Handle operations passed to class_handle_hash() for export handles;
 * hop_addref takes an export reference through export_handle_addref().
 *
 * NOTE(review): original lines 1001-1002 (any further initializer and the
 * closing brace) are missing from this listing.
 */
999 static struct portals_handle_ops export_handle_ops = {
1000 .hop_addref = export_handle_addref,
1004 struct obd_export *class_export_get(struct obd_export *exp)
1006 atomic_inc(&exp->exp_refcount);
1007 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1008 atomic_read(&exp->exp_refcount));
1011 EXPORT_SYMBOL(class_export_get);
1013 void class_export_put(struct obd_export *exp)
1015 LASSERT(exp != NULL);
1016 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1017 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1018 atomic_read(&exp->exp_refcount) - 1);
1020 if (atomic_dec_and_test(&exp->exp_refcount)) {
1021 struct obd_device *obd = exp->exp_obd;
1023 CDEBUG(D_IOCTL, "final put %p/%s\n",
1024 exp, exp->exp_client_uuid.uuid);
1026 /* release nid stat refererence */
1027 lprocfs_exp_cleanup(exp);
1029 if (exp == obd->obd_self_export) {
1030 /* self export should be destroyed without
1031 * zombie thread as it doesn't hold a
1032 * reference to obd and doesn't hold any
1034 class_export_destroy(exp);
1035 /* self export is destroyed, no class
1036 * references exist and it is safe to free
1038 class_free_dev(obd);
1040 LASSERT(!list_empty(&exp->exp_obd_chain));
1041 obd_zombie_export_add(exp);
1046 EXPORT_SYMBOL(class_export_put);
1048 static void obd_zombie_exp_cull(struct work_struct *ws)
1050 struct obd_export *export;
1052 export = container_of(ws, struct obd_export, exp_zombie_work);
1053 class_export_destroy(export);
1056 /* Creates a new export, adds it to the hash table, and returns a
1057 * pointer to it. The refcount is 2: one for the hash reference, and
1058 * one for the pointer returned by this function. */
/*
 * Visible steps: allocate the export (ERR_PTR(-ENOMEM) on failure),
 * initialise its counters, lists, locks and zombie work item, register its
 * handle with class_handle_hash(), copy cluuid into exp_client_uuid and
 * call obd_init_export().  An export whose uuid differs from the device's
 * own uuid is inserted into obd_uuid_hash; a duplicate is refused with
 * -EALREADY.  The export is then linked onto the device's export lists
 * under obd_dev_lock.  The tail is the error path: it unhashes the handle,
 * calls obd_destroy_export() and frees the export.
 *
 * NOTE(review): is_self is only tested on lines missing from this listing,
 * as are several conditions, the goto labels and the return statements
 * (the embedded numbering has gaps); confirm against the full file.
 */
1059 struct obd_export *__class_new_export(struct obd_device *obd,
1060 struct obd_uuid *cluuid, bool is_self)
1062 struct obd_export *export;
1063 struct cfs_hash *hash = NULL;
1067 OBD_ALLOC_PTR(export);
1069 return ERR_PTR(-ENOMEM);
1071 export->exp_conn_cnt = 0;
1072 export->exp_lock_hash = NULL;
1073 export->exp_flock_hash = NULL;
1074 /* 2 = class_handle_hash + last */
1075 atomic_set(&export->exp_refcount, 2);
1076 atomic_set(&export->exp_rpc_count, 0);
1077 atomic_set(&export->exp_cb_count, 0);
1078 atomic_set(&export->exp_locks_count, 0);
1079 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1080 INIT_LIST_HEAD(&export->exp_locks_list);
1081 spin_lock_init(&export->exp_locks_list_guard);
1083 atomic_set(&export->exp_replay_count, 0);
1084 export->exp_obd = obd;
1085 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1086 spin_lock_init(&export->exp_uncommitted_replies_lock);
1087 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1088 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1089 INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1090 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1091 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1092 class_handle_hash(&export->exp_handle, &export_handle_ops);
1093 export->exp_last_request_time = ktime_get_real_seconds();
1094 spin_lock_init(&export->exp_lock);
1095 spin_lock_init(&export->exp_rpc_lock);
1096 INIT_HLIST_NODE(&export->exp_uuid_hash);
1097 INIT_HLIST_NODE(&export->exp_nid_hash);
1098 INIT_HLIST_NODE(&export->exp_gen_hash);
1099 spin_lock_init(&export->exp_bl_list_lock);
1100 INIT_LIST_HEAD(&export->exp_bl_list);
1101 INIT_LIST_HEAD(&export->exp_stale_list);
1102 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1104 export->exp_sp_peer = LUSTRE_SP_ANY;
1105 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1106 export->exp_client_uuid = *cluuid;
1107 obd_init_export(export);
/* Only an export whose uuid differs from the device's own uuid is
 * entered into the uuid hash. */
1109 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1110 spin_lock(&obd->obd_dev_lock);
1111 /* shouldn't happen, but might race */
1112 if (obd->obd_stopping)
1113 GOTO(exit_unlock, rc = -ENODEV);
1115 hash = cfs_hash_getref(obd->obd_uuid_hash);
1117 GOTO(exit_unlock, rc = -ENODEV);
1118 spin_unlock(&obd->obd_dev_lock);
1120 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1122 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1123 obd->obd_name, cluuid->uuid, rc);
1124 GOTO(exit_err, rc = -EALREADY);
1128 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* obd_stopping is checked again under obd_dev_lock; the uuid hash
 * entry is removed on that path before bailing out with -ESHUTDOWN. */
1129 spin_lock(&obd->obd_dev_lock);
1130 if (obd->obd_stopping) {
1132 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1133 GOTO(exit_unlock, rc = -ESHUTDOWN);
1137 class_incref(obd, "export", export);
1138 list_add_tail(&export->exp_obd_chain_timed,
1139 &obd->obd_exports_timed);
1140 list_add(&export->exp_obd_chain, &obd->obd_exports);
1141 obd->obd_num_exports++;
1143 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1144 INIT_LIST_HEAD(&export->exp_obd_chain);
1146 spin_unlock(&obd->obd_dev_lock);
1148 cfs_hash_putref(hash);
1152 spin_unlock(&obd->obd_dev_lock);
1155 cfs_hash_putref(hash);
1156 class_handle_unhash(&export->exp_handle);
1157 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1158 obd_destroy_export(export);
1159 OBD_FREE_PTR(export);
1163 struct obd_export *class_new_export(struct obd_device *obd,
1164 struct obd_uuid *uuid)
1166 return __class_new_export(obd, uuid, false);
1168 EXPORT_SYMBOL(class_new_export);
1170 struct obd_export *class_new_export_self(struct obd_device *obd,
1171 struct obd_uuid *uuid)
1173 return __class_new_export(obd, uuid, true);
1176 void class_unlink_export(struct obd_export *exp)
1178 class_handle_unhash(&exp->exp_handle);
1180 if (exp->exp_obd->obd_self_export == exp) {
1181 class_export_put(exp);
1185 spin_lock(&exp->exp_obd->obd_dev_lock);
1186 /* delete an uuid-export hashitem from hashtables */
1187 if (!hlist_unhashed(&exp->exp_uuid_hash))
1188 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1189 &exp->exp_client_uuid,
1190 &exp->exp_uuid_hash);
1192 #ifdef HAVE_SERVER_SUPPORT
1193 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1194 struct tg_export_data *ted = &exp->exp_target_data;
1195 struct cfs_hash *hash;
1197 /* Because obd_gen_hash will not be released until
1198 * class_cleanup(), so hash should never be NULL here */
1199 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1200 LASSERT(hash != NULL);
1201 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1202 &exp->exp_gen_hash);
1203 cfs_hash_putref(hash);
1205 #endif /* HAVE_SERVER_SUPPORT */
1207 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1208 list_del_init(&exp->exp_obd_chain_timed);
1209 exp->exp_obd->obd_num_exports--;
1210 spin_unlock(&exp->exp_obd->obd_dev_lock);
1211 atomic_inc(&obd_stale_export_num);
1213 /* A reference is kept by obd_stale_exports list */
1214 obd_stale_export_put(exp);
1216 EXPORT_SYMBOL(class_unlink_export);
1218 /* Import management functions */
/*
 * Teardown of an import whose refcount is zero (asserted): drops the
 * import's own connection, then empties imp_conn_list, dropping each
 * entry's connection and freeing the entry, asserts that imp_sec is NULL
 * and releases the "import" reference on the owning device.
 *
 * NOTE(review): original lines 1242-1243, after the class_decref() call,
 * are missing from this listing; presumably the import itself is freed
 * there - confirm against the full file.
 */
1219 static void obd_zombie_import_free(struct obd_import *imp)
1223 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1224 imp->imp_obd->obd_name);
1226 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1228 ptlrpc_put_connection_superhack(imp->imp_connection);
1230 while (!list_empty(&imp->imp_conn_list)) {
1231 struct obd_import_conn *imp_conn;
1233 imp_conn = list_entry(imp->imp_conn_list.next,
1234 struct obd_import_conn, oic_item);
1235 list_del_init(&imp_conn->oic_item);
1236 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1237 OBD_FREE(imp_conn, sizeof(*imp_conn));
1240 LASSERT(imp->imp_sec == NULL);
1241 class_decref(imp->imp_obd, "import", imp);
1246 struct obd_import *class_import_get(struct obd_import *import)
1248 atomic_inc(&import->imp_refcount);
1249 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1250 atomic_read(&import->imp_refcount),
1251 import->imp_obd->obd_name);
1254 EXPORT_SYMBOL(class_import_get);
1256 void class_import_put(struct obd_import *imp)
1260 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1262 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1263 atomic_read(&imp->imp_refcount) - 1,
1264 imp->imp_obd->obd_name);
1266 if (atomic_dec_and_test(&imp->imp_refcount)) {
1267 CDEBUG(D_INFO, "final put import %p\n", imp);
1268 obd_zombie_import_add(imp);
1271 /* catch possible import put race */
1272 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1275 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.
 *
 * NOTE(review): the last at_init() argument for the service estimates
 * (original line 1285) and the declaration of i are missing from this
 * listing.
 */
1277 static void init_imp_at(struct imp_at *at) {
1279 at_init(&at->iat_net_latency, 0, 0);
1280 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1281 /* max service estimates are tracked on the server side, so
1282 don't use the AT history here, just use the last reported
1283 val. (But keep hist for proc histogram, worst_ever) */
1284 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1289 static void obd_zombie_imp_cull(struct work_struct *ws)
1291 struct obd_import *import;
1293 import = container_of(ws, struct obd_import, imp_zombie_work);
1294 obd_zombie_import_free(import);
/*
 * Allocate a new import for @obd and initialise it: empty request lists,
 * state LUSTRE_IMP_NEW, a reference on @obd taken through class_incref(),
 * the zombie work item, zeroed counters, AT state and the V2 message magic.
 * The import's reference count starts at 2.
 * NOTE(review): the handling of a failed OBD_ALLOC() and the return
 * statement are not visible in this excerpt.
 */
1297 struct obd_import *class_new_import(struct obd_device *obd)
1299 struct obd_import *imp;
1300 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1302 OBD_ALLOC(imp, sizeof(*imp));
1306 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1307 INIT_LIST_HEAD(&imp->imp_replay_list);
1308 INIT_LIST_HEAD(&imp->imp_sending_list);
1309 INIT_LIST_HEAD(&imp->imp_delayed_list);
1310 INIT_LIST_HEAD(&imp->imp_committed_list);
1311 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1312 imp->imp_known_replied_xid = 0;
/* replay starts from the head of the (still empty) committed list */
1313 imp->imp_replay_cursor = &imp->imp_committed_list;
1314 spin_lock_init(&imp->imp_lock);
1315 imp->imp_last_success_conn = 0;
1316 imp->imp_state = LUSTRE_IMP_NEW;
/* the import pins the device for as long as it exists */
1317 imp->imp_obd = class_incref(obd, "import", imp);
1318 mutex_init(&imp->imp_sec_mutex);
1319 init_waitqueue_head(&imp->imp_recovery_waitq);
/* final destruction is deferred to obd_zombie_imp_cull() */
1320 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
/* use the pid of the current pid namespace's child reaper when it exists */
1322 if (curr_pid_ns->child_reaper)
1323 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
/* NOTE(review): presumably the fallback branch of the test above - confirm */
1325 imp->imp_sec_refpid = 1;
1327 atomic_set(&imp->imp_refcount, 2);
1328 atomic_set(&imp->imp_unregistering, 0);
1329 atomic_set(&imp->imp_inflight, 0);
1330 atomic_set(&imp->imp_replay_inflight, 0);
1331 atomic_set(&imp->imp_inval_count, 0);
1332 INIT_LIST_HEAD(&imp->imp_conn_list);
1333 init_imp_at(&imp->imp_at);
1335 /* the default magic is V2, will be used in connect RPC, and
1336 * then adjusted according to the flags in request/reply. */
1337 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1341 EXPORT_SYMBOL(class_new_import);
1343 void class_destroy_import(struct obd_import *import)
1345 LASSERT(import != NULL);
1346 LASSERT(import != LP_POISON);
1348 spin_lock(&import->imp_lock);
1349 import->imp_generation++;
1350 spin_unlock(&import->imp_lock);
1351 class_import_put(import);
1353 EXPORT_SYMBOL(class_destroy_import);
1355 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1357 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1359 spin_lock(&exp->exp_locks_list_guard);
1361 LASSERT(lock->l_exp_refs_nr >= 0);
1363 if (lock->l_exp_refs_target != NULL &&
1364 lock->l_exp_refs_target != exp) {
1365 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1366 exp, lock, lock->l_exp_refs_target);
1368 if ((lock->l_exp_refs_nr ++) == 0) {
1369 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1370 lock->l_exp_refs_target = exp;
1372 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1373 lock, exp, lock->l_exp_refs_nr);
1374 spin_unlock(&exp->exp_locks_list_guard);
1376 EXPORT_SYMBOL(__class_export_add_lock_ref);
1378 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1380 spin_lock(&exp->exp_locks_list_guard);
1381 LASSERT(lock->l_exp_refs_nr > 0);
1382 if (lock->l_exp_refs_target != exp) {
1383 LCONSOLE_WARN("lock %p, "
1384 "mismatching export pointers: %p, %p\n",
1385 lock, lock->l_exp_refs_target, exp);
1387 if (-- lock->l_exp_refs_nr == 0) {
1388 list_del_init(&lock->l_exp_refs_link);
1389 lock->l_exp_refs_target = NULL;
1391 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1392 lock, exp, lock->l_exp_refs_nr);
1393 spin_unlock(&exp->exp_locks_list_guard);
1395 EXPORT_SYMBOL(__class_export_del_lock_ref);
1398 /* A connection defines an export context in which preallocation can
1399 be managed. This releases the export pointer reference, and returns
1400 the export handle, so the export refcount is 1 when this function
/*
 * Create a new export on @obd for the client identified by @cluuid, store
 * the export's handle cookie in @conn, and drop the reference obtained from
 * class_new_export().  All three arguments must be non-NULL.
 * NOTE(review): the test guarding the PTR_ERR() return and the success
 * return are not visible in this excerpt.
 */
1402 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1403 struct obd_uuid *cluuid)
1405 struct obd_export *export;
1406 LASSERT(conn != NULL);
1407 LASSERT(obd != NULL);
1408 LASSERT(cluuid != NULL);
1411 export = class_new_export(obd, cluuid);
1413 RETURN(PTR_ERR(export));
/* the caller identifies the export through this cookie from now on */
1415 conn->cookie = export->exp_handle.h_cookie;
1416 class_export_put(export);
1418 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1419 cluuid->uuid, conn->cookie);
1422 EXPORT_SYMBOL(class_connect);
1424 /* if export is involved in recovery then clean up related things */
1425 static void class_export_recovery_cleanup(struct obd_export *exp)
1427 struct obd_device *obd = exp->exp_obd;
1429 spin_lock(&obd->obd_recovery_task_lock);
1430 if (obd->obd_recovering) {
1431 if (exp->exp_in_recovery) {
1432 spin_lock(&exp->exp_lock);
1433 exp->exp_in_recovery = 0;
1434 spin_unlock(&exp->exp_lock);
1435 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1436 atomic_dec(&obd->obd_connected_clients);
1439 /* if called during recovery then should update
1440 * obd_stale_clients counter,
1441 * lightweight exports are not counted */
1442 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1443 exp->exp_obd->obd_stale_clients++;
1445 spin_unlock(&obd->obd_recovery_task_lock);
1447 spin_lock(&exp->exp_lock);
1448 /** Cleanup req replay fields */
1449 if (exp->exp_req_replay_needed) {
1450 exp->exp_req_replay_needed = 0;
1452 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1453 atomic_dec(&obd->obd_req_replay_clients);
1456 /** Cleanup lock replay data */
1457 if (exp->exp_lock_replay_needed) {
1458 exp->exp_lock_replay_needed = 0;
1460 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1461 atomic_dec(&obd->obd_lock_replay_clients);
1463 spin_unlock(&exp->exp_lock);
1466 /* This function removes 1-3 references from the export:
1467 * 1 - for the export pointer passed in
1468 * and, if a disconnect is really needed:
1469 * 2 - for removing it from the hash
1470 * 3 - in client_unlink_export
1471 * The export pointer passed to this function may be destroyed */
/*
 * Disconnect @export.  Under exp_lock the export is flagged as disconnected
 * and, if it is still hashed, removed from the device's NID hash.  When the
 * export had not been disconnected before, its recovery state is cleaned up
 * and it is unlinked.  One reference is dropped through class_export_put().
 * NOTE(review): the return statements and the no_disconn label are not
 * visible in this excerpt; the label presumably sits just before the final
 * class_export_put() - confirm.
 */
1472 int class_disconnect(struct obd_export *export)
1474 int already_disconnected;
1477 if (export == NULL) {
1478 CWARN("attempting to free NULL export %p\n", export);
1482 spin_lock(&export->exp_lock);
/* remember the previous state so racing callers can be told apart below */
1483 already_disconnected = export->exp_disconnected;
1484 export->exp_disconnected = 1;
1485 /* We hold references of export for uuid hash
1486 * and nid_hash and export link at least. So
1487 * it is safe to call cfs_hash_del in there. */
1488 if (!hlist_unhashed(&export->exp_nid_hash))
1489 cfs_hash_del(export->exp_obd->obd_nid_hash,
1490 &export->exp_connection->c_peer.nid,
1491 &export->exp_nid_hash);
1492 spin_unlock(&export->exp_lock);
1494 /* class_cleanup(), abort_recovery(), and class_fail_export()
1495 * all end up in here, and if any of them race we shouldn't
1496 * call extra class_export_puts(). */
1497 if (already_disconnected) {
1498 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1499 GOTO(no_disconn, already_disconnected);
1502 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1503 export->exp_handle.h_cookie);
1505 class_export_recovery_cleanup(export);
1506 class_unlink_export(export);
1508 class_export_put(export);
1511 EXPORT_SYMBOL(class_disconnect);
1513 /* Return non-zero for a fully connected export */
/*
 * Sample, under exp_lock, whether @exp has a positive connection count and
 * has not been marked failed.
 * NOTE(review): the declaration of the result, any guard on @exp and the
 * return statement are not visible in this excerpt.
 */
1514 int class_connected_export(struct obd_export *exp)
1519 spin_lock(&exp->exp_lock);
1520 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1521 spin_unlock(&exp->exp_lock);
1525 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export found on @list, one at a time, until the list is
 * empty.  Each export first has its exp_flags overwritten with @flags.  An
 * export whose client UUID equals its own device's UUID is only removed
 * from the list; all others go through obd_disconnect().
 * NOTE(review): several short lines (the list member name, the debug call
 * that owns the "don't discon" text, the loop continuation and the
 * declaration of rc) are not visible in this excerpt.
 */
1527 static void class_disconnect_export_list(struct list_head *list,
1528 enum obd_option flags)
1531 struct obd_export *exp;
1534 /* It's possible that an export may disconnect itself, but
1535 * nothing else will be added to this list. */
1536 while (!list_empty(list)) {
1537 exp = list_entry(list->next, struct obd_export,
1539 /* need for safe call CDEBUG after obd_disconnect */
1540 class_export_get(exp);
1542 spin_lock(&exp->exp_lock);
1543 exp->exp_flags = flags;
1544 spin_unlock(&exp->exp_lock);
1546 if (obd_uuid_equals(&exp->exp_client_uuid,
1547 &exp->exp_obd->obd_uuid)) {
1549 "exp %p export uuid == obd uuid, don't discon\n",
1551 /* Need to delete this now so we don't end up pointing
1552 * to work_list later when this export is cleaned up. */
1553 list_del_init(&exp->exp_obd_chain);
1554 class_export_put(exp);
/* second reference: this is the one obd_disconnect() releases below */
1558 class_export_get(exp);
1559 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1560 "last request at %lld\n",
1561 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1562 exp, exp->exp_last_request_time);
1563 /* release one export reference anyway */
1564 rc = obd_disconnect(exp);
1566 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1567 obd_export_nid2str(exp), exp, rc);
1568 class_export_put(exp);
/*
 * Disconnect all exports of @obd.  Both obd_exports and obd_delayed_exports
 * are spliced onto a private list under obd_dev_lock, and that list is then
 * handed to class_disconnect_export_list() with the flags derived from the
 * device.
 * NOTE(review): the branch keyword between the two debug messages is not
 * visible in this excerpt.
 */
1573 void class_disconnect_exports(struct obd_device *obd)
1575 struct list_head work_list;
1578 /* Move all of the exports from obd_exports to a work list, en masse. */
1579 INIT_LIST_HEAD(&work_list);
1580 spin_lock(&obd->obd_dev_lock);
1581 list_splice_init(&obd->obd_exports, &work_list);
1582 list_splice_init(&obd->obd_delayed_exports, &work_list);
1583 spin_unlock(&obd->obd_dev_lock);
1585 if (!list_empty(&work_list)) {
1586 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1587 "disconnecting them\n", obd->obd_minor, obd);
1588 class_disconnect_export_list(&work_list,
1589 exp_flags_from_obd(obd));
1591 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1592 obd->obd_minor, obd);
1595 EXPORT_SYMBOL(class_disconnect_exports);
1597 /* Remove exports that have not completed recovery.
/*
 * Walk obd_exports under obd_dev_lock and collect exports onto a private
 * work list, marking each collected export as failed; the work list is then
 * disconnected with OBD_OPT_ABORT_RECOV added to the device flags.
 * The self-export and exports without a last_rcvd slot
 * (ted_lr_idx == -1) are tested for explicitly, as are exports that are
 * already failed or for which @test_export() returns non-zero.
 * NOTE(review): the statements taken after those tests, the evicted counter
 * and the condition guarding the console warning are not visible in this
 * excerpt.
 */
1599 void class_disconnect_stale_exports(struct obd_device *obd,
1600 int (*test_export)(struct obd_export *))
1602 struct list_head work_list;
1603 struct obd_export *exp, *n;
1607 INIT_LIST_HEAD(&work_list);
1608 spin_lock(&obd->obd_dev_lock);
1609 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1611 /* don't count self-export as client */
1612 if (obd_uuid_equals(&exp->exp_client_uuid,
1613 &exp->exp_obd->obd_uuid))
1616 /* don't evict clients which have no slot in last_rcvd
1617 * (e.g. lightweight connection) */
1618 if (exp->exp_target_data.ted_lr_idx == -1)
1621 spin_lock(&exp->exp_lock);
1622 if (exp->exp_failed || test_export(exp)) {
1623 spin_unlock(&exp->exp_lock);
1626 exp->exp_failed = 1;
1627 spin_unlock(&exp->exp_lock);
1629 list_move(&exp->exp_obd_chain, &work_list);
1631 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1632 obd->obd_name, exp->exp_client_uuid.uuid,
1633 obd_export_nid2str(exp));
1634 print_export_data(exp, "EVICTING", 0, D_HA);
1636 spin_unlock(&obd->obd_dev_lock);
1639 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1640 obd->obd_name, evicted);
1642 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1643 OBD_OPT_ABORT_RECOV);
1646 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.  The failed flag is set under
 * exp_lock; the previous value is kept so that an export which had already
 * failed is only logged.  Two extra references are taken before
 * obd_disconnect(): one so @exp can still be used for logging afterwards,
 * one for obd_disconnect() itself to consume.
 * NOTE(review): the early return for the already-failed case and the test
 * on rc that selects between the two final messages are not visible in this
 * excerpt.
 */
1648 void class_fail_export(struct obd_export *exp)
1650 int rc, already_failed;
1652 spin_lock(&exp->exp_lock);
1653 already_failed = exp->exp_failed;
1654 exp->exp_failed = 1;
1655 spin_unlock(&exp->exp_lock);
1657 if (already_failed) {
1658 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1659 exp, exp->exp_client_uuid.uuid);
1663 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1664 exp, exp->exp_client_uuid.uuid);
1666 if (obd_dump_on_timeout)
1667 libcfs_debug_dumplog();
1669 /* need for safe call CDEBUG after obd_disconnect */
1670 class_export_get(exp);
1672 /* Most callers into obd_disconnect are removing their own reference
1673 * (request, for example) in addition to the one from the hash table.
1674 * We don't have such a reference here, so make one. */
1675 class_export_get(exp);
1676 rc = obd_disconnect(exp);
1678 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1680 CDEBUG(D_HA, "disconnected export %p/%s\n",
1681 exp, exp->exp_client_uuid.uuid);
1682 class_export_put(exp);
1684 EXPORT_SYMBOL(class_fail_export);
/*
 * Evict exports of @obd that are hashed under the NID given as the string
 * @nid.  Nothing is done when the device is already stopping.  A reference
 * on the device's NID hash is held for the duration of the lookups.  Each
 * export found is failed through class_fail_export() and the lookup
 * reference is dropped afterwards.
 * Returns exports_evicted.
 * NOTE(review): the loop construct around the lookup and the statement that
 * updates exports_evicted are not visible in this excerpt.
 */
1686 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1688 struct cfs_hash *nid_hash;
1689 struct obd_export *doomed_exp = NULL;
1690 int exports_evicted = 0;
1692 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1694 spin_lock(&obd->obd_dev_lock);
1695 /* umount has run already, so evict thread should leave
1696 * its task to umount thread now */
1697 if (obd->obd_stopping) {
1698 spin_unlock(&obd->obd_dev_lock);
1699 return exports_evicted;
1701 nid_hash = obd->obd_nid_hash;
1702 cfs_hash_getref(nid_hash);
1703 spin_unlock(&obd->obd_dev_lock);
1706 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1707 if (doomed_exp == NULL)
/* sanity: the hash must return an export for exactly the requested NID */
1710 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1711 "nid %s found, wanted nid %s, requested nid %s\n",
1712 obd_export_nid2str(doomed_exp),
1713 libcfs_nid2str(nid_key), nid);
1714 LASSERTF(doomed_exp != obd->obd_self_export,
1715 "self-export is hashed by NID?\n");
1717 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1718 "request\n", obd->obd_name,
1719 obd_uuid2str(&doomed_exp->exp_client_uuid),
1720 obd_export_nid2str(doomed_exp));
1721 class_fail_export(doomed_exp);
1722 class_export_put(doomed_exp);
1725 cfs_hash_putref(nid_hash);
1727 if (!exports_evicted)
1728 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1729 obd->obd_name, nid);
1730 return exports_evicted;
1732 EXPORT_SYMBOL(obd_export_evict_by_nid);
1734 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1736 struct cfs_hash *uuid_hash;
1737 struct obd_export *doomed_exp = NULL;
1738 struct obd_uuid doomed_uuid;
1739 int exports_evicted = 0;
1741 spin_lock(&obd->obd_dev_lock);
1742 if (obd->obd_stopping) {
1743 spin_unlock(&obd->obd_dev_lock);
1744 return exports_evicted;
1746 uuid_hash = obd->obd_uuid_hash;
1747 cfs_hash_getref(uuid_hash);
1748 spin_unlock(&obd->obd_dev_lock);
1750 obd_str2uuid(&doomed_uuid, uuid);
1751 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1752 CERROR("%s: can't evict myself\n", obd->obd_name);
1753 cfs_hash_putref(uuid_hash);
1754 return exports_evicted;
1757 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1759 if (doomed_exp == NULL) {
1760 CERROR("%s: can't disconnect %s: no exports found\n",
1761 obd->obd_name, uuid);
1763 CWARN("%s: evicting %s at adminstrative request\n",
1764 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1765 class_fail_export(doomed_exp);
1766 class_export_put(doomed_exp);
1769 cfs_hash_putref(uuid_hash);
1771 return exports_evicted;
1774 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional callback, NULL by default.  print_export_data() calls it for an
 * export when it is set and lock dumping was requested.
 */
1775 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1776 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Emit one debug line, at @debug_level, describing @exp: device name,
 * @status tag, client UUID and NID, reference/RPC/callback/lock counters,
 * the disconnected/delayed/failed flags, reply information, last committed
 * transaction and whether the export sits on a stale list.  The outstanding
 * replies are walked under exp_lock.
 * When built with LUSTRE_TRACKS_LOCK_EXP_REFS and @locks is non-zero,
 * class_export_dump_hook is invoked if set.
 * NOTE(review): the declaration of nreplies and the body of the reply loop
 * are not visible in this excerpt.
 */
1779 static void print_export_data(struct obd_export *exp, const char *status,
1780 int locks, int debug_level)
1782 struct ptlrpc_reply_state *rs;
1783 struct ptlrpc_reply_state *first_reply = NULL;
1786 spin_lock(&exp->exp_lock);
1787 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1793 spin_unlock(&exp->exp_lock);
1795 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1796 "%p %s %llu stale:%d\n",
1797 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1798 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1799 atomic_read(&exp->exp_rpc_count),
1800 atomic_read(&exp->exp_cb_count),
1801 atomic_read(&exp->exp_locks_count),
1802 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1803 nreplies, first_reply, nreplies > 3 ? "..." : "",
1804 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1805 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1806 if (locks && class_export_dump_hook != NULL)
1807 class_export_dump_hook(exp);
1811 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1813 struct obd_export *exp;
1815 spin_lock(&obd->obd_dev_lock);
1816 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1817 print_export_data(exp, "ACTIVE", locks, debug_level);
1818 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1819 print_export_data(exp, "UNLINKED", locks, debug_level);
1820 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1821 print_export_data(exp, "DELAYED", locks, debug_level);
1822 spin_unlock(&obd->obd_dev_lock);
/*
 * Wait until @obd has no unlinked exports left.  obd_exports must already
 * be empty.  The list is checked under obd_dev_lock; between checks the
 * lock is dropped and the caller sleeps uninterruptibly for `waited`
 * seconds.  Once the wait exceeds 5 seconds, a console warning and an
 * export dump are produced each time `waited` is a power of two.
 * NOTE(review): the declaration, initial value and update of `waited` are
 * not visible in this excerpt.
 */
1825 void obd_exports_barrier(struct obd_device *obd)
1828 LASSERT(list_empty(&obd->obd_exports));
1829 spin_lock(&obd->obd_dev_lock);
1830 while (!list_empty(&obd->obd_unlinked_exports)) {
/* never sleep with the spinlock held */
1831 spin_unlock(&obd->obd_dev_lock);
1832 set_current_state(TASK_UNINTERRUPTIBLE);
1833 schedule_timeout(cfs_time_seconds(waited));
1834 if (waited > 5 && is_power_of_2(waited)) {
1835 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1836 "more than %d seconds. "
1837 "The obd refcount = %d. Is it stuck?\n",
1838 obd->obd_name, waited,
1839 atomic_read(&obd->obd_refcount));
1840 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1843 spin_lock(&obd->obd_dev_lock);
1845 spin_unlock(&obd->obd_dev_lock);
1847 EXPORT_SYMBOL(obd_exports_barrier);
1850 * Add export to the obd_zombe thread and notify it.
1852 static void obd_zombie_export_add(struct obd_export *exp) {
1853 atomic_dec(&obd_stale_export_num);
1854 spin_lock(&exp->exp_obd->obd_dev_lock);
1855 LASSERT(!list_empty(&exp->exp_obd_chain));
1856 list_del_init(&exp->exp_obd_chain);
1857 spin_unlock(&exp->exp_obd->obd_dev_lock);
1859 queue_work(zombie_wq, &exp->exp_zombie_work);
1863 * Add import to the obd_zombe thread and notify it.
1865 static void obd_zombie_import_add(struct obd_import *imp) {
1866 LASSERT(imp->imp_sec == NULL);
1868 queue_work(zombie_wq, &imp->imp_zombie_work);
1872 * wait when obd_zombie import/export queues become empty
1874 void obd_zombie_barrier(void)
1876 flush_workqueue(zombie_wq);
1878 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Detach the first export from the global obd_stale_exports list, under
 * obd_stale_export_lock, and log it.  `exp` stays NULL when the list is
 * empty.
 * NOTE(review): the condition guarding the debug message and the return
 * statement are not visible in this excerpt.
 */
1881 struct obd_export *obd_stale_export_get(void)
1883 struct obd_export *exp = NULL;
1886 spin_lock(&obd_stale_export_lock);
1887 if (!list_empty(&obd_stale_exports)) {
1888 exp = list_entry(obd_stale_exports.next,
1889 struct obd_export, exp_stale_list);
1890 list_del_init(&exp->exp_stale_list);
1892 spin_unlock(&obd_stale_export_lock);
1895 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1896 atomic_read(&obd_stale_export_num));
1900 EXPORT_SYMBOL(obd_stale_export_get);
/*
 * Give back an export obtained from the stale list.  @exp must not be on
 * the stale list.  If its lock hash still holds entries the export is put
 * back on obd_stale_exports: at the tail when it has no blocked locks
 * (empty exp_bl_list), at the head otherwise.  Lock order is
 * exp_bl_list_lock (BH-disabling) first, then obd_stale_export_lock.
 * NOTE(review): the branch structure around class_export_put() is not
 * visible in this excerpt, so whether the reference is dropped on both
 * paths cannot be told from here.
 */
1902 void obd_stale_export_put(struct obd_export *exp)
1906 LASSERT(list_empty(&exp->exp_stale_list));
1907 if (exp->exp_lock_hash &&
1908 atomic_read(&exp->exp_lock_hash->hs_count)) {
1909 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1910 atomic_read(&obd_stale_export_num));
1912 spin_lock_bh(&exp->exp_bl_list_lock);
1913 spin_lock(&obd_stale_export_lock);
1914 /* Add to the tail if there is no blocked locks,
1915 * to the head otherwise. */
1916 if (list_empty(&exp->exp_bl_list))
1917 list_add_tail(&exp->exp_stale_list,
1918 &obd_stale_exports);
1920 list_add(&exp->exp_stale_list,
1921 &obd_stale_exports);
1923 spin_unlock(&obd_stale_export_lock);
1924 spin_unlock_bh(&exp->exp_bl_list_lock);
1926 class_export_put(exp);
1930 EXPORT_SYMBOL(obd_stale_export_put);
1933 * Adjust the position of the export in the stale list,
1934 * i.e. move to the head of the list if is needed.
1936 void obd_stale_export_adjust(struct obd_export *exp)
1938 LASSERT(exp != NULL);
1939 spin_lock_bh(&exp->exp_bl_list_lock);
1940 spin_lock(&obd_stale_export_lock);
1942 if (!list_empty(&exp->exp_stale_list) &&
1943 !list_empty(&exp->exp_bl_list))
1944 list_move(&exp->exp_stale_list, &obd_stale_exports);
1946 spin_unlock(&obd_stale_export_lock);
1947 spin_unlock_bh(&exp->exp_bl_list_lock);
1949 EXPORT_SYMBOL(obd_stale_export_adjust);
1952 * start destroy zombie import/export thread
/*
 * Create the workqueue, named "obd_zombid", on which zombie imports and
 * exports are destroyed.
 * NOTE(review): the check of the allocation result and the return
 * statements are not visible in this excerpt.
 */
1954 int obd_zombie_impexp_init(void)
1956 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1964 * stop destroy zombie import/export thread
1966 void obd_zombie_impexp_stop(void)
1968 destroy_workqueue(zombie_wq);
1969 LASSERT(list_empty(&obd_stale_exports));
1972 /***** Kernel-userspace comm helpers *******/
1974 /* Get length of entire message, including header */
1975 int kuc_len(int payload_len)
1977 return sizeof(struct kuc_hdr) + payload_len;
1979 EXPORT_SYMBOL(kuc_len);
1981 /* Get a pointer to kuc header, given a ptr to the payload
1982 * @param p Pointer to payload area
1983 * @returns Pointer to kuc header
/*
 * Locate the kuc header that sits immediately before payload pointer @p and
 * assert that it carries KUC_MAGIC.
 * NOTE(review): the return statement is not visible in this excerpt.
 */
1985 struct kuc_hdr * kuc_ptr(void *p)
/* the header occupies the sizeof(struct kuc_hdr) bytes just below @p */
1987 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1988 LASSERT(lh->kuc_magic == KUC_MAGIC);
1991 EXPORT_SYMBOL(kuc_ptr);
1993 /* Alloc space for a message, and fill in header
1994 * @return Pointer to payload area
/*
 * Allocate a message of kuc_len(@payload_len) bytes, fill in the header
 * (magic, @transport, @type, total length) and return a pointer to the
 * payload area that follows the header.  ERR_PTR(-ENOMEM) is returned on
 * the failure path.
 * NOTE(review): the declaration of lh, the allocation call and the test
 * guarding the error return are not visible in this excerpt.
 */
1996 void *kuc_alloc(int payload_len, int transport, int type)
1999 int len = kuc_len(payload_len);
2003 return ERR_PTR(-ENOMEM);
2005 lh->kuc_magic = KUC_MAGIC;
2006 lh->kuc_transport = transport;
2007 lh->kuc_msgtype = type;
/* kuc_msglen covers header plus payload */
2008 lh->kuc_msglen = len;
/* the payload starts right after the header */
2010 return (void *)(lh + 1);
2012 EXPORT_SYMBOL(kuc_alloc);
/*
 * Free a message given a pointer to its payload area.
 *
 * @p            payload pointer; the header in front of it is located (and
 *               its magic checked) through kuc_ptr()
 * @payload_len  payload size in bytes, used to compute the size of the
 *               whole message being freed
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh;

	lh = kuc_ptr(p);
	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
/*
 * Per-waiter record used by obd_get_request_slot(), which keeps it on its
 * own stack while sleeping for an RPC slot.
 * NOTE(review): the code below also uses an orsw_signaled member whose
 * declaration is not visible in this excerpt.
 */
2022 struct obd_request_slot_waiter {
/* link on client_obd::cl_flight_waiters */
2023 struct list_head orsw_entry;
/* the waiter sleeps here until it is handed a slot */
2024 wait_queue_head_t orsw_waitq;
/*
 * Wait condition for obd_get_request_slot(): under cl_loi_list_lock, test
 * whether @orsw has been taken off the waiters list (its entry is empty).
 * NOTE(review): the declaration of avail and the return statement are not
 * visible in this excerpt.
 */
2028 static bool obd_request_slot_avail(struct client_obd *cli,
2029 struct obd_request_slot_waiter *orsw)
2033 spin_lock(&cli->cl_loi_list_lock);
2034 avail = !!list_empty(&orsw->orsw_entry);
2035 spin_unlock(&cli->cl_loi_list_lock);
2041 * For network flow control, the RPC sponsor needs to acquire a credit
2042 * before sending the RPC. The credits count for a connection is defined
2043 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
2044 * the subsequent RPC sponsors need to wait until others released their
2045 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
/*
 * Acquire one RPC slot on @cli.  When fewer than cl_max_rpcs_in_flight RPCs
 * are in flight the counter is simply incremented.  Otherwise an on-stack
 * waiter is appended to cl_flight_waiters and the caller sleeps until
 * obd_request_slot_avail() holds or the second, non-visible condition is
 * met.  After waking, the waiter state is reconciled under
 * cl_loi_list_lock.
 * NOTE(review): the return statements, the second half of the wait
 * condition and the branch structure of the post-wait handling are not
 * visible in this excerpt.
 */
2047 int obd_get_request_slot(struct client_obd *cli)
2049 struct obd_request_slot_waiter orsw;
2050 struct l_wait_info lwi;
2053 spin_lock(&cli->cl_loi_list_lock);
2054 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2055 cli->cl_rpcs_in_flight++;
2056 spin_unlock(&cli->cl_loi_list_lock);
/* no free slot: queue at the tail so waiters are served in arrival order */
2060 init_waitqueue_head(&orsw.orsw_waitq);
2061 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2062 orsw.orsw_signaled = false;
2063 spin_unlock(&cli->cl_loi_list_lock);
2065 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2066 rc = l_wait_event(orsw.orsw_waitq,
2067 obd_request_slot_avail(cli, &orsw) ||
2071 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2072 * freed but other (such as obd_put_request_slot) is using it. */
2073 spin_lock(&cli->cl_loi_list_lock);
2075 if (!orsw.orsw_signaled) {
/* an empty entry means a slot was already handed to this waiter */
2076 if (list_empty(&orsw.orsw_entry))
2077 cli->cl_rpcs_in_flight--;
2079 list_del(&orsw.orsw_entry);
2083 if (orsw.orsw_signaled) {
2084 LASSERT(list_empty(&orsw.orsw_entry));
2088 spin_unlock(&cli->cl_loi_list_lock);
2092 EXPORT_SYMBOL(obd_get_request_slot);
2094 void obd_put_request_slot(struct client_obd *cli)
2096 struct obd_request_slot_waiter *orsw;
2098 spin_lock(&cli->cl_loi_list_lock);
2099 cli->cl_rpcs_in_flight--;
2101 /* If there is free slot, wakeup the first waiter. */
2102 if (!list_empty(&cli->cl_flight_waiters) &&
2103 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2104 orsw = list_entry(cli->cl_flight_waiters.next,
2105 struct obd_request_slot_waiter, orsw_entry);
2106 list_del_init(&orsw->orsw_entry);
2107 cli->cl_rpcs_in_flight++;
2108 wake_up(&orsw->orsw_waitq);
2110 spin_unlock(&cli->cl_loi_list_lock);
2112 EXPORT_SYMBOL(obd_put_request_slot);
2114 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2116 return cli->cl_max_rpcs_in_flight;
2118 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * Change cl_max_rpcs_in_flight of @cli to @max.  @max is tested against the
 * range [1, OBD_MAX_RIF_MAX].  For an MDC device (type name
 * LUSTRE_MDC_NAME), max_mod_rpcs_in_flight is lowered to @max - 1 when it
 * would otherwise be greater than or equal to @max.  The new limit is
 * installed under cl_loi_list_lock, client_adjust_max_dirty() is called,
 * and up to `diff` queued waiters are each given a slot and woken.
 * NOTE(review): the local declarations, the return statements, the
 * condition guarding the CERROR, the computation of diff and the loop exit
 * are not visible in this excerpt.
 * NOTE(review): the CERROR text below has no trailing "\n", unlike the
 * other messages in this file - confirm and fix.
 */
2120 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2122 struct obd_request_slot_waiter *orsw;
2129 if (max > OBD_MAX_RIF_MAX || max < 1)
2132 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2133 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2134 /* adjust max_mod_rpcs_in_flight to ensure it is always
2135 * strictly lower than max_rpcs_in_flight */
2137 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2138 "because it must be higher than "
2139 "max_mod_rpcs_in_flight value",
2140 cli->cl_import->imp_obd->obd_name);
2143 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2144 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2150 spin_lock(&cli->cl_loi_list_lock);
2151 old = cli->cl_max_rpcs_in_flight;
2152 cli->cl_max_rpcs_in_flight = max;
2153 client_adjust_max_dirty(cli);
2157 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2158 for (i = 0; i < diff; i++) {
2159 if (list_empty(&cli->cl_flight_waiters))
2162 orsw = list_entry(cli->cl_flight_waiters.next,
2163 struct obd_request_slot_waiter, orsw_entry);
2164 list_del_init(&orsw->orsw_entry);
/* the slot is accounted on behalf of the waiter before it runs */
2165 cli->cl_rpcs_in_flight++;
2166 wake_up(&orsw->orsw_waitq);
2168 spin_unlock(&cli->cl_loi_list_lock);
2172 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2174 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2176 return cli->cl_max_mod_rpcs_in_flight;
2178 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/*
 * Change cl_max_mod_rpcs_in_flight of @cli to @max.  @max is tested against
 * the range [1, OBD_MAX_RIF_MAX], against cl_max_rpcs_in_flight (it must be
 * strictly lower) and against `maxmodrpcs`, which is taken from the
 * server's connect data when OBD_CONNECT_MULTIMODRPCS is set.  The new
 * limit is installed under cl_mod_rpcs_lock, and cl_mod_rpcs_waitq is woken
 * when the limit was raised.
 * NOTE(review): the local declarations, the return statements, the value of
 * maxmodrpcs when the connect flag is absent and the remaining arguments of
 * the second CERROR are not visible in this excerpt.
 */
2180 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2182 struct obd_connect_data *ocd;
2186 if (max > OBD_MAX_RIF_MAX || max < 1)
2189 /* cannot exceed or equal max_rpcs_in_flight */
2190 if (max >= cli->cl_max_rpcs_in_flight) {
2191 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2192 "higher or equal to max_rpcs_in_flight value (%u)\n",
2193 cli->cl_import->imp_obd->obd_name,
2194 max, cli->cl_max_rpcs_in_flight);
2198 /* cannot exceed max modify RPCs in flight supported by the server */
2199 ocd = &cli->cl_import->imp_connect_data;
2200 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2201 maxmodrpcs = ocd->ocd_maxmodrpcs;
2204 if (max > maxmodrpcs) {
2205 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2206 "higher than max_mod_rpcs_per_client value (%hu) "
2207 "returned by the server at connection\n",
2208 cli->cl_import->imp_obd->obd_name,
2213 spin_lock(&cli->cl_mod_rpcs_lock);
2215 prev = cli->cl_max_mod_rpcs_in_flight;
2216 cli->cl_max_mod_rpcs_in_flight = max;
2218 /* wakeup waiters if limit has been increased */
2219 if (cli->cl_max_mod_rpcs_in_flight > prev)
2220 wake_up(&cli->cl_mod_rpcs_waitq);
2222 spin_unlock(&cli->cl_mod_rpcs_lock);
2226 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2228 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2229 struct seq_file *seq)
2231 unsigned long mod_tot = 0, mod_cum;
2232 struct timespec64 now;
2235 ktime_get_real_ts64(&now);
2237 spin_lock(&cli->cl_mod_rpcs_lock);
2239 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2240 (s64)now.tv_sec, now.tv_nsec);
2241 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2242 cli->cl_mod_rpcs_in_flight);
2244 seq_printf(seq, "\n\t\t\tmodify\n");
2245 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2247 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2250 for (i = 0; i < OBD_HIST_MAX; i++) {
2251 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2253 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2254 i, mod, pct(mod, mod_tot),
2255 pct(mod_cum, mod_tot));
2256 if (mod_cum == mod_tot)
2260 spin_unlock(&cli->cl_mod_rpcs_lock);
2264 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2266 /* The number of modify RPCs sent in parallel is limited
2267 * because the server has a finite number of slots per client to
2268 * store request result and ensure reply reconstruction when needed.
2269 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2270 * that takes into account server limit and cl_max_rpcs_in_flight
2272 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2273 * one close request is allowed above the maximum.
/*
 * Test whether a modify RPC slot can be taken on @cli: either the number of
 * modify RPCs in flight is below the maximum, or the request is a close and
 * no close RPC is currently in flight.  The _locked suffix suggests the
 * caller holds cl_mod_rpcs_lock - TODO confirm against callers.
 * NOTE(review): the second parameter, the declaration of avail and the
 * return statement are not visible in this excerpt.
 */
2275 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2280 /* A slot is available if
2281 * - number of modify RPCs in flight is less than the max
2282 * - it's a close RPC and no other close request is in flight
2284 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2285 (close_req && cli->cl_close_rpcs_in_flight == 0);
/*
 * Locking wrapper around obd_mod_rpc_slot_avail_locked(): evaluates the
 * same condition while holding cl_mod_rpcs_lock.
 * NOTE(review): the second parameter, the declaration of avail and the
 * return statement are not visible in this excerpt.
 */
2290 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2295 spin_lock(&cli->cl_mod_rpcs_lock);
2296 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2297 spin_unlock(&cli->cl_mod_rpcs_lock);
/*
 * Classify the intent @it: the visible condition matches IT_GETATTR,
 * IT_LOOKUP, IT_READDIR, and IT_LAYOUT without MDS_FMODE_WRITE in it_flags.
 * Callers use the result to decide whether a modify RPC slot is needed.
 * NOTE(review): the opening of the condition and both return statements are
 * not visible in this excerpt.
 */
2301 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2304 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2305 it->it_op == IT_READDIR ||
2306 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2311 /* Get a modify RPC slot from the obd client @cli according
2312 * to the kind of operation @opc that is going to be sent
2313 * and the intent @it of the operation if it applies.
2314 * If the maximum number of modify RPCs in flight is reached
2315 * the thread is put to sleep.
2316 * Returns the tag to be set in the request message. Tag 0
2317 * is reserved for non-modifying requests.
/*
 * Take a modify RPC slot on @cli for operation @opc with intent @it.  Under
 * cl_mod_rpcs_lock, when a slot is available the in-flight counters are
 * incremented, the histogram is tallied and a free bit is claimed in
 * cl_mod_tag_bitmap.  Otherwise the lock is dropped and the caller sleeps
 * on cl_mod_rpcs_waitq until obd_mod_rpc_slot_avail() holds.
 * NOTE(review): the local declarations, the return statements, the
 * assignment to close_req and the retry loop are not visible in this
 * excerpt.
 */
2319 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2320 struct lookup_intent *it)
2322 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2323 bool close_req = false;
2326 /* read-only metadata RPCs don't consume a slot on MDT
2327 * for reply reconstruction
2329 if (obd_skip_mod_rpc_slot(it))
2332 if (opc == MDS_CLOSE)
2336 spin_lock(&cli->cl_mod_rpcs_lock);
2337 max = cli->cl_max_mod_rpcs_in_flight;
2338 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2339 /* there is a slot available */
2340 cli->cl_mod_rpcs_in_flight++;
2342 cli->cl_close_rpcs_in_flight++;
2343 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2344 cli->cl_mod_rpcs_in_flight);
2345 /* find a free tag */
2346 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2348 LASSERT(i < OBD_MAX_RIF_MAX);
/* NOTE(review): the bitmap update is a side effect inside an assertion;
 * it only happens if LASSERT always evaluates its argument - confirm */
2349 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2350 spin_unlock(&cli->cl_mod_rpcs_lock);
2351 /* tag 0 is reserved for non-modify RPCs */
2354 spin_unlock(&cli->cl_mod_rpcs_lock);
2356 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2357 "opc %u, max %hu\n",
2358 cli->cl_import->imp_obd->obd_name, opc, max);
2360 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2361 obd_mod_rpc_slot_avail(cli, close_req),
2365 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2367 /* Put a modify RPC slot from the obd client @cli according
2368 * to the kind of operation @opc that has been sent and the
2369 * intent @it of the operation if it applies.
2371 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2372 struct lookup_intent *it, __u16 tag)
2374 bool close_req = false;
2376 if (obd_skip_mod_rpc_slot(it))
2379 if (opc == MDS_CLOSE)
2382 spin_lock(&cli->cl_mod_rpcs_lock);
2383 cli->cl_mod_rpcs_in_flight--;
2385 cli->cl_close_rpcs_in_flight--;
2386 /* release the tag in the bitmap */
2387 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2388 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2389 spin_unlock(&cli->cl_mod_rpcs_lock);
2390 wake_up(&cli->cl_mod_rpcs_waitq);
2392 EXPORT_SYMBOL(obd_put_mod_rpc_slot);