4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59 const char *status, int locks, int debug_level);
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90 obd, obd->obd_namespace, obd->obd_force);
93 lu_ref_fini(&obd->obd_reference);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 struct obd_type *class_search_type(const char *name)
99 struct kobject *kobj = kset_find_obj(lustre_kset, name);
101 if (kobj && kobj->ktype == &class_ktype)
102 return container_of(kobj, struct obd_type, typ_kobj);
107 EXPORT_SYMBOL(class_search_type);
109 struct obd_type *class_get_type(const char *name)
111 struct obd_type *type;
113 type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
116 const char *modname = name;
118 #ifdef HAVE_SERVER_SUPPORT
119 if (strcmp(modname, "obdfilter") == 0)
122 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 modname = LUSTRE_OSP_NAME;
125 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 spin_lock(&type->obd_type_lock);
141 try_module_get(type->typ_dt_ops->o_owner);
142 spin_unlock(&type->obd_type_lock);
143 /* class_search_type() returned a counted reference,
144 * but we don't need that count any more as
145 * we have one through typ_refcnt.
147 kobject_put(&type->typ_kobj);
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 debugfs_remove_recursive(type->typ_debugfs_entry);
166 type->typ_debugfs_entry = NULL;
169 lu_device_type_fini(type->typ_lu);
171 #ifdef CONFIG_PROC_FS
172 if (type->typ_name && type->typ_procroot)
173 remove_proc_subtree(type->typ_name, proc_lustre_root);
175 if (type->typ_md_ops)
176 OBD_FREE_PTR(type->typ_md_ops);
177 if (type->typ_dt_ops)
178 OBD_FREE_PTR(type->typ_dt_ops);
180 OBD_FREE(type, sizeof(*type));
183 static struct kobj_type class_ktype = {
184 .sysfs_ops = &lustre_sysfs_ops,
185 .release = class_sysfs_release,
188 #ifdef HAVE_SERVER_SUPPORT
189 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
191 struct dentry *symlink;
192 struct obd_type *type;
195 type = class_search_type(name);
197 kobject_put(&type->typ_kobj);
198 return ERR_PTR(-EEXIST);
201 OBD_ALLOC(type, sizeof(*type));
203 return ERR_PTR(-ENOMEM);
205 type->typ_kobj.kset = lustre_kset;
206 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
207 &lustre_kset->kobj, "%s", name);
211 symlink = debugfs_create_dir(name, debugfs_lustre_root);
212 if (IS_ERR_OR_NULL(symlink)) {
213 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
214 kobject_put(&type->typ_kobj);
217 type->typ_debugfs_entry = symlink;
218 type->typ_sym_filter = true;
221 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
223 if (IS_ERR(type->typ_procroot)) {
224 CERROR("%s: can't create compat proc entry: %d\n",
225 name, (int)PTR_ERR(type->typ_procroot));
226 type->typ_procroot = NULL;
232 EXPORT_SYMBOL(class_add_symlinks);
233 #endif /* HAVE_SERVER_SUPPORT */
235 #define CLASS_MAX_NAME 1024
237 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
238 bool enable_proc, struct lprocfs_vars *vars,
239 const char *name, struct lu_device_type *ldt)
241 struct obd_type *type;
246 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
248 type = class_search_type(name);
250 #ifdef HAVE_SERVER_SUPPORT
251 if (type->typ_sym_filter)
253 #endif /* HAVE_SERVER_SUPPORT */
254 kobject_put(&type->typ_kobj);
255 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
259 OBD_ALLOC(type, sizeof(*type));
263 type->typ_kobj.kset = lustre_kset;
264 kobject_init(&type->typ_kobj, &class_ktype);
265 #ifdef HAVE_SERVER_SUPPORT
267 #endif /* HAVE_SERVER_SUPPORT */
268 OBD_ALLOC_PTR(type->typ_dt_ops);
269 OBD_ALLOC_PTR(type->typ_md_ops);
271 if (type->typ_dt_ops == NULL ||
272 type->typ_md_ops == NULL)
273 GOTO (failed, rc = -ENOMEM);
275 *(type->typ_dt_ops) = *dt_ops;
276 /* md_ops is optional */
278 *(type->typ_md_ops) = *md_ops;
279 spin_lock_init(&type->obd_type_lock);
281 #ifdef HAVE_SERVER_SUPPORT
282 if (type->typ_sym_filter) {
283 type->typ_sym_filter = false;
284 kobject_put(&type->typ_kobj);
288 #ifdef CONFIG_PROC_FS
289 if (enable_proc && !type->typ_procroot) {
290 type->typ_procroot = lprocfs_register(name,
293 if (IS_ERR(type->typ_procroot)) {
294 rc = PTR_ERR(type->typ_procroot);
295 type->typ_procroot = NULL;
300 type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
302 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
303 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
305 type->typ_debugfs_entry = NULL;
309 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
312 #ifdef HAVE_SERVER_SUPPORT
317 rc = lu_device_type_init(ldt);
325 kobject_put(&type->typ_kobj);
329 EXPORT_SYMBOL(class_register_type);
331 int class_unregister_type(const char *name)
333 struct obd_type *type = class_search_type(name);
338 CERROR("unknown obd type\n");
342 if (type->typ_refcnt) {
343 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
344 /* This is a bad situation, let's make the best of it */
345 /* Remove ops, but leave the name for debugging */
346 OBD_FREE_PTR(type->typ_dt_ops);
347 OBD_FREE_PTR(type->typ_md_ops);
348 GOTO(out_put, rc = -EBUSY);
351 /* Put the final ref */
352 kobject_put(&type->typ_kobj);
354 /* Put the ref returned by class_search_type() */
355 kobject_put(&type->typ_kobj);
358 } /* class_unregister_type */
359 EXPORT_SYMBOL(class_unregister_type);
362 * Create a new obd device.
364 * Allocate the new obd_device and initialize it.
366 * \param[in] type_name obd device type string.
367 * \param[in] name obd device name.
368 * \param[in] uuid obd device UUID
370 * \retval newdev pointer to created obd_device
371 * \retval ERR_PTR(errno) on error
373 struct obd_device *class_newdev(const char *type_name, const char *name,
376 struct obd_device *newdev;
377 struct obd_type *type = NULL;
380 if (strlen(name) >= MAX_OBD_NAME) {
381 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
382 RETURN(ERR_PTR(-EINVAL));
385 type = class_get_type(type_name);
387 CERROR("OBD: unknown type: %s\n", type_name);
388 RETURN(ERR_PTR(-ENODEV));
391 newdev = obd_device_alloc();
392 if (newdev == NULL) {
393 class_put_type(type);
394 RETURN(ERR_PTR(-ENOMEM));
396 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
397 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
398 newdev->obd_type = type;
399 newdev->obd_minor = -1;
401 rwlock_init(&newdev->obd_pool_lock);
402 newdev->obd_pool_limit = 0;
403 newdev->obd_pool_slv = 0;
405 INIT_LIST_HEAD(&newdev->obd_exports);
406 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
407 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
408 INIT_LIST_HEAD(&newdev->obd_exports_timed);
409 INIT_LIST_HEAD(&newdev->obd_nid_stats);
410 spin_lock_init(&newdev->obd_nid_lock);
411 spin_lock_init(&newdev->obd_dev_lock);
412 mutex_init(&newdev->obd_dev_mutex);
413 spin_lock_init(&newdev->obd_osfs_lock);
414 /* newdev->obd_osfs_age must be set to a value in the distant
415 * past to guarantee a fresh statfs is fetched on mount. */
416 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
418 /* XXX belongs in setup not attach */
419 init_rwsem(&newdev->obd_observer_link_sem);
421 spin_lock_init(&newdev->obd_recovery_task_lock);
422 init_waitqueue_head(&newdev->obd_next_transno_waitq);
423 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
424 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
425 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
426 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
427 INIT_LIST_HEAD(&newdev->obd_evict_list);
428 INIT_LIST_HEAD(&newdev->obd_lwp_list);
430 llog_group_init(&newdev->obd_olg);
431 /* Detach drops this */
432 atomic_set(&newdev->obd_refcount, 1);
433 lu_ref_init(&newdev->obd_reference);
434 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
436 newdev->obd_conn_inprogress = 0;
438 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
440 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
441 newdev->obd_name, newdev);
449 * \param[in] obd obd_device to be freed
453 void class_free_dev(struct obd_device *obd)
455 struct obd_type *obd_type = obd->obd_type;
457 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
458 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
460 "obd %p != obd_devs[%d] %p\n",
461 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
462 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
463 "obd_refcount should be 0, not %d\n",
464 atomic_read(&obd->obd_refcount));
465 LASSERT(obd_type != NULL);
467 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
468 obd->obd_name, obd->obd_type->typ_name);
470 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
471 obd->obd_name, obd->obd_uuid.uuid);
472 if (obd->obd_stopping) {
475 /* If we're not stopping, we were never set up */
476 err = obd_cleanup(obd);
478 CERROR("Cleanup %s returned %d\n",
482 obd_device_free(obd);
484 class_put_type(obd_type);
488 * Unregister obd device.
490 * Free slot in obd_dev[] used by \a obd.
492 * \param[in] new_obd obd_device to be unregistered
496 void class_unregister_device(struct obd_device *obd)
498 write_lock(&obd_dev_lock);
499 if (obd->obd_minor >= 0) {
500 LASSERT(obd_devs[obd->obd_minor] == obd);
501 obd_devs[obd->obd_minor] = NULL;
504 write_unlock(&obd_dev_lock);
508 * Register obd device.
510 * Find free slot in obd_devs[], fills it with \a new_obd.
512 * \param[in] new_obd obd_device to be registered
515 * \retval -EEXIST device with this name is registered
516 * \retval -EOVERFLOW obd_devs[] is full
518 int class_register_device(struct obd_device *new_obd)
522 int new_obd_minor = 0;
523 bool minor_assign = false;
524 bool retried = false;
527 write_lock(&obd_dev_lock);
528 for (i = 0; i < class_devno_max(); i++) {
529 struct obd_device *obd = class_num2obd(i);
532 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
535 write_unlock(&obd_dev_lock);
537 /* the obd_device could be waited to be
538 * destroyed by the "obd_zombie_impexp_thread".
540 obd_zombie_barrier();
545 CERROR("%s: already exists, won't add\n",
547 /* in case we found a free slot before duplicate */
548 minor_assign = false;
552 if (!minor_assign && obd == NULL) {
559 new_obd->obd_minor = new_obd_minor;
560 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
561 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
562 obd_devs[new_obd_minor] = new_obd;
566 CERROR("%s: all %u/%u devices used, increase "
567 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
568 i, class_devno_max(), ret);
571 write_unlock(&obd_dev_lock);
576 static int class_name2dev_nolock(const char *name)
583 for (i = 0; i < class_devno_max(); i++) {
584 struct obd_device *obd = class_num2obd(i);
586 if (obd && strcmp(name, obd->obd_name) == 0) {
587 /* Make sure we finished attaching before we give
588 out any references */
589 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
590 if (obd->obd_attached) {
600 int class_name2dev(const char *name)
607 read_lock(&obd_dev_lock);
608 i = class_name2dev_nolock(name);
609 read_unlock(&obd_dev_lock);
613 EXPORT_SYMBOL(class_name2dev);
615 struct obd_device *class_name2obd(const char *name)
617 int dev = class_name2dev(name);
619 if (dev < 0 || dev > class_devno_max())
621 return class_num2obd(dev);
623 EXPORT_SYMBOL(class_name2obd);
625 int class_uuid2dev_nolock(struct obd_uuid *uuid)
629 for (i = 0; i < class_devno_max(); i++) {
630 struct obd_device *obd = class_num2obd(i);
632 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
633 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
641 int class_uuid2dev(struct obd_uuid *uuid)
645 read_lock(&obd_dev_lock);
646 i = class_uuid2dev_nolock(uuid);
647 read_unlock(&obd_dev_lock);
651 EXPORT_SYMBOL(class_uuid2dev);
653 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
655 int dev = class_uuid2dev(uuid);
658 return class_num2obd(dev);
660 EXPORT_SYMBOL(class_uuid2obd);
663 * Get obd device from ::obd_devs[]
665 * \param num [in] array index
667 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
668 * otherwise return the obd device there.
670 struct obd_device *class_num2obd(int num)
672 struct obd_device *obd = NULL;
674 if (num < class_devno_max()) {
679 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
680 "%p obd_magic %08x != %08x\n",
681 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
682 LASSERTF(obd->obd_minor == num,
683 "%p obd_minor %0d != %0d\n",
684 obd, obd->obd_minor, num);
691 * Find obd in obd_dev[] by name or uuid.
693 * Increment obd's refcount if found.
695 * \param[in] str obd name or uuid
697 * \retval NULL if not found
698 * \retval target pointer to found obd_device
700 struct obd_device *class_dev_by_str(const char *str)
702 struct obd_device *target = NULL;
703 struct obd_uuid tgtuuid;
706 obd_str2uuid(&tgtuuid, str);
708 read_lock(&obd_dev_lock);
709 rc = class_uuid2dev_nolock(&tgtuuid);
711 rc = class_name2dev_nolock(str);
714 target = class_num2obd(rc);
717 class_incref(target, "find", current);
718 read_unlock(&obd_dev_lock);
722 EXPORT_SYMBOL(class_dev_by_str);
725 * Get obd devices count. Device in any
727 * \retval obd device count
729 int get_devices_count(void)
731 int index, max_index = class_devno_max(), dev_count = 0;
733 read_lock(&obd_dev_lock);
734 for (index = 0; index <= max_index; index++) {
735 struct obd_device *obd = class_num2obd(index);
739 read_unlock(&obd_dev_lock);
743 EXPORT_SYMBOL(get_devices_count);
745 void class_obd_list(void)
750 read_lock(&obd_dev_lock);
751 for (i = 0; i < class_devno_max(); i++) {
752 struct obd_device *obd = class_num2obd(i);
756 if (obd->obd_stopping)
758 else if (obd->obd_set_up)
760 else if (obd->obd_attached)
764 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
765 i, status, obd->obd_type->typ_name,
766 obd->obd_name, obd->obd_uuid.uuid,
767 atomic_read(&obd->obd_refcount));
769 read_unlock(&obd_dev_lock);
772 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
773 specified, then only the client with that uuid is returned,
774 otherwise any client connected to the tgt is returned. */
775 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
776 const char *type_name,
777 struct obd_uuid *grp_uuid)
781 read_lock(&obd_dev_lock);
782 for (i = 0; i < class_devno_max(); i++) {
783 struct obd_device *obd = class_num2obd(i);
787 if ((strncmp(obd->obd_type->typ_name, type_name,
788 strlen(type_name)) == 0)) {
789 if (obd_uuid_equals(tgt_uuid,
790 &obd->u.cli.cl_target_uuid) &&
791 ((grp_uuid)? obd_uuid_equals(grp_uuid,
792 &obd->obd_uuid) : 1)) {
793 read_unlock(&obd_dev_lock);
798 read_unlock(&obd_dev_lock);
802 EXPORT_SYMBOL(class_find_client_obd);
804 /* Iterate the obd_device list looking devices have grp_uuid. Start
805 searching at *next, and if a device is found, the next index to look
806 at is saved in *next. If next is NULL, then the first matching device
807 will always be returned. */
808 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
814 else if (*next >= 0 && *next < class_devno_max())
819 read_lock(&obd_dev_lock);
820 for (; i < class_devno_max(); i++) {
821 struct obd_device *obd = class_num2obd(i);
825 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
828 read_unlock(&obd_dev_lock);
832 read_unlock(&obd_dev_lock);
836 EXPORT_SYMBOL(class_devices_in_group);
839 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
840 * adjust sptlrpc settings accordingly.
842 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
844 struct obd_device *obd;
848 LASSERT(namelen > 0);
850 read_lock(&obd_dev_lock);
851 for (i = 0; i < class_devno_max(); i++) {
852 obd = class_num2obd(i);
854 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
857 /* only notify mdc, osc, osp, lwp, mdt, ost
858 * because only these have a -sptlrpc llog */
859 type = obd->obd_type->typ_name;
860 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
861 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
862 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
863 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
864 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
865 strcmp(type, LUSTRE_OST_NAME) != 0)
868 if (strncmp(obd->obd_name, fsname, namelen))
871 class_incref(obd, __FUNCTION__, obd);
872 read_unlock(&obd_dev_lock);
873 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
874 sizeof(KEY_SPTLRPC_CONF),
875 KEY_SPTLRPC_CONF, 0, NULL, NULL);
877 class_decref(obd, __FUNCTION__, obd);
878 read_lock(&obd_dev_lock);
880 read_unlock(&obd_dev_lock);
883 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
885 void obd_cleanup_caches(void)
888 if (obd_device_cachep) {
889 kmem_cache_destroy(obd_device_cachep);
890 obd_device_cachep = NULL;
896 int obd_init_caches(void)
901 LASSERT(obd_device_cachep == NULL);
902 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
903 sizeof(struct obd_device),
904 0, 0, 0, sizeof(struct obd_device), NULL);
905 if (!obd_device_cachep)
906 GOTO(out, rc = -ENOMEM);
910 obd_cleanup_caches();
914 static struct portals_handle_ops export_handle_ops;
916 /* map connection to client */
917 struct obd_export *class_conn2export(struct lustre_handle *conn)
919 struct obd_export *export;
923 CDEBUG(D_CACHE, "looking for null handle\n");
927 if (conn->cookie == -1) { /* this means assign a new connection */
928 CDEBUG(D_CACHE, "want a new connection\n");
932 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
933 export = class_handle2object(conn->cookie, &export_handle_ops);
936 EXPORT_SYMBOL(class_conn2export);
938 struct obd_device *class_exp2obd(struct obd_export *exp)
944 EXPORT_SYMBOL(class_exp2obd);
946 struct obd_import *class_exp2cliimp(struct obd_export *exp)
948 struct obd_device *obd = exp->exp_obd;
951 return obd->u.cli.cl_import;
953 EXPORT_SYMBOL(class_exp2cliimp);
955 /* Export management functions */
956 static void class_export_destroy(struct obd_export *exp)
958 struct obd_device *obd = exp->exp_obd;
961 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
962 LASSERT(obd != NULL);
964 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
965 exp->exp_client_uuid.uuid, obd->obd_name);
967 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
968 if (exp->exp_connection)
969 ptlrpc_put_connection_superhack(exp->exp_connection);
971 LASSERT(list_empty(&exp->exp_outstanding_replies));
972 LASSERT(list_empty(&exp->exp_uncommitted_replies));
973 LASSERT(list_empty(&exp->exp_req_replay_queue));
974 LASSERT(list_empty(&exp->exp_hp_rpcs));
975 obd_destroy_export(exp);
976 /* self export doesn't hold a reference to an obd, although it
977 * exists until freeing of the obd */
978 if (exp != obd->obd_self_export)
979 class_decref(obd, "export", exp);
981 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
985 static void export_handle_addref(void *export)
987 class_export_get(export);
990 static struct portals_handle_ops export_handle_ops = {
991 .hop_addref = export_handle_addref,
995 struct obd_export *class_export_get(struct obd_export *exp)
997 atomic_inc(&exp->exp_refcount);
998 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
999 atomic_read(&exp->exp_refcount));
1002 EXPORT_SYMBOL(class_export_get);
1004 void class_export_put(struct obd_export *exp)
1006 LASSERT(exp != NULL);
1007 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1008 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1009 atomic_read(&exp->exp_refcount) - 1);
1011 if (atomic_dec_and_test(&exp->exp_refcount)) {
1012 struct obd_device *obd = exp->exp_obd;
1014 CDEBUG(D_IOCTL, "final put %p/%s\n",
1015 exp, exp->exp_client_uuid.uuid);
1017 /* release nid stat refererence */
1018 lprocfs_exp_cleanup(exp);
1020 if (exp == obd->obd_self_export) {
1021 /* self export should be destroyed without
1022 * zombie thread as it doesn't hold a
1023 * reference to obd and doesn't hold any
1025 class_export_destroy(exp);
1026 /* self export is destroyed, no class
1027 * references exist and it is safe to free
1029 class_free_dev(obd);
1031 LASSERT(!list_empty(&exp->exp_obd_chain));
1032 obd_zombie_export_add(exp);
1037 EXPORT_SYMBOL(class_export_put);
1039 static void obd_zombie_exp_cull(struct work_struct *ws)
1041 struct obd_export *export;
1043 export = container_of(ws, struct obd_export, exp_zombie_work);
1044 class_export_destroy(export);
1047 /* Creates a new export, adds it to the hash table, and returns a
1048 * pointer to it. The refcount is 2: one for the hash reference, and
1049 * one for the pointer returned by this function. */
1050 struct obd_export *__class_new_export(struct obd_device *obd,
1051 struct obd_uuid *cluuid, bool is_self)
1053 struct obd_export *export;
1054 struct cfs_hash *hash = NULL;
1058 OBD_ALLOC_PTR(export);
1060 return ERR_PTR(-ENOMEM);
1062 export->exp_conn_cnt = 0;
1063 export->exp_lock_hash = NULL;
1064 export->exp_flock_hash = NULL;
1065 /* 2 = class_handle_hash + last */
1066 atomic_set(&export->exp_refcount, 2);
1067 atomic_set(&export->exp_rpc_count, 0);
1068 atomic_set(&export->exp_cb_count, 0);
1069 atomic_set(&export->exp_locks_count, 0);
1070 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1071 INIT_LIST_HEAD(&export->exp_locks_list);
1072 spin_lock_init(&export->exp_locks_list_guard);
1074 atomic_set(&export->exp_replay_count, 0);
1075 export->exp_obd = obd;
1076 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1077 spin_lock_init(&export->exp_uncommitted_replies_lock);
1078 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1079 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1080 INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1081 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1082 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1083 class_handle_hash(&export->exp_handle, &export_handle_ops);
1084 export->exp_last_request_time = ktime_get_real_seconds();
1085 spin_lock_init(&export->exp_lock);
1086 spin_lock_init(&export->exp_rpc_lock);
1087 INIT_HLIST_NODE(&export->exp_uuid_hash);
1088 INIT_HLIST_NODE(&export->exp_nid_hash);
1089 INIT_HLIST_NODE(&export->exp_gen_hash);
1090 spin_lock_init(&export->exp_bl_list_lock);
1091 INIT_LIST_HEAD(&export->exp_bl_list);
1092 INIT_LIST_HEAD(&export->exp_stale_list);
1093 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1095 export->exp_sp_peer = LUSTRE_SP_ANY;
1096 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1097 export->exp_client_uuid = *cluuid;
1098 obd_init_export(export);
1100 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1101 spin_lock(&obd->obd_dev_lock);
1102 /* shouldn't happen, but might race */
1103 if (obd->obd_stopping)
1104 GOTO(exit_unlock, rc = -ENODEV);
1106 hash = cfs_hash_getref(obd->obd_uuid_hash);
1108 GOTO(exit_unlock, rc = -ENODEV);
1109 spin_unlock(&obd->obd_dev_lock);
1111 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1113 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1114 obd->obd_name, cluuid->uuid, rc);
1115 GOTO(exit_err, rc = -EALREADY);
1119 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1120 spin_lock(&obd->obd_dev_lock);
1121 if (obd->obd_stopping) {
1123 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1124 GOTO(exit_unlock, rc = -ESHUTDOWN);
1128 class_incref(obd, "export", export);
1129 list_add_tail(&export->exp_obd_chain_timed,
1130 &obd->obd_exports_timed);
1131 list_add(&export->exp_obd_chain, &obd->obd_exports);
1132 obd->obd_num_exports++;
1134 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1135 INIT_LIST_HEAD(&export->exp_obd_chain);
1137 spin_unlock(&obd->obd_dev_lock);
1139 cfs_hash_putref(hash);
1143 spin_unlock(&obd->obd_dev_lock);
1146 cfs_hash_putref(hash);
1147 class_handle_unhash(&export->exp_handle);
1148 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1149 obd_destroy_export(export);
1150 OBD_FREE_PTR(export);
1154 struct obd_export *class_new_export(struct obd_device *obd,
1155 struct obd_uuid *uuid)
1157 return __class_new_export(obd, uuid, false);
1159 EXPORT_SYMBOL(class_new_export);
1161 struct obd_export *class_new_export_self(struct obd_device *obd,
1162 struct obd_uuid *uuid)
1164 return __class_new_export(obd, uuid, true);
1167 void class_unlink_export(struct obd_export *exp)
1169 class_handle_unhash(&exp->exp_handle);
1171 if (exp->exp_obd->obd_self_export == exp) {
1172 class_export_put(exp);
1176 spin_lock(&exp->exp_obd->obd_dev_lock);
1177 /* delete an uuid-export hashitem from hashtables */
1178 if (!hlist_unhashed(&exp->exp_uuid_hash))
1179 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1180 &exp->exp_client_uuid,
1181 &exp->exp_uuid_hash);
1183 #ifdef HAVE_SERVER_SUPPORT
1184 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1185 struct tg_export_data *ted = &exp->exp_target_data;
1186 struct cfs_hash *hash;
1188 /* Because obd_gen_hash will not be released until
1189 * class_cleanup(), so hash should never be NULL here */
1190 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1191 LASSERT(hash != NULL);
1192 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1193 &exp->exp_gen_hash);
1194 cfs_hash_putref(hash);
1196 #endif /* HAVE_SERVER_SUPPORT */
1198 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1199 list_del_init(&exp->exp_obd_chain_timed);
1200 exp->exp_obd->obd_num_exports--;
1201 spin_unlock(&exp->exp_obd->obd_dev_lock);
1202 atomic_inc(&obd_stale_export_num);
1204 /* A reference is kept by obd_stale_exports list */
1205 obd_stale_export_put(exp);
1207 EXPORT_SYMBOL(class_unlink_export);
1209 /* Import management functions */
1210 static void obd_zombie_import_free(struct obd_import *imp)
1214 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1215 imp->imp_obd->obd_name);
1217 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1219 ptlrpc_put_connection_superhack(imp->imp_connection);
1221 while (!list_empty(&imp->imp_conn_list)) {
1222 struct obd_import_conn *imp_conn;
1224 imp_conn = list_entry(imp->imp_conn_list.next,
1225 struct obd_import_conn, oic_item);
1226 list_del_init(&imp_conn->oic_item);
1227 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1228 OBD_FREE(imp_conn, sizeof(*imp_conn));
1231 LASSERT(imp->imp_sec == NULL);
1232 class_decref(imp->imp_obd, "import", imp);
1237 struct obd_import *class_import_get(struct obd_import *import)
1239 atomic_inc(&import->imp_refcount);
1240 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1241 atomic_read(&import->imp_refcount),
1242 import->imp_obd->obd_name);
1245 EXPORT_SYMBOL(class_import_get);
1247 void class_import_put(struct obd_import *imp)
1251 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1253 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1254 atomic_read(&imp->imp_refcount) - 1,
1255 imp->imp_obd->obd_name);
1257 if (atomic_dec_and_test(&imp->imp_refcount)) {
1258 CDEBUG(D_INFO, "final put import %p\n", imp);
1259 obd_zombie_import_add(imp);
1262 /* catch possible import put race */
1263 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1266 EXPORT_SYMBOL(class_import_put);
1268 static void init_imp_at(struct imp_at *at) {
1270 at_init(&at->iat_net_latency, 0, 0);
1271 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1272 /* max service estimates are tracked on the server side, so
1273 don't use the AT history here, just use the last reported
1274 val. (But keep hist for proc histogram, worst_ever) */
1275 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1280 static void obd_zombie_imp_cull(struct work_struct *ws)
1282 struct obd_import *import;
1284 import = container_of(ws, struct obd_import, imp_zombie_work);
1285 obd_zombie_import_free(import);
1288 struct obd_import *class_new_import(struct obd_device *obd)
1290 struct obd_import *imp;
1291 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1293 OBD_ALLOC(imp, sizeof(*imp));
1297 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1298 INIT_LIST_HEAD(&imp->imp_replay_list);
1299 INIT_LIST_HEAD(&imp->imp_sending_list);
1300 INIT_LIST_HEAD(&imp->imp_delayed_list);
1301 INIT_LIST_HEAD(&imp->imp_committed_list);
1302 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1303 imp->imp_known_replied_xid = 0;
1304 imp->imp_replay_cursor = &imp->imp_committed_list;
1305 spin_lock_init(&imp->imp_lock);
1306 imp->imp_last_success_conn = 0;
1307 imp->imp_state = LUSTRE_IMP_NEW;
1308 imp->imp_obd = class_incref(obd, "import", imp);
1309 rwlock_init(&imp->imp_sec_lock);
1310 init_waitqueue_head(&imp->imp_recovery_waitq);
1311 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1313 if (curr_pid_ns->child_reaper)
1314 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1316 imp->imp_sec_refpid = 1;
1318 atomic_set(&imp->imp_refcount, 2);
1319 atomic_set(&imp->imp_unregistering, 0);
1320 atomic_set(&imp->imp_inflight, 0);
1321 atomic_set(&imp->imp_replay_inflight, 0);
1322 atomic_set(&imp->imp_inval_count, 0);
1323 INIT_LIST_HEAD(&imp->imp_conn_list);
1324 init_imp_at(&imp->imp_at);
1326 /* the default magic is V2, will be used in connect RPC, and
1327 * then adjusted according to the flags in request/reply. */
1328 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1332 EXPORT_SYMBOL(class_new_import);
1334 void class_destroy_import(struct obd_import *import)
1336 LASSERT(import != NULL);
1337 LASSERT(import != LP_POISON);
1339 spin_lock(&import->imp_lock);
1340 import->imp_generation++;
1341 spin_unlock(&import->imp_lock);
1342 class_import_put(import);
1344 EXPORT_SYMBOL(class_destroy_import);
1346 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1348 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1350 spin_lock(&exp->exp_locks_list_guard);
1352 LASSERT(lock->l_exp_refs_nr >= 0);
1354 if (lock->l_exp_refs_target != NULL &&
1355 lock->l_exp_refs_target != exp) {
1356 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1357 exp, lock, lock->l_exp_refs_target);
1359 if ((lock->l_exp_refs_nr ++) == 0) {
1360 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1361 lock->l_exp_refs_target = exp;
1363 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1364 lock, exp, lock->l_exp_refs_nr);
1365 spin_unlock(&exp->exp_locks_list_guard);
1367 EXPORT_SYMBOL(__class_export_add_lock_ref);
1369 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1371 spin_lock(&exp->exp_locks_list_guard);
1372 LASSERT(lock->l_exp_refs_nr > 0);
1373 if (lock->l_exp_refs_target != exp) {
1374 LCONSOLE_WARN("lock %p, "
1375 "mismatching export pointers: %p, %p\n",
1376 lock, lock->l_exp_refs_target, exp);
1378 if (-- lock->l_exp_refs_nr == 0) {
1379 list_del_init(&lock->l_exp_refs_link);
1380 lock->l_exp_refs_target = NULL;
1382 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1383 lock, exp, lock->l_exp_refs_nr);
1384 spin_unlock(&exp->exp_locks_list_guard);
1386 EXPORT_SYMBOL(__class_export_del_lock_ref);
1389 /* A connection defines an export context in which preallocation can
1390 be managed. This releases the export pointer reference, and returns
1391 the export handle, so the export refcount is 1 when this function
1393 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1394 struct obd_uuid *cluuid)
1396 struct obd_export *export;
1397 LASSERT(conn != NULL);
1398 LASSERT(obd != NULL);
1399 LASSERT(cluuid != NULL);
1402 export = class_new_export(obd, cluuid);
1404 RETURN(PTR_ERR(export));
1406 conn->cookie = export->exp_handle.h_cookie;
1407 class_export_put(export);
1409 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1410 cluuid->uuid, conn->cookie);
1413 EXPORT_SYMBOL(class_connect);
1415 /* if export is involved in recovery then clean up related things */
1416 static void class_export_recovery_cleanup(struct obd_export *exp)
1418 struct obd_device *obd = exp->exp_obd;
1420 spin_lock(&obd->obd_recovery_task_lock);
1421 if (obd->obd_recovering) {
1422 if (exp->exp_in_recovery) {
1423 spin_lock(&exp->exp_lock);
1424 exp->exp_in_recovery = 0;
1425 spin_unlock(&exp->exp_lock);
1426 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1427 atomic_dec(&obd->obd_connected_clients);
1430 /* if called during recovery then should update
1431 * obd_stale_clients counter,
1432 * lightweight exports are not counted */
1433 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1434 exp->exp_obd->obd_stale_clients++;
1436 spin_unlock(&obd->obd_recovery_task_lock);
1438 spin_lock(&exp->exp_lock);
1439 /** Cleanup req replay fields */
1440 if (exp->exp_req_replay_needed) {
1441 exp->exp_req_replay_needed = 0;
1443 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1444 atomic_dec(&obd->obd_req_replay_clients);
1447 /** Cleanup lock replay data */
1448 if (exp->exp_lock_replay_needed) {
1449 exp->exp_lock_replay_needed = 0;
1451 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1452 atomic_dec(&obd->obd_lock_replay_clients);
1454 spin_unlock(&exp->exp_lock);
1457 /* This function removes 1-3 references from the export:
1458 * 1 - for export pointer passed
1459 * and if disconnect really need
1460 * 2 - removing from hash
1461 * 3 - in client_unlink_export
1462 * The export pointer passed to this function can destroyed */
1463 int class_disconnect(struct obd_export *export)
1465 int already_disconnected;
1468 if (export == NULL) {
1469 CWARN("attempting to free NULL export %p\n", export);
1473 spin_lock(&export->exp_lock);
1474 already_disconnected = export->exp_disconnected;
1475 export->exp_disconnected = 1;
1476 /* We hold references of export for uuid hash
1477 * and nid_hash and export link at least. So
1478 * it is safe to call cfs_hash_del in there. */
1479 if (!hlist_unhashed(&export->exp_nid_hash))
1480 cfs_hash_del(export->exp_obd->obd_nid_hash,
1481 &export->exp_connection->c_peer.nid,
1482 &export->exp_nid_hash);
1483 spin_unlock(&export->exp_lock);
1485 /* class_cleanup(), abort_recovery(), and class_fail_export()
1486 * all end up in here, and if any of them race we shouldn't
1487 * call extra class_export_puts(). */
1488 if (already_disconnected) {
1489 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1490 GOTO(no_disconn, already_disconnected);
1493 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1494 export->exp_handle.h_cookie);
1496 class_export_recovery_cleanup(export);
1497 class_unlink_export(export);
1499 class_export_put(export);
1502 EXPORT_SYMBOL(class_disconnect);
1504 /* Return non-zero for a fully connected export */
1505 int class_connected_export(struct obd_export *exp)
1510 spin_lock(&exp->exp_lock);
1511 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1512 spin_unlock(&exp->exp_lock);
1516 EXPORT_SYMBOL(class_connected_export);
1518 static void class_disconnect_export_list(struct list_head *list,
1519 enum obd_option flags)
1522 struct obd_export *exp;
1525 /* It's possible that an export may disconnect itself, but
1526 * nothing else will be added to this list. */
1527 while (!list_empty(list)) {
1528 exp = list_entry(list->next, struct obd_export,
1530 /* need for safe call CDEBUG after obd_disconnect */
1531 class_export_get(exp);
1533 spin_lock(&exp->exp_lock);
1534 exp->exp_flags = flags;
1535 spin_unlock(&exp->exp_lock);
1537 if (obd_uuid_equals(&exp->exp_client_uuid,
1538 &exp->exp_obd->obd_uuid)) {
1540 "exp %p export uuid == obd uuid, don't discon\n",
1542 /* Need to delete this now so we don't end up pointing
1543 * to work_list later when this export is cleaned up. */
1544 list_del_init(&exp->exp_obd_chain);
1545 class_export_put(exp);
1549 class_export_get(exp);
1550 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1551 "last request at %lld\n",
1552 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1553 exp, exp->exp_last_request_time);
1554 /* release one export reference anyway */
1555 rc = obd_disconnect(exp);
1557 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1558 obd_export_nid2str(exp), exp, rc);
1559 class_export_put(exp);
1564 void class_disconnect_exports(struct obd_device *obd)
1566 struct list_head work_list;
1569 /* Move all of the exports from obd_exports to a work list, en masse. */
1570 INIT_LIST_HEAD(&work_list);
1571 spin_lock(&obd->obd_dev_lock);
1572 list_splice_init(&obd->obd_exports, &work_list);
1573 list_splice_init(&obd->obd_delayed_exports, &work_list);
1574 spin_unlock(&obd->obd_dev_lock);
1576 if (!list_empty(&work_list)) {
1577 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1578 "disconnecting them\n", obd->obd_minor, obd);
1579 class_disconnect_export_list(&work_list,
1580 exp_flags_from_obd(obd));
1582 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1583 obd->obd_minor, obd);
1586 EXPORT_SYMBOL(class_disconnect_exports);
1588 /* Remove exports that have not completed recovery.
1590 void class_disconnect_stale_exports(struct obd_device *obd,
1591 int (*test_export)(struct obd_export *))
1593 struct list_head work_list;
1594 struct obd_export *exp, *n;
1598 INIT_LIST_HEAD(&work_list);
1599 spin_lock(&obd->obd_dev_lock);
1600 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1602 /* don't count self-export as client */
1603 if (obd_uuid_equals(&exp->exp_client_uuid,
1604 &exp->exp_obd->obd_uuid))
1607 /* don't evict clients which have no slot in last_rcvd
1608 * (e.g. lightweight connection) */
1609 if (exp->exp_target_data.ted_lr_idx == -1)
1612 spin_lock(&exp->exp_lock);
1613 if (exp->exp_failed || test_export(exp)) {
1614 spin_unlock(&exp->exp_lock);
1617 exp->exp_failed = 1;
1618 spin_unlock(&exp->exp_lock);
1620 list_move(&exp->exp_obd_chain, &work_list);
1622 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1623 obd->obd_name, exp->exp_client_uuid.uuid,
1624 obd_export_nid2str(exp));
1625 print_export_data(exp, "EVICTING", 0, D_HA);
1627 spin_unlock(&obd->obd_dev_lock);
1630 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1631 obd->obd_name, evicted);
1633 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1634 OBD_OPT_ABORT_RECOV);
1637 EXPORT_SYMBOL(class_disconnect_stale_exports);
1639 void class_fail_export(struct obd_export *exp)
1641 int rc, already_failed;
1643 spin_lock(&exp->exp_lock);
1644 already_failed = exp->exp_failed;
1645 exp->exp_failed = 1;
1646 spin_unlock(&exp->exp_lock);
1648 if (already_failed) {
1649 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1650 exp, exp->exp_client_uuid.uuid);
1654 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1655 exp, exp->exp_client_uuid.uuid);
1657 if (obd_dump_on_timeout)
1658 libcfs_debug_dumplog();
1660 /* need for safe call CDEBUG after obd_disconnect */
1661 class_export_get(exp);
1663 /* Most callers into obd_disconnect are removing their own reference
1664 * (request, for example) in addition to the one from the hash table.
1665 * We don't have such a reference here, so make one. */
1666 class_export_get(exp);
1667 rc = obd_disconnect(exp);
1669 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1671 CDEBUG(D_HA, "disconnected export %p/%s\n",
1672 exp, exp->exp_client_uuid.uuid);
1673 class_export_put(exp);
1675 EXPORT_SYMBOL(class_fail_export);
1677 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1679 struct cfs_hash *nid_hash;
1680 struct obd_export *doomed_exp = NULL;
1681 int exports_evicted = 0;
1683 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1685 spin_lock(&obd->obd_dev_lock);
1686 /* umount has run already, so evict thread should leave
1687 * its task to umount thread now */
1688 if (obd->obd_stopping) {
1689 spin_unlock(&obd->obd_dev_lock);
1690 return exports_evicted;
1692 nid_hash = obd->obd_nid_hash;
1693 cfs_hash_getref(nid_hash);
1694 spin_unlock(&obd->obd_dev_lock);
1697 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1698 if (doomed_exp == NULL)
1701 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1702 "nid %s found, wanted nid %s, requested nid %s\n",
1703 obd_export_nid2str(doomed_exp),
1704 libcfs_nid2str(nid_key), nid);
1705 LASSERTF(doomed_exp != obd->obd_self_export,
1706 "self-export is hashed by NID?\n");
1708 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1709 "request\n", obd->obd_name,
1710 obd_uuid2str(&doomed_exp->exp_client_uuid),
1711 obd_export_nid2str(doomed_exp));
1712 class_fail_export(doomed_exp);
1713 class_export_put(doomed_exp);
1716 cfs_hash_putref(nid_hash);
1718 if (!exports_evicted)
1719 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1720 obd->obd_name, nid);
1721 return exports_evicted;
1723 EXPORT_SYMBOL(obd_export_evict_by_nid);
1725 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1727 struct cfs_hash *uuid_hash;
1728 struct obd_export *doomed_exp = NULL;
1729 struct obd_uuid doomed_uuid;
1730 int exports_evicted = 0;
1732 spin_lock(&obd->obd_dev_lock);
1733 if (obd->obd_stopping) {
1734 spin_unlock(&obd->obd_dev_lock);
1735 return exports_evicted;
1737 uuid_hash = obd->obd_uuid_hash;
1738 cfs_hash_getref(uuid_hash);
1739 spin_unlock(&obd->obd_dev_lock);
1741 obd_str2uuid(&doomed_uuid, uuid);
1742 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1743 CERROR("%s: can't evict myself\n", obd->obd_name);
1744 cfs_hash_putref(uuid_hash);
1745 return exports_evicted;
1748 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1750 if (doomed_exp == NULL) {
1751 CERROR("%s: can't disconnect %s: no exports found\n",
1752 obd->obd_name, uuid);
1754 CWARN("%s: evicting %s at adminstrative request\n",
1755 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1756 class_fail_export(doomed_exp);
1757 class_export_put(doomed_exp);
1760 cfs_hash_putref(uuid_hash);
1762 return exports_evicted;
1765 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1766 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1767 EXPORT_SYMBOL(class_export_dump_hook);
1770 static void print_export_data(struct obd_export *exp, const char *status,
1771 int locks, int debug_level)
1773 struct ptlrpc_reply_state *rs;
1774 struct ptlrpc_reply_state *first_reply = NULL;
1777 spin_lock(&exp->exp_lock);
1778 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1784 spin_unlock(&exp->exp_lock);
1786 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1787 "%p %s %llu stale:%d\n",
1788 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1789 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1790 atomic_read(&exp->exp_rpc_count),
1791 atomic_read(&exp->exp_cb_count),
1792 atomic_read(&exp->exp_locks_count),
1793 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1794 nreplies, first_reply, nreplies > 3 ? "..." : "",
1795 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1796 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1797 if (locks && class_export_dump_hook != NULL)
1798 class_export_dump_hook(exp);
1802 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1804 struct obd_export *exp;
1806 spin_lock(&obd->obd_dev_lock);
1807 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1808 print_export_data(exp, "ACTIVE", locks, debug_level);
1809 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1810 print_export_data(exp, "UNLINKED", locks, debug_level);
1811 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1812 print_export_data(exp, "DELAYED", locks, debug_level);
1813 spin_unlock(&obd->obd_dev_lock);
1816 void obd_exports_barrier(struct obd_device *obd)
1819 LASSERT(list_empty(&obd->obd_exports));
1820 spin_lock(&obd->obd_dev_lock);
1821 while (!list_empty(&obd->obd_unlinked_exports)) {
1822 spin_unlock(&obd->obd_dev_lock);
1823 set_current_state(TASK_UNINTERRUPTIBLE);
1824 schedule_timeout(cfs_time_seconds(waited));
1825 if (waited > 5 && is_power_of_2(waited)) {
1826 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1827 "more than %d seconds. "
1828 "The obd refcount = %d. Is it stuck?\n",
1829 obd->obd_name, waited,
1830 atomic_read(&obd->obd_refcount));
1831 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1834 spin_lock(&obd->obd_dev_lock);
1836 spin_unlock(&obd->obd_dev_lock);
1838 EXPORT_SYMBOL(obd_exports_barrier);
1841 * Add export to the obd_zombe thread and notify it.
1843 static void obd_zombie_export_add(struct obd_export *exp) {
1844 atomic_dec(&obd_stale_export_num);
1845 spin_lock(&exp->exp_obd->obd_dev_lock);
1846 LASSERT(!list_empty(&exp->exp_obd_chain));
1847 list_del_init(&exp->exp_obd_chain);
1848 spin_unlock(&exp->exp_obd->obd_dev_lock);
1850 queue_work(zombie_wq, &exp->exp_zombie_work);
1854 * Add import to the obd_zombe thread and notify it.
1856 static void obd_zombie_import_add(struct obd_import *imp) {
1857 LASSERT(imp->imp_sec == NULL);
1859 queue_work(zombie_wq, &imp->imp_zombie_work);
1863 * wait when obd_zombie import/export queues become empty
1865 void obd_zombie_barrier(void)
1867 flush_workqueue(zombie_wq);
1869 EXPORT_SYMBOL(obd_zombie_barrier);
1872 struct obd_export *obd_stale_export_get(void)
1874 struct obd_export *exp = NULL;
1877 spin_lock(&obd_stale_export_lock);
1878 if (!list_empty(&obd_stale_exports)) {
1879 exp = list_entry(obd_stale_exports.next,
1880 struct obd_export, exp_stale_list);
1881 list_del_init(&exp->exp_stale_list);
1883 spin_unlock(&obd_stale_export_lock);
1886 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1887 atomic_read(&obd_stale_export_num));
1891 EXPORT_SYMBOL(obd_stale_export_get);
1893 void obd_stale_export_put(struct obd_export *exp)
1897 LASSERT(list_empty(&exp->exp_stale_list));
1898 if (exp->exp_lock_hash &&
1899 atomic_read(&exp->exp_lock_hash->hs_count)) {
1900 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1901 atomic_read(&obd_stale_export_num));
1903 spin_lock_bh(&exp->exp_bl_list_lock);
1904 spin_lock(&obd_stale_export_lock);
1905 /* Add to the tail if there is no blocked locks,
1906 * to the head otherwise. */
1907 if (list_empty(&exp->exp_bl_list))
1908 list_add_tail(&exp->exp_stale_list,
1909 &obd_stale_exports);
1911 list_add(&exp->exp_stale_list,
1912 &obd_stale_exports);
1914 spin_unlock(&obd_stale_export_lock);
1915 spin_unlock_bh(&exp->exp_bl_list_lock);
1917 class_export_put(exp);
1921 EXPORT_SYMBOL(obd_stale_export_put);
1924 * Adjust the position of the export in the stale list,
1925 * i.e. move to the head of the list if is needed.
1927 void obd_stale_export_adjust(struct obd_export *exp)
1929 LASSERT(exp != NULL);
1930 spin_lock_bh(&exp->exp_bl_list_lock);
1931 spin_lock(&obd_stale_export_lock);
1933 if (!list_empty(&exp->exp_stale_list) &&
1934 !list_empty(&exp->exp_bl_list))
1935 list_move(&exp->exp_stale_list, &obd_stale_exports);
1937 spin_unlock(&obd_stale_export_lock);
1938 spin_unlock_bh(&exp->exp_bl_list_lock);
1940 EXPORT_SYMBOL(obd_stale_export_adjust);
1943 * start destroy zombie import/export thread
1945 int obd_zombie_impexp_init(void)
1947 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1955 * stop destroy zombie import/export thread
1957 void obd_zombie_impexp_stop(void)
1959 destroy_workqueue(zombie_wq);
1960 LASSERT(list_empty(&obd_stale_exports));
1963 /***** Kernel-userspace comm helpers *******/
1965 /* Get length of entire message, including header */
1966 int kuc_len(int payload_len)
1968 return sizeof(struct kuc_hdr) + payload_len;
1970 EXPORT_SYMBOL(kuc_len);
1972 /* Get a pointer to kuc header, given a ptr to the payload
1973 * @param p Pointer to payload area
1974 * @returns Pointer to kuc header
1976 struct kuc_hdr * kuc_ptr(void *p)
1978 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1979 LASSERT(lh->kuc_magic == KUC_MAGIC);
1982 EXPORT_SYMBOL(kuc_ptr);
1984 /* Alloc space for a message, and fill in header
1985 * @return Pointer to payload area
1987 void *kuc_alloc(int payload_len, int transport, int type)
1990 int len = kuc_len(payload_len);
1994 return ERR_PTR(-ENOMEM);
1996 lh->kuc_magic = KUC_MAGIC;
1997 lh->kuc_transport = transport;
1998 lh->kuc_msgtype = type;
1999 lh->kuc_msglen = len;
2001 return (void *)(lh + 1);
2003 EXPORT_SYMBOL(kuc_alloc);
2005 /* Takes pointer to payload area */
2006 void kuc_free(void *p, int payload_len)
2008 struct kuc_hdr *lh = kuc_ptr(p);
2009 OBD_FREE(lh, kuc_len(payload_len));
2011 EXPORT_SYMBOL(kuc_free);
2013 struct obd_request_slot_waiter {
2014 struct list_head orsw_entry;
2015 wait_queue_head_t orsw_waitq;
2019 static bool obd_request_slot_avail(struct client_obd *cli,
2020 struct obd_request_slot_waiter *orsw)
2024 spin_lock(&cli->cl_loi_list_lock);
2025 avail = !!list_empty(&orsw->orsw_entry);
2026 spin_unlock(&cli->cl_loi_list_lock);
2032 * For network flow control, the RPC sponsor needs to acquire a credit
2033 * before sending the RPC. The credits count for a connection is defined
2034 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2035 * the subsequent RPC sponsors need to wait until others released their
2036 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2038 int obd_get_request_slot(struct client_obd *cli)
2040 struct obd_request_slot_waiter orsw;
2041 struct l_wait_info lwi;
2044 spin_lock(&cli->cl_loi_list_lock);
2045 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2046 cli->cl_rpcs_in_flight++;
2047 spin_unlock(&cli->cl_loi_list_lock);
2051 init_waitqueue_head(&orsw.orsw_waitq);
2052 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2053 orsw.orsw_signaled = false;
2054 spin_unlock(&cli->cl_loi_list_lock);
2056 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2057 rc = l_wait_event(orsw.orsw_waitq,
2058 obd_request_slot_avail(cli, &orsw) ||
2062 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2063 * freed but other (such as obd_put_request_slot) is using it. */
2064 spin_lock(&cli->cl_loi_list_lock);
2066 if (!orsw.orsw_signaled) {
2067 if (list_empty(&orsw.orsw_entry))
2068 cli->cl_rpcs_in_flight--;
2070 list_del(&orsw.orsw_entry);
2074 if (orsw.orsw_signaled) {
2075 LASSERT(list_empty(&orsw.orsw_entry));
2079 spin_unlock(&cli->cl_loi_list_lock);
2083 EXPORT_SYMBOL(obd_get_request_slot);
2085 void obd_put_request_slot(struct client_obd *cli)
2087 struct obd_request_slot_waiter *orsw;
2089 spin_lock(&cli->cl_loi_list_lock);
2090 cli->cl_rpcs_in_flight--;
2092 /* If there is free slot, wakeup the first waiter. */
2093 if (!list_empty(&cli->cl_flight_waiters) &&
2094 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2095 orsw = list_entry(cli->cl_flight_waiters.next,
2096 struct obd_request_slot_waiter, orsw_entry);
2097 list_del_init(&orsw->orsw_entry);
2098 cli->cl_rpcs_in_flight++;
2099 wake_up(&orsw->orsw_waitq);
2101 spin_unlock(&cli->cl_loi_list_lock);
2103 EXPORT_SYMBOL(obd_put_request_slot);
2105 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2107 return cli->cl_max_rpcs_in_flight;
2109 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2111 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2113 struct obd_request_slot_waiter *orsw;
2117 const char *type_name;
2120 if (max > OBD_MAX_RIF_MAX || max < 1)
2123 type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2124 if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2125 /* adjust max_mod_rpcs_in_flight to ensure it is always
2126 * strictly lower that max_rpcs_in_flight */
2128 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2129 "because it must be higher than "
2130 "max_mod_rpcs_in_flight value",
2131 cli->cl_import->imp_obd->obd_name);
2134 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2135 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2141 spin_lock(&cli->cl_loi_list_lock);
2142 old = cli->cl_max_rpcs_in_flight;
2143 cli->cl_max_rpcs_in_flight = max;
2144 client_adjust_max_dirty(cli);
2148 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2149 for (i = 0; i < diff; i++) {
2150 if (list_empty(&cli->cl_flight_waiters))
2153 orsw = list_entry(cli->cl_flight_waiters.next,
2154 struct obd_request_slot_waiter, orsw_entry);
2155 list_del_init(&orsw->orsw_entry);
2156 cli->cl_rpcs_in_flight++;
2157 wake_up(&orsw->orsw_waitq);
2159 spin_unlock(&cli->cl_loi_list_lock);
2163 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2165 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2167 return cli->cl_max_mod_rpcs_in_flight;
2169 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2171 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2173 struct obd_connect_data *ocd;
2177 if (max > OBD_MAX_RIF_MAX || max < 1)
2180 /* cannot exceed or equal max_rpcs_in_flight */
2181 if (max >= cli->cl_max_rpcs_in_flight) {
2182 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2183 "higher or equal to max_rpcs_in_flight value (%u)\n",
2184 cli->cl_import->imp_obd->obd_name,
2185 max, cli->cl_max_rpcs_in_flight);
2189 /* cannot exceed max modify RPCs in flight supported by the server */
2190 ocd = &cli->cl_import->imp_connect_data;
2191 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2192 maxmodrpcs = ocd->ocd_maxmodrpcs;
2195 if (max > maxmodrpcs) {
2196 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2197 "higher than max_mod_rpcs_per_client value (%hu) "
2198 "returned by the server at connection\n",
2199 cli->cl_import->imp_obd->obd_name,
2204 spin_lock(&cli->cl_mod_rpcs_lock);
2206 prev = cli->cl_max_mod_rpcs_in_flight;
2207 cli->cl_max_mod_rpcs_in_flight = max;
2209 /* wakeup waiters if limit has been increased */
2210 if (cli->cl_max_mod_rpcs_in_flight > prev)
2211 wake_up(&cli->cl_mod_rpcs_waitq);
2213 spin_unlock(&cli->cl_mod_rpcs_lock);
2217 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2219 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2220 struct seq_file *seq)
2222 unsigned long mod_tot = 0, mod_cum;
2223 struct timespec64 now;
2226 ktime_get_real_ts64(&now);
2228 spin_lock(&cli->cl_mod_rpcs_lock);
2230 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2231 (s64)now.tv_sec, now.tv_nsec);
2232 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2233 cli->cl_mod_rpcs_in_flight);
2235 seq_printf(seq, "\n\t\t\tmodify\n");
2236 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2238 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2241 for (i = 0; i < OBD_HIST_MAX; i++) {
2242 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2244 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2245 i, mod, pct(mod, mod_tot),
2246 pct(mod_cum, mod_tot));
2247 if (mod_cum == mod_tot)
2251 spin_unlock(&cli->cl_mod_rpcs_lock);
2255 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2257 /* The number of modify RPCs sent in parallel is limited
2258 * because the server has a finite number of slots per client to
2259 * store request result and ensure reply reconstruction when needed.
2260 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2261 * that takes into account server limit and cl_max_rpcs_in_flight
2263 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2264 * one close request is allowed above the maximum.
2266 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2271 /* A slot is available if
2272 * - number of modify RPCs in flight is less than the max
2273 * - it's a close RPC and no other close request is in flight
2275 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2276 (close_req && cli->cl_close_rpcs_in_flight == 0);
2281 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2286 spin_lock(&cli->cl_mod_rpcs_lock);
2287 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2288 spin_unlock(&cli->cl_mod_rpcs_lock);
2292 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2295 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2296 it->it_op == IT_READDIR ||
2297 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2302 /* Get a modify RPC slot from the obd client @cli according
2303 * to the kind of operation @opc that is going to be sent
2304 * and the intent @it of the operation if it applies.
2305 * If the maximum number of modify RPCs in flight is reached
2306 * the thread is put to sleep.
2307 * Returns the tag to be set in the request message. Tag 0
2308 * is reserved for non-modifying requests.
2310 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2311 struct lookup_intent *it)
2313 bool close_req = false;
2316 /* read-only metadata RPCs don't consume a slot on MDT
2317 * for reply reconstruction
2319 if (obd_skip_mod_rpc_slot(it))
2322 if (opc == MDS_CLOSE)
2326 spin_lock(&cli->cl_mod_rpcs_lock);
2327 max = cli->cl_max_mod_rpcs_in_flight;
2328 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2329 /* there is a slot available */
2330 cli->cl_mod_rpcs_in_flight++;
2332 cli->cl_close_rpcs_in_flight++;
2333 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2334 cli->cl_mod_rpcs_in_flight);
2335 /* find a free tag */
2336 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2338 LASSERT(i < OBD_MAX_RIF_MAX);
2339 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2340 spin_unlock(&cli->cl_mod_rpcs_lock);
2341 /* tag 0 is reserved for non-modify RPCs */
2343 CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2344 "opc %u, max %hu\n",
2345 cli->cl_import->imp_obd->obd_name,
2350 spin_unlock(&cli->cl_mod_rpcs_lock);
2352 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2353 "opc %u, max %hu\n",
2354 cli->cl_import->imp_obd->obd_name, opc, max);
2356 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2357 obd_mod_rpc_slot_avail(cli,
2361 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2363 /* Put a modify RPC slot from the obd client @cli according
2364 * to the kind of operation @opc that has been sent and the
2365 * intent @it of the operation if it applies.
2367 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2368 struct lookup_intent *it, __u16 tag)
2370 bool close_req = false;
2372 if (obd_skip_mod_rpc_slot(it))
2375 if (opc == MDS_CLOSE)
2378 spin_lock(&cli->cl_mod_rpcs_lock);
2379 cli->cl_mod_rpcs_in_flight--;
2381 cli->cl_close_rpcs_in_flight--;
2382 /* release the tag in the bitmap */
2383 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2384 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2385 spin_unlock(&cli->cl_mod_rpcs_lock);
2386 wake_up(&cli->cl_mod_rpcs_waitq);
2388 EXPORT_SYMBOL(obd_put_mod_rpc_slot);