4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* File-scope state shared by the functions in this file, followed by forward
 * declarations of helpers defined further down.  obd_dev_lock is the rwlock
 * taken around reads and writes of the obd_devs[] table. */
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59 const char *status, int locks, int debug_level);
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
/* Allocate an obd_device from obd_device_cachep (GFP_NOFS) and stamp it
 * with OBD_DEVICE_MAGIC. */
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
/* Return an obd_device to obd_device_cachep.  Asserts that the magic is
 * intact, reports a namespace that is still attached, and finalizes the
 * obd_reference tracker before the memory is released. */
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90 obd, obd->obd_namespace, obd->obd_force);
93 lu_ref_fini(&obd->obd_reference);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/* Look up an obd_type by name in lustre_kset.  A kobject found there is
 * only converted to its containing obd_type when its ktype is class_ktype.
 * Callers in this file drop the lookup reference with kobject_put(). */
97 struct obd_type *class_search_type(const char *name)
99 struct kobject *kobj = kset_find_obj(lustre_kset, name);
101 if (kobj && kobj->ktype == &class_ktype)
102 return container_of(kobj, struct obd_type, typ_kobj);
107 EXPORT_SYMBOL(class_search_type);
/* Find the obd_type called \a name and pin it for use by a device.
 *
 * When HAVE_MODULE_LOADING_SUPPORT is defined, request_module() is used to
 * load the providing module and the lookup is repeated.  With
 * HAVE_SERVER_SUPPORT some names are mapped to a different module name
 * first (LUSTRE_LWP_NAME -> LUSTRE_OSP_NAME, names starting with
 * LUSTRE_MDS_NAME -> LUSTRE_MDT_NAME).
 *
 * On success a module reference (try_module_get) and a typ_refcnt count are
 * held; the kobject reference from the lookup is dropped on both outcomes
 * of try_module_get(). */
109 struct obd_type *class_get_type(const char *name)
111 struct obd_type *type;
113 type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
116 const char *modname = name;
118 #ifdef HAVE_SERVER_SUPPORT
119 if (strcmp(modname, "obdfilter") == 0)
122 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 modname = LUSTRE_OSP_NAME;
125 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 if (try_module_get(type->typ_dt_ops->o_owner)) {
140 atomic_inc(&type->typ_refcnt);
141 /* class_search_type() returned a counted reference,
142 * but we don't need that count any more as
143 * we have one through typ_refcnt.
145 kobject_put(&type->typ_kobj);
147 kobject_put(&type->typ_kobj);
/* Release what class_get_type() acquired: the module reference on the
 * type's o_owner and one typ_refcnt count. */
154 void class_put_type(struct obd_type *type)
157 module_put(type->typ_dt_ops->o_owner);
158 atomic_dec(&type->typ_refcnt);
/* kobject release callback installed through class_ktype.  Tears down
 * everything owned by the obd_type embedding \a kobj: its debugfs entry,
 * the lu_device_type, the proc subtree (CONFIG_PROC_FS only), both ops
 * tables, and finally the obd_type itself. */
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 debugfs_remove_recursive(type->typ_debugfs_entry);
166 type->typ_debugfs_entry = NULL;
169 lu_device_type_fini(type->typ_lu);
171 #ifdef CONFIG_PROC_FS
172 if (type->typ_name && type->typ_procroot)
173 remove_proc_subtree(type->typ_name, proc_lustre_root);
175 if (type->typ_md_ops)
176 OBD_FREE_PTR(type->typ_md_ops);
177 if (type->typ_dt_ops)
178 OBD_FREE_PTR(type->typ_dt_ops);
180 OBD_FREE(type, sizeof(*type));
/* kobj_type shared by all obd_type kobjects; class_search_type() uses it
 * to recognise them, and class_sysfs_release() runs on the final put. */
183 static struct kobj_type class_ktype = {
184 .sysfs_ops = &lustre_sysfs_ops,
185 .release = class_sysfs_release,
/* Server-only helper: create an obd_type entry named \a name that carries
 * only a kobject, a debugfs directory and the typ_sym_filter mark.
 *
 * Returns ERR_PTR(-EEXIST) when a type of that name is already registered
 * and ERR_PTR(-ENOMEM) when the obd_type cannot be allocated.  A failure
 * to create the compat proc entry is logged and typ_procroot is reset to
 * NULL. */
188 #ifdef HAVE_SERVER_SUPPORT
189 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
191 struct dentry *symlink;
192 struct obd_type *type;
195 type = class_search_type(name);
197 kobject_put(&type->typ_kobj);
198 return ERR_PTR(-EEXIST);
201 OBD_ALLOC(type, sizeof(*type));
203 return ERR_PTR(-ENOMEM);
205 type->typ_kobj.kset = lustre_kset;
206 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
207 &lustre_kset->kobj, "%s", name);
211 symlink = debugfs_create_dir(name, debugfs_lustre_root);
212 if (IS_ERR_OR_NULL(symlink)) {
213 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
214 kobject_put(&type->typ_kobj);
217 type->typ_debugfs_entry = symlink;
218 type->typ_sym_filter = true;
221 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
223 if (IS_ERR(type->typ_procroot)) {
224 CERROR("%s: can't create compat proc entry: %d\n",
225 name, (int)PTR_ERR(type->typ_procroot));
226 type->typ_procroot = NULL;
232 EXPORT_SYMBOL(class_add_symlinks);
233 #endif /* HAVE_SERVER_SUPPORT */
/* Register an obd type under \a name.
 *
 * \a name must be shorter than CLASS_MAX_NAME (asserted).  A name that is
 * already registered is reported with a debug message.  Otherwise an
 * obd_type is allocated, private copies of \a dt_ops and \a md_ops are
 * stored in it, proc (when \a enable_proc is set and CONFIG_PROC_FS is
 * defined) and debugfs entries are registered, the kobject is added below
 * lustre_kset and \a ldt is initialized with lu_device_type_init().
 *
 * With HAVE_SERVER_SUPPORT an existing entry marked typ_sym_filter is
 * treated specially: the mark is cleared during registration. */
235 #define CLASS_MAX_NAME 1024
237 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
238 bool enable_proc, struct lprocfs_vars *vars,
239 const char *name, struct lu_device_type *ldt)
241 struct obd_type *type;
246 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
248 type = class_search_type(name);
250 #ifdef HAVE_SERVER_SUPPORT
251 if (type->typ_sym_filter)
253 #endif /* HAVE_SERVER_SUPPORT */
254 kobject_put(&type->typ_kobj);
255 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
259 OBD_ALLOC(type, sizeof(*type));
263 type->typ_kobj.kset = lustre_kset;
264 kobject_init(&type->typ_kobj, &class_ktype);
265 #ifdef HAVE_SERVER_SUPPORT
267 #endif /* HAVE_SERVER_SUPPORT */
268 OBD_ALLOC_PTR(type->typ_dt_ops);
269 OBD_ALLOC_PTR(type->typ_md_ops);
271 if (type->typ_dt_ops == NULL ||
272 type->typ_md_ops == NULL)
273 GOTO (failed, rc = -ENOMEM);
275 *(type->typ_dt_ops) = *dt_ops;
276 /* md_ops is optional */
278 *(type->typ_md_ops) = *md_ops;
280 #ifdef HAVE_SERVER_SUPPORT
281 if (type->typ_sym_filter) {
282 type->typ_sym_filter = false;
283 kobject_put(&type->typ_kobj);
287 #ifdef CONFIG_PROC_FS
288 if (enable_proc && !type->typ_procroot) {
289 type->typ_procroot = lprocfs_register(name,
292 if (IS_ERR(type->typ_procroot)) {
293 rc = PTR_ERR(type->typ_procroot);
294 type->typ_procroot = NULL;
299 type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
301 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
302 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
304 type->typ_debugfs_entry = NULL;
308 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
311 #ifdef HAVE_SERVER_SUPPORT
316 rc = lu_device_type_init(ldt);
324 kobject_put(&type->typ_kobj);
328 EXPORT_SYMBOL(class_register_type);
/* Unregister the obd type called \a name.
 *
 * An unknown name is logged.  If typ_refcnt is still non-zero the type is
 * busy: both ops tables are freed, the name is kept for debugging and
 * -EBUSY is reported.  Otherwise the registration reference is dropped.
 * In both cases the reference taken by class_search_type() is dropped.
 *
 * NOTE(review): on the busy path typ_dt_ops/typ_md_ops are freed but the
 * obd_type stays alive; class_put_type() later dereferences typ_dt_ops and
 * class_sysfs_release() frees both pointers again if they are non-NULL.
 * Confirm OBD_FREE_PTR() leaves the pointers in a state that makes this
 * safe. */
330 int class_unregister_type(const char *name)
332 struct obd_type *type = class_search_type(name);
337 CERROR("unknown obd type\n");
341 if (atomic_read(&type->typ_refcnt)) {
342 CERROR("type %s has refcount (%d)\n", name,
343 atomic_read(&type->typ_refcnt));
344 /* This is a bad situation, let's make the best of it */
345 /* Remove ops, but leave the name for debugging */
346 OBD_FREE_PTR(type->typ_dt_ops);
347 OBD_FREE_PTR(type->typ_md_ops);
348 GOTO(out_put, rc = -EBUSY);
351 /* Put the final ref */
352 kobject_put(&type->typ_kobj);
354 /* Put the ref returned by class_search_type() */
355 kobject_put(&type->typ_kobj);
358 } /* class_unregister_type */
359 EXPORT_SYMBOL(class_unregister_type);
362 * Create a new obd device.
364 * Allocate the new obd_device and initialize it.
366 * \param[in] type_name obd device type string.
367 * \param[in] name obd device name.
368 * \param[in] uuid obd device UUID
370 * \retval newdev pointer to created obd_device
371 * \retval ERR_PTR(errno) on error
373 struct obd_device *class_newdev(const char *type_name, const char *name,
376 struct obd_device *newdev;
377 struct obd_type *type = NULL;
380 if (strlen(name) >= MAX_OBD_NAME) {
381 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
382 RETURN(ERR_PTR(-EINVAL));
385 type = class_get_type(type_name);
387 CERROR("OBD: unknown type: %s\n", type_name);
388 RETURN(ERR_PTR(-ENODEV));
391 newdev = obd_device_alloc();
392 if (newdev == NULL) {
393 class_put_type(type);
394 RETURN(ERR_PTR(-ENOMEM));
396 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
397 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
398 newdev->obd_type = type;
399 newdev->obd_minor = -1;
401 rwlock_init(&newdev->obd_pool_lock);
402 newdev->obd_pool_limit = 0;
403 newdev->obd_pool_slv = 0;
405 INIT_LIST_HEAD(&newdev->obd_exports);
406 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
407 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
408 INIT_LIST_HEAD(&newdev->obd_exports_timed);
409 INIT_LIST_HEAD(&newdev->obd_nid_stats);
410 spin_lock_init(&newdev->obd_nid_lock);
411 spin_lock_init(&newdev->obd_dev_lock);
412 mutex_init(&newdev->obd_dev_mutex);
413 spin_lock_init(&newdev->obd_osfs_lock);
414 /* newdev->obd_osfs_age must be set to a value in the distant
415 * past to guarantee a fresh statfs is fetched on mount. */
416 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
418 /* XXX belongs in setup not attach */
419 init_rwsem(&newdev->obd_observer_link_sem);
421 spin_lock_init(&newdev->obd_recovery_task_lock);
422 init_waitqueue_head(&newdev->obd_next_transno_waitq);
423 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
424 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
425 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
426 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
427 INIT_LIST_HEAD(&newdev->obd_evict_list);
428 INIT_LIST_HEAD(&newdev->obd_lwp_list);
430 llog_group_init(&newdev->obd_olg);
431 /* Detach drops this */
432 atomic_set(&newdev->obd_refcount, 1);
433 lu_ref_init(&newdev->obd_reference);
434 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
436 newdev->obd_conn_inprogress = 0;
438 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
440 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
441 newdev->obd_name, newdev);
449 * \param[in] obd obd_device to be freed
/* Final teardown of \a obd.
 *
 * Asserts that the magic is intact, that the device is either unregistered
 * (obd_minor == -1) or still occupies its own obd_devs[] slot, that
 * obd_refcount has reached zero and that a type is attached.  obd_cleanup()
 * is invoked and a failure logged, then the device memory is freed and the
 * type reference (saved in obd_type before the free) is released. */
453 void class_free_dev(struct obd_device *obd)
455 struct obd_type *obd_type = obd->obd_type;
457 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
458 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
460 "obd %p != obd_devs[%d] %p\n",
461 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
462 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
463 "obd_refcount should be 0, not %d\n",
464 atomic_read(&obd->obd_refcount));
465 LASSERT(obd_type != NULL);
467 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
468 obd->obd_name, obd->obd_type->typ_name);
470 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
471 obd->obd_name, obd->obd_uuid.uuid);
472 if (obd->obd_stopping) {
475 /* If we're not stopping, we were never set up */
476 err = obd_cleanup(obd);
478 CERROR("Cleanup %s returned %d\n",
482 obd_device_free(obd);
484 class_put_type(obd_type);
488 * Unregister obd device.
490 * Free slot in obd_dev[] used by \a obd.
492 * \param[in] new_obd obd_device to be unregistered
/* Remove \a obd from obd_devs[] under the obd_dev_lock write lock.  Only
 * acts when the device has a minor assigned (obd_minor >= 0), and asserts
 * that the slot really holds \a obd before clearing it. */
496 void class_unregister_device(struct obd_device *obd)
498 write_lock(&obd_dev_lock);
499 if (obd->obd_minor >= 0) {
500 LASSERT(obd_devs[obd->obd_minor] == obd);
501 obd_devs[obd->obd_minor] = NULL;
504 write_unlock(&obd_dev_lock);
508 * Register obd device.
510 * Find free slot in obd_devs[], fills it with \a new_obd.
512 * \param[in] new_obd obd_device to be registered
515 * \retval -EEXIST device with this name is registered
516 * \retval -EOVERFLOW obd_devs[] is full
/* Insert \a new_obd into obd_devs[].
 *
 * Under the obd_dev_lock write lock the table is scanned for a device with
 * the same obd_name and for a free slot.  When a duplicate name is seen the
 * lock is dropped and obd_zombie_barrier() is called, since the old device
 * may merely be waiting for destruction; a duplicate is finally reported
 * with "already exists, won't add".  On success the chosen index is stored
 * in new_obd->obd_minor and the slot is filled.  A full table is reported
 * with the "all %u/%u devices used" error. */
518 int class_register_device(struct obd_device *new_obd)
522 int new_obd_minor = 0;
523 bool minor_assign = false;
524 bool retried = false;
527 write_lock(&obd_dev_lock);
528 for (i = 0; i < class_devno_max(); i++) {
529 struct obd_device *obd = class_num2obd(i);
532 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
535 write_unlock(&obd_dev_lock);
537 /* the obd_device could be waited to be
538 * destroyed by the "obd_zombie_impexp_thread".
540 obd_zombie_barrier();
545 CERROR("%s: already exists, won't add\n",
547 /* in case we found a free slot before duplicate */
548 minor_assign = false;
552 if (!minor_assign && obd == NULL) {
559 new_obd->obd_minor = new_obd_minor;
560 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
561 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
562 obd_devs[new_obd_minor] = new_obd;
566 CERROR("%s: all %u/%u devices used, increase "
567 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
568 i, class_devno_max(), ret);
571 write_unlock(&obd_dev_lock);
/* Scan obd_devs[] for a device whose obd_name equals \a name.  A match is
 * only accepted once obd_attached is set.  No locking is done here; the
 * callers in this file hold obd_dev_lock for reading. */
576 static int class_name2dev_nolock(const char *name)
583 for (i = 0; i < class_devno_max(); i++) {
584 struct obd_device *obd = class_num2obd(i);
586 if (obd && strcmp(name, obd->obd_name) == 0) {
587 /* Make sure we finished attaching before we give
588 out any references */
589 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
590 if (obd->obd_attached) {
/* Locked wrapper: run class_name2dev_nolock() with obd_dev_lock held for
 * reading. */
600 int class_name2dev(const char *name)
607 read_lock(&obd_dev_lock);
608 i = class_name2dev_nolock(name);
609 read_unlock(&obd_dev_lock);
613 EXPORT_SYMBOL(class_name2dev);
615 struct obd_device *class_name2obd(const char *name)
617 int dev = class_name2dev(name);
619 if (dev < 0 || dev > class_devno_max())
621 return class_num2obd(dev);
623 EXPORT_SYMBOL(class_name2obd);
/* Scan obd_devs[] for a device whose obd_uuid equals \a uuid, asserting
 * the magic of the match.  No locking is done here; the callers in this
 * file hold obd_dev_lock for reading. */
625 int class_uuid2dev_nolock(struct obd_uuid *uuid)
629 for (i = 0; i < class_devno_max(); i++) {
630 struct obd_device *obd = class_num2obd(i);
632 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
633 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/* Locked wrapper: run class_uuid2dev_nolock() with obd_dev_lock held for
 * reading. */
641 int class_uuid2dev(struct obd_uuid *uuid)
645 read_lock(&obd_dev_lock);
646 i = class_uuid2dev_nolock(uuid);
647 read_unlock(&obd_dev_lock);
651 EXPORT_SYMBOL(class_uuid2dev);
/* Resolve \a uuid to a device index with class_uuid2dev() and return the
 * obd_device stored at that index. */
653 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
655 int dev = class_uuid2dev(uuid);
658 return class_num2obd(dev);
660 EXPORT_SYMBOL(class_uuid2obd);
663 * Get obd device from ::obd_devs[]
665 * \param num [in] array index
667 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
668 * otherwise return the obd device there.
670 struct obd_device *class_num2obd(int num)
672 struct obd_device *obd = NULL;
674 if (num < class_devno_max()) {
679 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
680 "%p obd_magic %08x != %08x\n",
681 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
682 LASSERTF(obd->obd_minor == num,
683 "%p obd_minor %0d != %0d\n",
684 obd, obd->obd_minor, num);
691 * Find obd in obd_dev[] by name or uuid.
693 * Increment obd's refcount if found.
695 * \param[in] str obd name or uuid
697 * \retval NULL if not found
698 * \retval target pointer to found obd_device
/* Find a device from a string that is either a UUID or a name.  With
 * obd_dev_lock held for reading the string is first looked up as a UUID
 * (class_uuid2dev_nolock) and then as a name (class_name2dev_nolock); a
 * "find" reference is taken on the result with class_incref() before the
 * lock is released. */
700 struct obd_device *class_dev_by_str(const char *str)
702 struct obd_device *target = NULL;
703 struct obd_uuid tgtuuid;
706 obd_str2uuid(&tgtuuid, str);
708 read_lock(&obd_dev_lock);
709 rc = class_uuid2dev_nolock(&tgtuuid);
711 rc = class_name2dev_nolock(str);
714 target = class_num2obd(rc);
717 class_incref(target, "find", current);
718 read_unlock(&obd_dev_lock);
722 EXPORT_SYMBOL(class_dev_by_str);
725 * Get obd devices count. Device in any
727 * \retval obd device count
729 int get_devices_count(void)
731 int index, max_index = class_devno_max(), dev_count = 0;
733 read_lock(&obd_dev_lock);
734 for (index = 0; index <= max_index; index++) {
735 struct obd_device *obd = class_num2obd(index);
739 read_unlock(&obd_dev_lock);
743 EXPORT_SYMBOL(get_devices_count);
/* Print one console line per device in obd_devs[] while holding
 * obd_dev_lock for reading: slot index, a status string chosen from
 * obd_stopping / obd_set_up / obd_attached (checked in that order), type
 * name, device name, UUID and current obd_refcount. */
745 void class_obd_list(void)
750 read_lock(&obd_dev_lock);
751 for (i = 0; i < class_devno_max(); i++) {
752 struct obd_device *obd = class_num2obd(i);
756 if (obd->obd_stopping)
758 else if (obd->obd_set_up)
760 else if (obd->obd_attached)
764 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
765 i, status, obd->obd_type->typ_name,
766 obd->obd_name, obd->obd_uuid.uuid,
767 atomic_read(&obd->obd_refcount));
769 read_unlock(&obd_dev_lock);
772 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
773 specified, then only the client with that uuid is returned,
774 otherwise any client connected to the tgt is returned. */
/* Walks obd_devs[] under the obd_dev_lock read lock.  The type test is a
 * prefix comparison: only the first strlen(type_name) characters of the
 * device's type name are compared.  The read lock is released before a
 * match is handed back. */
775 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
776 const char *type_name,
777 struct obd_uuid *grp_uuid)
781 read_lock(&obd_dev_lock);
782 for (i = 0; i < class_devno_max(); i++) {
783 struct obd_device *obd = class_num2obd(i);
787 if ((strncmp(obd->obd_type->typ_name, type_name,
788 strlen(type_name)) == 0)) {
789 if (obd_uuid_equals(tgt_uuid,
790 &obd->u.cli.cl_target_uuid) &&
791 ((grp_uuid)? obd_uuid_equals(grp_uuid,
792 &obd->obd_uuid) : 1)) {
793 read_unlock(&obd_dev_lock);
798 read_unlock(&obd_dev_lock);
802 EXPORT_SYMBOL(class_find_client_obd);
804 /* Iterate the obd_device list looking devices have grp_uuid. Start
805 searching at *next, and if a device is found, the next index to look
806 at is saved in *next. If next is NULL, then the first matching device
807 will always be returned. */
/* A caller-supplied *next is only used as the start index when it lies in
 * 0 .. class_devno_max() - 1.  The scan runs under the obd_dev_lock read
 * lock and compares each device's obd_uuid with \a grp_uuid; the lock is
 * released before a match is handed back. */
808 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
814 else if (*next >= 0 && *next < class_devno_max())
819 read_lock(&obd_dev_lock);
820 for (; i < class_devno_max(); i++) {
821 struct obd_device *obd = class_num2obd(i);
825 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
828 read_unlock(&obd_dev_lock);
832 read_unlock(&obd_dev_lock);
836 EXPORT_SYMBOL(class_devices_in_group);
839 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
840 * adjust sptlrpc settings accordingly.
/* For every device that is set up, not stopping, of type mdc/osc/osp/lwp/
 * mdt/ost, and whose obd_name starts with the first \a namelen characters
 * of \a fsname, send KEY_SPTLRPC_CONF through obd_set_info_async() on the
 * device's self export.
 *
 * obd_dev_lock is dropped around the obd_set_info_async() call; a
 * reference taken with class_incref() keeps the device alive meanwhile and
 * is released before the read lock is re-acquired. */
842 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
844 struct obd_device *obd;
848 LASSERT(namelen > 0);
850 read_lock(&obd_dev_lock);
851 for (i = 0; i < class_devno_max(); i++) {
852 obd = class_num2obd(i);
854 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
857 /* only notify mdc, osc, osp, lwp, mdt, ost
858 * because only these have a -sptlrpc llog */
859 type = obd->obd_type->typ_name;
860 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
861 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
862 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
863 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
864 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
865 strcmp(type, LUSTRE_OST_NAME) != 0)
868 if (strncmp(obd->obd_name, fsname, namelen))
871 class_incref(obd, __FUNCTION__, obd);
872 read_unlock(&obd_dev_lock);
873 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
874 sizeof(KEY_SPTLRPC_CONF),
875 KEY_SPTLRPC_CONF, 0, NULL, NULL);
877 class_decref(obd, __FUNCTION__, obd);
878 read_lock(&obd_dev_lock);
880 read_unlock(&obd_dev_lock);
883 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/* Destroy the obd_device slab cache if it exists and reset the pointer so
 * that a repeated call is harmless. */
885 void obd_cleanup_caches(void)
888 if (obd_device_cachep) {
889 kmem_cache_destroy(obd_device_cachep);
890 obd_device_cachep = NULL;
/* Create the "ll_obd_dev_cache" slab cache used for struct obd_device.
 * The cache must not exist yet (asserted).  A failed creation yields
 * -ENOMEM; obd_cleanup_caches() is the undo step visible at the end of the
 * function. */
896 int obd_init_caches(void)
901 LASSERT(obd_device_cachep == NULL);
902 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
903 sizeof(struct obd_device),
904 0, 0, 0, sizeof(struct obd_device), NULL);
905 if (!obd_device_cachep)
906 GOTO(out, rc = -ENOMEM);
910 obd_cleanup_caches();
/* export_handle_ops is declared ahead of its definition because
 * class_conn2export() needs it for the handle lookup.
 *
 * class_conn2export() translates the cookie in \a conn into an obd_export
 * via class_handle2object().  Two cases are logged separately before the
 * lookup: a "null handle" and a cookie of -1, which means a new connection
 * is wanted. */
914 static struct portals_handle_ops export_handle_ops;
916 /* map connection to client */
917 struct obd_export *class_conn2export(struct lustre_handle *conn)
919 struct obd_export *export;
923 CDEBUG(D_CACHE, "looking for null handle\n");
927 if (conn->cookie == -1) { /* this means assign a new connection */
928 CDEBUG(D_CACHE, "want a new connection\n");
932 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
933 export = class_handle2object(conn->cookie, &export_handle_ops);
936 EXPORT_SYMBOL(class_conn2export);
/* Map an export to an obd_device.
 * NOTE(review): presumably returns exp->exp_obd - confirm against the
 * function body. */
938 struct obd_device *class_exp2obd(struct obd_export *exp)
944 EXPORT_SYMBOL(class_exp2obd);
/* Return the client import (u.cli.cl_import) of the device that \a exp
 * belongs to. */
946 struct obd_import *class_exp2cliimp(struct obd_export *exp)
948 struct obd_device *obd = exp->exp_obd;
951 return obd->u.cli.cl_import;
953 EXPORT_SYMBOL(class_exp2cliimp);
955 /* Export management functions */
/* Free an export whose handle refcount has dropped to zero (asserted).
 * Releases the export's connection if it has one, asserts that its reply,
 * replay and high-priority RPC lists are empty, calls
 * obd_destroy_export(), drops the "export" reference on the device for
 * every export except the device's self export, and frees the memory
 * through OBD_FREE_RCU(). */
956 static void class_export_destroy(struct obd_export *exp)
958 struct obd_device *obd = exp->exp_obd;
961 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
962 LASSERT(obd != NULL);
964 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
965 exp->exp_client_uuid.uuid, obd->obd_name);
967 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
968 if (exp->exp_connection)
969 ptlrpc_put_connection_superhack(exp->exp_connection);
971 LASSERT(list_empty(&exp->exp_outstanding_replies));
972 LASSERT(list_empty(&exp->exp_uncommitted_replies));
973 LASSERT(list_empty(&exp->exp_req_replay_queue));
974 LASSERT(list_empty(&exp->exp_hp_rpcs));
975 obd_destroy_export(exp);
976 /* self export doesn't hold a reference to an obd, although it
977 * exists until freeing of the obd */
978 if (exp != obd->obd_self_export)
979 class_decref(obd, "export", exp);
981 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle operations used when exports are hashed and looked up by cookie;
 * the type string is "export". */
985 static struct portals_handle_ops export_handle_ops = {
987 .hop_type = "export",
/* Take one more reference on \a exp (exp_handle.h_ref) and log the new
 * count. */
990 struct obd_export *class_export_get(struct obd_export *exp)
992 refcount_inc(&exp->exp_handle.h_ref);
993 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
994 refcount_read(&exp->exp_handle.h_ref));
997 EXPORT_SYMBOL(class_export_get);
/* Drop one reference on \a exp.
 *
 * On the final put the nid-stat reference is released with
 * lprocfs_exp_cleanup().  A device's self export is then destroyed
 * immediately, followed by class_free_dev() on the device; any other
 * export must still be on an obd chain (asserted) and is handed to
 * obd_zombie_export_add(). */
999 void class_export_put(struct obd_export *exp)
1001 LASSERT(exp != NULL);
1002 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
1003 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
1004 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1005 refcount_read(&exp->exp_handle.h_ref) - 1);
1007 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
1008 struct obd_device *obd = exp->exp_obd;
1010 CDEBUG(D_IOCTL, "final put %p/%s\n",
1011 exp, exp->exp_client_uuid.uuid);
1013 /* release nid stat refererence */
1014 lprocfs_exp_cleanup(exp);
1016 if (exp == obd->obd_self_export) {
1017 /* self export should be destroyed without
1018 * zombie thread as it doesn't hold a
1019 * reference to obd and doesn't hold any
1021 class_export_destroy(exp);
1022 /* self export is destroyed, no class
1023 * references exist and it is safe to free
1025 class_free_dev(obd);
1027 LASSERT(!list_empty(&exp->exp_obd_chain));
1028 obd_zombie_export_add(exp);
1033 EXPORT_SYMBOL(class_export_put);
/* Work callback installed on exp_zombie_work: recovers the export that
 * embeds \a ws and destroys it. */
1035 static void obd_zombie_exp_cull(struct work_struct *ws)
1037 struct obd_export *export;
1039 export = container_of(ws, struct obd_export, exp_zombie_work);
1040 class_export_destroy(export);
1043 /* Creates a new export, adds it to the hash table, and returns a
1044 * pointer to it. The refcount is 2: one for the hash reference, and
1045 * one for the pointer returned by this function. */
/* Allocate and initialize an export of \a obd for client \a cluuid.
 *
 * The handle refcount starts at 2 and the export is entered into the
 * handle hash.  When \a cluuid differs from the device's own UUID the
 * export is also added to obd_uuid_hash; a duplicate is rejected with
 * -EALREADY.  A device that is stopping yields -ENODEV before hashing and
 * -ESHUTDOWN after it.
 *
 * Two linkage variants are visible: one takes an "export" reference on the
 * device and links the export onto obd_exports / obd_exports_timed, the
 * other only initializes the two list heads.
 * NOTE(review): presumably \a is_self selects the second variant - confirm.
 *
 * The error path unhashes the handle, calls obd_destroy_export() and frees
 * the export. */
1046 struct obd_export *__class_new_export(struct obd_device *obd,
1047 struct obd_uuid *cluuid, bool is_self)
1049 struct obd_export *export;
1050 struct cfs_hash *hash = NULL;
1054 OBD_ALLOC_PTR(export);
1056 return ERR_PTR(-ENOMEM);
1058 export->exp_conn_cnt = 0;
1059 export->exp_lock_hash = NULL;
1060 export->exp_flock_hash = NULL;
1061 /* 2 = class_handle_hash + last */
1062 refcount_set(&export->exp_handle.h_ref, 2);
1063 atomic_set(&export->exp_rpc_count, 0);
1064 atomic_set(&export->exp_cb_count, 0);
1065 atomic_set(&export->exp_locks_count, 0);
1066 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1067 INIT_LIST_HEAD(&export->exp_locks_list);
1068 spin_lock_init(&export->exp_locks_list_guard);
1070 atomic_set(&export->exp_replay_count, 0);
1071 export->exp_obd = obd;
1072 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1073 spin_lock_init(&export->exp_uncommitted_replies_lock);
1074 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1075 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1076 INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1077 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1078 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1079 class_handle_hash(&export->exp_handle, &export_handle_ops);
1080 export->exp_last_request_time = ktime_get_real_seconds();
1081 spin_lock_init(&export->exp_lock);
1082 spin_lock_init(&export->exp_rpc_lock);
1083 INIT_HLIST_NODE(&export->exp_uuid_hash);
1084 INIT_HLIST_NODE(&export->exp_nid_hash);
1085 INIT_HLIST_NODE(&export->exp_gen_hash);
1086 spin_lock_init(&export->exp_bl_list_lock);
1087 INIT_LIST_HEAD(&export->exp_bl_list);
1088 INIT_LIST_HEAD(&export->exp_stale_list);
1089 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1091 export->exp_sp_peer = LUSTRE_SP_ANY;
1092 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1093 export->exp_client_uuid = *cluuid;
1094 obd_init_export(export);
1096 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1097 spin_lock(&obd->obd_dev_lock);
1098 /* shouldn't happen, but might race */
1099 if (obd->obd_stopping)
1100 GOTO(exit_unlock, rc = -ENODEV);
1102 hash = cfs_hash_getref(obd->obd_uuid_hash);
1104 GOTO(exit_unlock, rc = -ENODEV);
1105 spin_unlock(&obd->obd_dev_lock);
1107 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1109 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1110 obd->obd_name, cluuid->uuid, rc);
1111 GOTO(exit_err, rc = -EALREADY);
1115 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1116 spin_lock(&obd->obd_dev_lock);
1117 if (obd->obd_stopping) {
1119 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1120 GOTO(exit_unlock, rc = -ESHUTDOWN);
1124 class_incref(obd, "export", export);
1125 list_add_tail(&export->exp_obd_chain_timed,
1126 &obd->obd_exports_timed);
1127 list_add(&export->exp_obd_chain, &obd->obd_exports);
1128 obd->obd_num_exports++;
1130 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1131 INIT_LIST_HEAD(&export->exp_obd_chain);
1133 spin_unlock(&obd->obd_dev_lock);
1135 cfs_hash_putref(hash);
1139 spin_unlock(&obd->obd_dev_lock);
1142 cfs_hash_putref(hash);
1143 class_handle_unhash(&export->exp_handle);
1144 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1145 obd_destroy_export(export);
1146 OBD_FREE_PTR(export);
/* Create a regular export: __class_new_export() with is_self = false. */
1150 struct obd_export *class_new_export(struct obd_device *obd,
1151 struct obd_uuid *uuid)
1153 return __class_new_export(obd, uuid, false);
1155 EXPORT_SYMBOL(class_new_export);
/* Create a device's self export: __class_new_export() with
 * is_self = true. */
1157 struct obd_export *class_new_export_self(struct obd_device *obd,
1158 struct obd_uuid *uuid)
1160 return __class_new_export(obd, uuid, true);
/* Detach \a exp from its device.
 *
 * The handle is unhashed first.  A self export is then simply put.  Any
 * other export is, under obd_dev_lock, removed from obd_uuid_hash (and
 * from obd_gen_hash with HAVE_SERVER_SUPPORT) when hashed there, moved
 * onto obd_unlinked_exports, removed from the timed chain and subtracted
 * from obd_num_exports.  obd_stale_export_num is incremented and the
 * export is passed to obd_stale_export_put(). */
1163 void class_unlink_export(struct obd_export *exp)
1165 class_handle_unhash(&exp->exp_handle);
1167 if (exp->exp_obd->obd_self_export == exp) {
1168 class_export_put(exp);
1172 spin_lock(&exp->exp_obd->obd_dev_lock);
1173 /* delete an uuid-export hashitem from hashtables */
1174 if (!hlist_unhashed(&exp->exp_uuid_hash))
1175 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1176 &exp->exp_client_uuid,
1177 &exp->exp_uuid_hash);
1179 #ifdef HAVE_SERVER_SUPPORT
1180 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1181 struct tg_export_data *ted = &exp->exp_target_data;
1182 struct cfs_hash *hash;
1184 /* Because obd_gen_hash will not be released until
1185 * class_cleanup(), so hash should never be NULL here */
1186 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1187 LASSERT(hash != NULL);
1188 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1189 &exp->exp_gen_hash);
1190 cfs_hash_putref(hash);
1192 #endif /* HAVE_SERVER_SUPPORT */
1194 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1195 list_del_init(&exp->exp_obd_chain_timed);
1196 exp->exp_obd->obd_num_exports--;
1197 spin_unlock(&exp->exp_obd->obd_dev_lock);
1198 atomic_inc(&obd_stale_export_num);
1200 /* A reference is kept by obd_stale_exports list */
1201 obd_stale_export_put(exp);
1203 EXPORT_SYMBOL(class_unlink_export);
1205 /* Import management functions */
/* Free the resources of an import whose refcount is zero (asserted):
 * releases imp_connection, then releases and frees every entry on
 * imp_conn_list, asserts that no security context (imp_sec) is left, and
 * drops the "import" reference on the owning device. */
1206 static void obd_zombie_import_free(struct obd_import *imp)
1210 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1211 imp->imp_obd->obd_name);
1213 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1215 ptlrpc_put_connection_superhack(imp->imp_connection);
1217 while (!list_empty(&imp->imp_conn_list)) {
1218 struct obd_import_conn *imp_conn;
1220 imp_conn = list_entry(imp->imp_conn_list.next,
1221 struct obd_import_conn, oic_item);
1222 list_del_init(&imp_conn->oic_item);
1223 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1224 OBD_FREE(imp_conn, sizeof(*imp_conn));
1227 LASSERT(imp->imp_sec == NULL);
1228 class_decref(imp->imp_obd, "import", imp);
/* Take one more reference on \a import (imp_refcount) and log the new
 * count. */
1233 struct obd_import *class_import_get(struct obd_import *import)
1235 atomic_inc(&import->imp_refcount);
1236 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1237 atomic_read(&import->imp_refcount),
1238 import->imp_obd->obd_name);
1241 EXPORT_SYMBOL(class_import_get);
/* Drop one reference on \a imp.  The count must be positive and below
 * LI_POISON on entry (asserted).  The final put hands the import to
 * obd_zombie_import_add(). */
1243 void class_import_put(struct obd_import *imp)
1247 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1249 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1250 atomic_read(&imp->imp_refcount) - 1,
1251 imp->imp_obd->obd_name);
1253 if (atomic_dec_and_test(&imp->imp_refcount)) {
1254 CDEBUG(D_INFO, "final put import %p\n", imp);
1255 obd_zombie_import_add(imp);
1258 /* catch possible import put race */
1259 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1262 EXPORT_SYMBOL(class_import_put);
/* Initialize the adaptive-timeout state of an import: the net latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT. */
1264 static void init_imp_at(struct imp_at *at) {
1266 at_init(&at->iat_net_latency, 0, 0);
1267 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1268 /* max service estimates are tracked on the server side, so
1269 don't use the AT history here, just use the last reported
1270 val. (But keep hist for proc histogram, worst_ever) */
1271 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Work callback installed on imp_zombie_work: recovers the import that
 * embeds \a ws and frees it. */
1276 static void obd_zombie_imp_cull(struct work_struct *ws)
1278 struct obd_import *import;
1280 import = container_of(ws, struct obd_import, imp_zombie_work);
1281 obd_zombie_import_free(import);
/* Allocate and initialize an import for \a obd.
 *
 * The import takes an "import" reference on the device, starts in state
 * LUSTRE_IMP_NEW with imp_refcount 2, and uses LUSTRE_MSG_MAGIC_V2 as its
 * default message magic.  imp_sec_refpid is set to the pid of the current
 * pid namespace's child reaper when there is one; 1 is the other value
 * assigned. */
1284 struct obd_import *class_new_import(struct obd_device *obd)
1286 struct obd_import *imp;
1287 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1289 OBD_ALLOC(imp, sizeof(*imp));
1293 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1294 INIT_LIST_HEAD(&imp->imp_replay_list);
1295 INIT_LIST_HEAD(&imp->imp_sending_list);
1296 INIT_LIST_HEAD(&imp->imp_delayed_list);
1297 INIT_LIST_HEAD(&imp->imp_committed_list);
1298 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1299 imp->imp_known_replied_xid = 0;
1300 imp->imp_replay_cursor = &imp->imp_committed_list;
1301 spin_lock_init(&imp->imp_lock);
1302 imp->imp_last_success_conn = 0;
1303 imp->imp_state = LUSTRE_IMP_NEW;
1304 imp->imp_obd = class_incref(obd, "import", imp);
1305 rwlock_init(&imp->imp_sec_lock);
1306 init_waitqueue_head(&imp->imp_recovery_waitq);
1307 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1309 if (curr_pid_ns->child_reaper)
1310 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1312 imp->imp_sec_refpid = 1;
1314 atomic_set(&imp->imp_refcount, 2);
1315 atomic_set(&imp->imp_unregistering, 0);
1316 atomic_set(&imp->imp_inflight, 0);
1317 atomic_set(&imp->imp_replay_inflight, 0);
1318 atomic_set(&imp->imp_inval_count, 0);
1319 INIT_LIST_HEAD(&imp->imp_conn_list);
1320 init_imp_at(&imp->imp_at);
1322 /* the default magic is V2, will be used in connect RPC, and
1323 * then adjusted according to the flags in request/reply. */
1324 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1328 EXPORT_SYMBOL(class_new_import);
/* Retire \a import: bump imp_generation under imp_lock, then drop one
 * reference with class_import_put().  \a import must be neither NULL nor
 * LP_POISON (asserted). */
1330 void class_destroy_import(struct obd_import *import)
1332 LASSERT(import != NULL);
1333 LASSERT(import != LP_POISON);
1335 spin_lock(&import->imp_lock);
1336 import->imp_generation++;
1337 spin_unlock(&import->imp_lock);
1338 class_import_put(import);
1340 EXPORT_SYMBOL(class_destroy_import);
1342 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1344 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1346 spin_lock(&exp->exp_locks_list_guard);
1348 LASSERT(lock->l_exp_refs_nr >= 0);
1350 if (lock->l_exp_refs_target != NULL &&
1351 lock->l_exp_refs_target != exp) {
1352 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1353 exp, lock, lock->l_exp_refs_target);
1355 if ((lock->l_exp_refs_nr ++) == 0) {
1356 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1357 lock->l_exp_refs_target = exp;
1359 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1360 lock, exp, lock->l_exp_refs_nr);
1361 spin_unlock(&exp->exp_locks_list_guard);
1363 EXPORT_SYMBOL(__class_export_add_lock_ref);
1365 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1367 spin_lock(&exp->exp_locks_list_guard);
1368 LASSERT(lock->l_exp_refs_nr > 0);
1369 if (lock->l_exp_refs_target != exp) {
1370 LCONSOLE_WARN("lock %p, "
1371 "mismatching export pointers: %p, %p\n",
1372 lock, lock->l_exp_refs_target, exp);
1374 if (-- lock->l_exp_refs_nr == 0) {
1375 list_del_init(&lock->l_exp_refs_link);
1376 lock->l_exp_refs_target = NULL;
1378 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1379 lock, exp, lock->l_exp_refs_nr);
1380 spin_unlock(&exp->exp_locks_list_guard);
1382 EXPORT_SYMBOL(__class_export_del_lock_ref);
1385 /* A connection defines an export context in which preallocation can
1386 be managed. This releases the export pointer reference, and returns
1387 the export handle, so the export refcount is 1 when this function
1389 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1390 struct obd_uuid *cluuid)
1392 struct obd_export *export;
1393 LASSERT(conn != NULL);
1394 LASSERT(obd != NULL);
1395 LASSERT(cluuid != NULL);
1398 export = class_new_export(obd, cluuid);
1400 RETURN(PTR_ERR(export));
1402 conn->cookie = export->exp_handle.h_cookie;
1403 class_export_put(export);
1405 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1406 cluuid->uuid, conn->cookie);
1409 EXPORT_SYMBOL(class_connect);
1411 /* if export is involved in recovery then clean up related things */
1412 static void class_export_recovery_cleanup(struct obd_export *exp)
1414 struct obd_device *obd = exp->exp_obd;
1416 spin_lock(&obd->obd_recovery_task_lock);
1417 if (obd->obd_recovering) {
1418 if (exp->exp_in_recovery) {
1419 spin_lock(&exp->exp_lock);
1420 exp->exp_in_recovery = 0;
1421 spin_unlock(&exp->exp_lock);
1422 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1423 atomic_dec(&obd->obd_connected_clients);
1426 /* if called during recovery then should update
1427 * obd_stale_clients counter,
1428 * lightweight exports are not counted */
1429 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1430 exp->exp_obd->obd_stale_clients++;
1432 spin_unlock(&obd->obd_recovery_task_lock);
1434 spin_lock(&exp->exp_lock);
1435 /** Cleanup req replay fields */
1436 if (exp->exp_req_replay_needed) {
1437 exp->exp_req_replay_needed = 0;
1439 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1440 atomic_dec(&obd->obd_req_replay_clients);
1443 /** Cleanup lock replay data */
1444 if (exp->exp_lock_replay_needed) {
1445 exp->exp_lock_replay_needed = 0;
1447 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1448 atomic_dec(&obd->obd_lock_replay_clients);
1450 spin_unlock(&exp->exp_lock);
1453 /* This function removes 1-3 references from the export:
1454 * 1 - for export pointer passed
1455 * and if disconnect really need
1456 * 2 - removing from hash
1457 * 3 - in client_unlink_export
1458 * The export pointer passed to this function can destroyed */
1459 int class_disconnect(struct obd_export *export)
1461 int already_disconnected;
1464 if (export == NULL) {
1465 CWARN("attempting to free NULL export %p\n", export);
1469 spin_lock(&export->exp_lock);
1470 already_disconnected = export->exp_disconnected;
1471 export->exp_disconnected = 1;
1472 /* We hold references of export for uuid hash
1473 * and nid_hash and export link at least. So
1474 * it is safe to call cfs_hash_del in there. */
1475 if (!hlist_unhashed(&export->exp_nid_hash))
1476 cfs_hash_del(export->exp_obd->obd_nid_hash,
1477 &export->exp_connection->c_peer.nid,
1478 &export->exp_nid_hash);
1479 spin_unlock(&export->exp_lock);
1481 /* class_cleanup(), abort_recovery(), and class_fail_export()
1482 * all end up in here, and if any of them race we shouldn't
1483 * call extra class_export_puts(). */
1484 if (already_disconnected) {
1485 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1486 GOTO(no_disconn, already_disconnected);
1489 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1490 export->exp_handle.h_cookie);
1492 class_export_recovery_cleanup(export);
1493 class_unlink_export(export);
1495 class_export_put(export);
1498 EXPORT_SYMBOL(class_disconnect);
1500 /* Return non-zero for a fully connected export */
1501 int class_connected_export(struct obd_export *exp)
1506 spin_lock(&exp->exp_lock);
1507 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1508 spin_unlock(&exp->exp_lock);
1512 EXPORT_SYMBOL(class_connected_export);
1514 static void class_disconnect_export_list(struct list_head *list,
1515 enum obd_option flags)
1518 struct obd_export *exp;
1521 /* It's possible that an export may disconnect itself, but
1522 * nothing else will be added to this list. */
1523 while (!list_empty(list)) {
1524 exp = list_entry(list->next, struct obd_export,
1526 /* need for safe call CDEBUG after obd_disconnect */
1527 class_export_get(exp);
1529 spin_lock(&exp->exp_lock);
1530 exp->exp_flags = flags;
1531 spin_unlock(&exp->exp_lock);
1533 if (obd_uuid_equals(&exp->exp_client_uuid,
1534 &exp->exp_obd->obd_uuid)) {
1536 "exp %p export uuid == obd uuid, don't discon\n",
1538 /* Need to delete this now so we don't end up pointing
1539 * to work_list later when this export is cleaned up. */
1540 list_del_init(&exp->exp_obd_chain);
1541 class_export_put(exp);
1545 class_export_get(exp);
1546 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1547 "last request at %lld\n",
1548 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1549 exp, exp->exp_last_request_time);
1550 /* release one export reference anyway */
1551 rc = obd_disconnect(exp);
1553 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1554 obd_export_nid2str(exp), exp, rc);
1555 class_export_put(exp);
1560 void class_disconnect_exports(struct obd_device *obd)
1562 struct list_head work_list;
1565 /* Move all of the exports from obd_exports to a work list, en masse. */
1566 INIT_LIST_HEAD(&work_list);
1567 spin_lock(&obd->obd_dev_lock);
1568 list_splice_init(&obd->obd_exports, &work_list);
1569 list_splice_init(&obd->obd_delayed_exports, &work_list);
1570 spin_unlock(&obd->obd_dev_lock);
1572 if (!list_empty(&work_list)) {
1573 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1574 "disconnecting them\n", obd->obd_minor, obd);
1575 class_disconnect_export_list(&work_list,
1576 exp_flags_from_obd(obd));
1578 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1579 obd->obd_minor, obd);
1582 EXPORT_SYMBOL(class_disconnect_exports);
1584 /* Remove exports that have not completed recovery.
1586 void class_disconnect_stale_exports(struct obd_device *obd,
1587 int (*test_export)(struct obd_export *))
1589 struct list_head work_list;
1590 struct obd_export *exp, *n;
1594 INIT_LIST_HEAD(&work_list);
1595 spin_lock(&obd->obd_dev_lock);
1596 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1598 /* don't count self-export as client */
1599 if (obd_uuid_equals(&exp->exp_client_uuid,
1600 &exp->exp_obd->obd_uuid))
1603 /* don't evict clients which have no slot in last_rcvd
1604 * (e.g. lightweight connection) */
1605 if (exp->exp_target_data.ted_lr_idx == -1)
1608 spin_lock(&exp->exp_lock);
1609 if (exp->exp_failed || test_export(exp)) {
1610 spin_unlock(&exp->exp_lock);
1613 exp->exp_failed = 1;
1614 spin_unlock(&exp->exp_lock);
1616 list_move(&exp->exp_obd_chain, &work_list);
1618 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1619 obd->obd_name, exp->exp_client_uuid.uuid,
1620 obd_export_nid2str(exp));
1621 print_export_data(exp, "EVICTING", 0, D_HA);
1623 spin_unlock(&obd->obd_dev_lock);
1626 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1627 obd->obd_name, evicted);
1629 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1630 OBD_OPT_ABORT_RECOV);
1633 EXPORT_SYMBOL(class_disconnect_stale_exports);
1635 void class_fail_export(struct obd_export *exp)
1637 int rc, already_failed;
1639 spin_lock(&exp->exp_lock);
1640 already_failed = exp->exp_failed;
1641 exp->exp_failed = 1;
1642 spin_unlock(&exp->exp_lock);
1644 if (already_failed) {
1645 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1646 exp, exp->exp_client_uuid.uuid);
1650 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1651 exp, exp->exp_client_uuid.uuid);
1653 if (obd_dump_on_timeout)
1654 libcfs_debug_dumplog();
1656 /* need for safe call CDEBUG after obd_disconnect */
1657 class_export_get(exp);
1659 /* Most callers into obd_disconnect are removing their own reference
1660 * (request, for example) in addition to the one from the hash table.
1661 * We don't have such a reference here, so make one. */
1662 class_export_get(exp);
1663 rc = obd_disconnect(exp);
1665 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1667 CDEBUG(D_HA, "disconnected export %p/%s\n",
1668 exp, exp->exp_client_uuid.uuid);
1669 class_export_put(exp);
1671 EXPORT_SYMBOL(class_fail_export);
1673 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1675 struct cfs_hash *nid_hash;
1676 struct obd_export *doomed_exp = NULL;
1677 int exports_evicted = 0;
1679 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1681 spin_lock(&obd->obd_dev_lock);
1682 /* umount has run already, so evict thread should leave
1683 * its task to umount thread now */
1684 if (obd->obd_stopping) {
1685 spin_unlock(&obd->obd_dev_lock);
1686 return exports_evicted;
1688 nid_hash = obd->obd_nid_hash;
1689 cfs_hash_getref(nid_hash);
1690 spin_unlock(&obd->obd_dev_lock);
1693 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1694 if (doomed_exp == NULL)
1697 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1698 "nid %s found, wanted nid %s, requested nid %s\n",
1699 obd_export_nid2str(doomed_exp),
1700 libcfs_nid2str(nid_key), nid);
1701 LASSERTF(doomed_exp != obd->obd_self_export,
1702 "self-export is hashed by NID?\n");
1704 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1705 "request\n", obd->obd_name,
1706 obd_uuid2str(&doomed_exp->exp_client_uuid),
1707 obd_export_nid2str(doomed_exp));
1708 class_fail_export(doomed_exp);
1709 class_export_put(doomed_exp);
1712 cfs_hash_putref(nid_hash);
1714 if (!exports_evicted)
1715 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1716 obd->obd_name, nid);
1717 return exports_evicted;
1719 EXPORT_SYMBOL(obd_export_evict_by_nid);
1721 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1723 struct cfs_hash *uuid_hash;
1724 struct obd_export *doomed_exp = NULL;
1725 struct obd_uuid doomed_uuid;
1726 int exports_evicted = 0;
1728 spin_lock(&obd->obd_dev_lock);
1729 if (obd->obd_stopping) {
1730 spin_unlock(&obd->obd_dev_lock);
1731 return exports_evicted;
1733 uuid_hash = obd->obd_uuid_hash;
1734 cfs_hash_getref(uuid_hash);
1735 spin_unlock(&obd->obd_dev_lock);
1737 obd_str2uuid(&doomed_uuid, uuid);
1738 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1739 CERROR("%s: can't evict myself\n", obd->obd_name);
1740 cfs_hash_putref(uuid_hash);
1741 return exports_evicted;
1744 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1746 if (doomed_exp == NULL) {
1747 CERROR("%s: can't disconnect %s: no exports found\n",
1748 obd->obd_name, uuid);
1750 CWARN("%s: evicting %s at adminstrative request\n",
1751 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1752 class_fail_export(doomed_exp);
1753 class_export_put(doomed_exp);
1756 cfs_hash_putref(uuid_hash);
1758 return exports_evicted;
1761 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1762 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1763 EXPORT_SYMBOL(class_export_dump_hook);
1766 static void print_export_data(struct obd_export *exp, const char *status,
1767 int locks, int debug_level)
1769 struct ptlrpc_reply_state *rs;
1770 struct ptlrpc_reply_state *first_reply = NULL;
1773 spin_lock(&exp->exp_lock);
1774 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1780 spin_unlock(&exp->exp_lock);
1782 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1783 "%p %s %llu stale:%d\n",
1784 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1785 obd_export_nid2str(exp),
1786 refcount_read(&exp->exp_handle.h_ref),
1787 atomic_read(&exp->exp_rpc_count),
1788 atomic_read(&exp->exp_cb_count),
1789 atomic_read(&exp->exp_locks_count),
1790 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1791 nreplies, first_reply, nreplies > 3 ? "..." : "",
1792 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1793 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1794 if (locks && class_export_dump_hook != NULL)
1795 class_export_dump_hook(exp);
1799 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1801 struct obd_export *exp;
1803 spin_lock(&obd->obd_dev_lock);
1804 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1805 print_export_data(exp, "ACTIVE", locks, debug_level);
1806 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1807 print_export_data(exp, "UNLINKED", locks, debug_level);
1808 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1809 print_export_data(exp, "DELAYED", locks, debug_level);
1810 spin_unlock(&obd->obd_dev_lock);
1813 void obd_exports_barrier(struct obd_device *obd)
1816 LASSERT(list_empty(&obd->obd_exports));
1817 spin_lock(&obd->obd_dev_lock);
1818 while (!list_empty(&obd->obd_unlinked_exports)) {
1819 spin_unlock(&obd->obd_dev_lock);
1820 set_current_state(TASK_UNINTERRUPTIBLE);
1821 schedule_timeout(cfs_time_seconds(waited));
1822 if (waited > 5 && is_power_of_2(waited)) {
1823 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1824 "more than %d seconds. "
1825 "The obd refcount = %d. Is it stuck?\n",
1826 obd->obd_name, waited,
1827 atomic_read(&obd->obd_refcount));
1828 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1831 spin_lock(&obd->obd_dev_lock);
1833 spin_unlock(&obd->obd_dev_lock);
1835 EXPORT_SYMBOL(obd_exports_barrier);
1838 * Add export to the obd_zombe thread and notify it.
1840 static void obd_zombie_export_add(struct obd_export *exp) {
1841 atomic_dec(&obd_stale_export_num);
1842 spin_lock(&exp->exp_obd->obd_dev_lock);
1843 LASSERT(!list_empty(&exp->exp_obd_chain));
1844 list_del_init(&exp->exp_obd_chain);
1845 spin_unlock(&exp->exp_obd->obd_dev_lock);
1847 queue_work(zombie_wq, &exp->exp_zombie_work);
1851 * Add import to the obd_zombe thread and notify it.
1853 static void obd_zombie_import_add(struct obd_import *imp) {
1854 LASSERT(imp->imp_sec == NULL);
1856 queue_work(zombie_wq, &imp->imp_zombie_work);
1860 * wait when obd_zombie import/export queues become empty
1862 void obd_zombie_barrier(void)
1864 flush_workqueue(zombie_wq);
1866 EXPORT_SYMBOL(obd_zombie_barrier);
1869 struct obd_export *obd_stale_export_get(void)
1871 struct obd_export *exp = NULL;
1874 spin_lock(&obd_stale_export_lock);
1875 if (!list_empty(&obd_stale_exports)) {
1876 exp = list_entry(obd_stale_exports.next,
1877 struct obd_export, exp_stale_list);
1878 list_del_init(&exp->exp_stale_list);
1880 spin_unlock(&obd_stale_export_lock);
1883 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1884 atomic_read(&obd_stale_export_num));
1888 EXPORT_SYMBOL(obd_stale_export_get);
1890 void obd_stale_export_put(struct obd_export *exp)
1894 LASSERT(list_empty(&exp->exp_stale_list));
1895 if (exp->exp_lock_hash &&
1896 atomic_read(&exp->exp_lock_hash->hs_count)) {
1897 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1898 atomic_read(&obd_stale_export_num));
1900 spin_lock_bh(&exp->exp_bl_list_lock);
1901 spin_lock(&obd_stale_export_lock);
1902 /* Add to the tail if there is no blocked locks,
1903 * to the head otherwise. */
1904 if (list_empty(&exp->exp_bl_list))
1905 list_add_tail(&exp->exp_stale_list,
1906 &obd_stale_exports);
1908 list_add(&exp->exp_stale_list,
1909 &obd_stale_exports);
1911 spin_unlock(&obd_stale_export_lock);
1912 spin_unlock_bh(&exp->exp_bl_list_lock);
1914 class_export_put(exp);
1918 EXPORT_SYMBOL(obd_stale_export_put);
1921 * Adjust the position of the export in the stale list,
1922 * i.e. move to the head of the list if is needed.
1924 void obd_stale_export_adjust(struct obd_export *exp)
1926 LASSERT(exp != NULL);
1927 spin_lock_bh(&exp->exp_bl_list_lock);
1928 spin_lock(&obd_stale_export_lock);
1930 if (!list_empty(&exp->exp_stale_list) &&
1931 !list_empty(&exp->exp_bl_list))
1932 list_move(&exp->exp_stale_list, &obd_stale_exports);
1934 spin_unlock(&obd_stale_export_lock);
1935 spin_unlock_bh(&exp->exp_bl_list_lock);
1937 EXPORT_SYMBOL(obd_stale_export_adjust);
1940 * start destroy zombie import/export thread
1942 int obd_zombie_impexp_init(void)
1944 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1952 * stop destroy zombie import/export thread
1954 void obd_zombie_impexp_stop(void)
1956 destroy_workqueue(zombie_wq);
1957 LASSERT(list_empty(&obd_stale_exports));
1960 /***** Kernel-userspace comm helpers *******/
1962 /* Get length of entire message, including header */
1963 int kuc_len(int payload_len)
1965 return sizeof(struct kuc_hdr) + payload_len;
1967 EXPORT_SYMBOL(kuc_len);
1969 /* Get a pointer to kuc header, given a ptr to the payload
1970 * @param p Pointer to payload area
1971 * @returns Pointer to kuc header
1973 struct kuc_hdr * kuc_ptr(void *p)
1975 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1976 LASSERT(lh->kuc_magic == KUC_MAGIC);
1979 EXPORT_SYMBOL(kuc_ptr);
1981 /* Alloc space for a message, and fill in header
1982 * @return Pointer to payload area
1984 void *kuc_alloc(int payload_len, int transport, int type)
1987 int len = kuc_len(payload_len);
1991 return ERR_PTR(-ENOMEM);
1993 lh->kuc_magic = KUC_MAGIC;
1994 lh->kuc_transport = transport;
1995 lh->kuc_msgtype = type;
1996 lh->kuc_msglen = len;
1998 return (void *)(lh + 1);
2000 EXPORT_SYMBOL(kuc_alloc);
/* Free a message.  Takes the pointer to the payload area and the payload
 * length; the header in front of the payload is freed along with it. */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
2010 struct obd_request_slot_waiter {
2011 struct list_head orsw_entry;
2012 wait_queue_head_t orsw_waitq;
2016 static bool obd_request_slot_avail(struct client_obd *cli,
2017 struct obd_request_slot_waiter *orsw)
2021 spin_lock(&cli->cl_loi_list_lock);
2022 avail = !!list_empty(&orsw->orsw_entry);
2023 spin_unlock(&cli->cl_loi_list_lock);
2029 * For network flow control, the RPC sponsor needs to acquire a credit
2030 * before sending the RPC. The credits count for a connection is defined
2031 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2032 * the subsequent RPC sponsors need to wait until others released their
2033 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2035 int obd_get_request_slot(struct client_obd *cli)
2037 struct obd_request_slot_waiter orsw;
2038 struct l_wait_info lwi;
2041 spin_lock(&cli->cl_loi_list_lock);
2042 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2043 cli->cl_rpcs_in_flight++;
2044 spin_unlock(&cli->cl_loi_list_lock);
2048 init_waitqueue_head(&orsw.orsw_waitq);
2049 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2050 orsw.orsw_signaled = false;
2051 spin_unlock(&cli->cl_loi_list_lock);
2053 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2054 rc = l_wait_event(orsw.orsw_waitq,
2055 obd_request_slot_avail(cli, &orsw) ||
2059 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2060 * freed but other (such as obd_put_request_slot) is using it. */
2061 spin_lock(&cli->cl_loi_list_lock);
2063 if (!orsw.orsw_signaled) {
2064 if (list_empty(&orsw.orsw_entry))
2065 cli->cl_rpcs_in_flight--;
2067 list_del(&orsw.orsw_entry);
2071 if (orsw.orsw_signaled) {
2072 LASSERT(list_empty(&orsw.orsw_entry));
2076 spin_unlock(&cli->cl_loi_list_lock);
2080 EXPORT_SYMBOL(obd_get_request_slot);
2082 void obd_put_request_slot(struct client_obd *cli)
2084 struct obd_request_slot_waiter *orsw;
2086 spin_lock(&cli->cl_loi_list_lock);
2087 cli->cl_rpcs_in_flight--;
2089 /* If there is free slot, wakeup the first waiter. */
2090 if (!list_empty(&cli->cl_flight_waiters) &&
2091 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2092 orsw = list_entry(cli->cl_flight_waiters.next,
2093 struct obd_request_slot_waiter, orsw_entry);
2094 list_del_init(&orsw->orsw_entry);
2095 cli->cl_rpcs_in_flight++;
2096 wake_up(&orsw->orsw_waitq);
2098 spin_unlock(&cli->cl_loi_list_lock);
2100 EXPORT_SYMBOL(obd_put_request_slot);
2102 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2104 return cli->cl_max_rpcs_in_flight;
2106 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2108 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2110 struct obd_request_slot_waiter *orsw;
2114 const char *type_name;
2117 if (max > OBD_MAX_RIF_MAX || max < 1)
2120 type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2121 if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2122 /* adjust max_mod_rpcs_in_flight to ensure it is always
2123 * strictly lower that max_rpcs_in_flight */
2125 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2126 "because it must be higher than "
2127 "max_mod_rpcs_in_flight value",
2128 cli->cl_import->imp_obd->obd_name);
2131 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2132 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2138 spin_lock(&cli->cl_loi_list_lock);
2139 old = cli->cl_max_rpcs_in_flight;
2140 cli->cl_max_rpcs_in_flight = max;
2141 client_adjust_max_dirty(cli);
2145 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2146 for (i = 0; i < diff; i++) {
2147 if (list_empty(&cli->cl_flight_waiters))
2150 orsw = list_entry(cli->cl_flight_waiters.next,
2151 struct obd_request_slot_waiter, orsw_entry);
2152 list_del_init(&orsw->orsw_entry);
2153 cli->cl_rpcs_in_flight++;
2154 wake_up(&orsw->orsw_waitq);
2156 spin_unlock(&cli->cl_loi_list_lock);
2160 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2162 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2164 return cli->cl_max_mod_rpcs_in_flight;
2166 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2168 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2170 struct obd_connect_data *ocd;
2174 if (max > OBD_MAX_RIF_MAX || max < 1)
2177 /* cannot exceed or equal max_rpcs_in_flight */
2178 if (max >= cli->cl_max_rpcs_in_flight) {
2179 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2180 "higher or equal to max_rpcs_in_flight value (%u)\n",
2181 cli->cl_import->imp_obd->obd_name,
2182 max, cli->cl_max_rpcs_in_flight);
2186 /* cannot exceed max modify RPCs in flight supported by the server */
2187 ocd = &cli->cl_import->imp_connect_data;
2188 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2189 maxmodrpcs = ocd->ocd_maxmodrpcs;
2192 if (max > maxmodrpcs) {
2193 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2194 "higher than max_mod_rpcs_per_client value (%hu) "
2195 "returned by the server at connection\n",
2196 cli->cl_import->imp_obd->obd_name,
2201 spin_lock(&cli->cl_mod_rpcs_lock);
2203 prev = cli->cl_max_mod_rpcs_in_flight;
2204 cli->cl_max_mod_rpcs_in_flight = max;
2206 /* wakeup waiters if limit has been increased */
2207 if (cli->cl_max_mod_rpcs_in_flight > prev)
2208 wake_up(&cli->cl_mod_rpcs_waitq);
2210 spin_unlock(&cli->cl_mod_rpcs_lock);
2214 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2216 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2217 struct seq_file *seq)
2219 unsigned long mod_tot = 0, mod_cum;
2220 struct timespec64 now;
2223 ktime_get_real_ts64(&now);
2225 spin_lock(&cli->cl_mod_rpcs_lock);
2227 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2228 (s64)now.tv_sec, now.tv_nsec);
2229 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2230 cli->cl_mod_rpcs_in_flight);
2232 seq_printf(seq, "\n\t\t\tmodify\n");
2233 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2235 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2238 for (i = 0; i < OBD_HIST_MAX; i++) {
2239 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2241 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2242 i, mod, pct(mod, mod_tot),
2243 pct(mod_cum, mod_tot));
2244 if (mod_cum == mod_tot)
2248 spin_unlock(&cli->cl_mod_rpcs_lock);
2252 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2254 /* The number of modify RPCs sent in parallel is limited
2255 * because the server has a finite number of slots per client to
2256 * store request result and ensure reply reconstruction when needed.
2257 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2258 * that takes into account server limit and cl_max_rpcs_in_flight
2260 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2261 * one close request is allowed above the maximum.
2263 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2268 /* A slot is available if
2269 * - number of modify RPCs in flight is less than the max
2270 * - it's a close RPC and no other close request is in flight
2272 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2273 (close_req && cli->cl_close_rpcs_in_flight == 0);
2278 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2283 spin_lock(&cli->cl_mod_rpcs_lock);
2284 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2285 spin_unlock(&cli->cl_mod_rpcs_lock);
2289 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2292 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2293 it->it_op == IT_READDIR ||
2294 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2299 /* Get a modify RPC slot from the obd client @cli according
2300 * to the kind of operation @opc that is going to be sent
2301 * and the intent @it of the operation if it applies.
2302 * If the maximum number of modify RPCs in flight is reached
2303 * the thread is put to sleep.
2304 * Returns the tag to be set in the request message. Tag 0
2305 * is reserved for non-modifying requests.
2307 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2308 struct lookup_intent *it)
2310 bool close_req = false;
2313 /* read-only metadata RPCs don't consume a slot on MDT
2314 * for reply reconstruction
2316 if (obd_skip_mod_rpc_slot(it))
2319 if (opc == MDS_CLOSE)
2323 spin_lock(&cli->cl_mod_rpcs_lock);
2324 max = cli->cl_max_mod_rpcs_in_flight;
2325 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2326 /* there is a slot available */
2327 cli->cl_mod_rpcs_in_flight++;
2329 cli->cl_close_rpcs_in_flight++;
2330 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2331 cli->cl_mod_rpcs_in_flight);
2332 /* find a free tag */
2333 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2335 LASSERT(i < OBD_MAX_RIF_MAX);
2336 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2337 spin_unlock(&cli->cl_mod_rpcs_lock);
2338 /* tag 0 is reserved for non-modify RPCs */
2340 CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2341 "opc %u, max %hu\n",
2342 cli->cl_import->imp_obd->obd_name,
2347 spin_unlock(&cli->cl_mod_rpcs_lock);
2349 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2350 "opc %u, max %hu\n",
2351 cli->cl_import->imp_obd->obd_name, opc, max);
2353 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2354 obd_mod_rpc_slot_avail(cli,
2358 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2360 /* Put a modify RPC slot from the obd client @cli according
2361 * to the kind of operation @opc that has been sent and the
2362 * intent @it of the operation if it applies.
2364 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2365 struct lookup_intent *it, __u16 tag)
2367 bool close_req = false;
2369 if (obd_skip_mod_rpc_slot(it))
2372 if (opc == MDS_CLOSE)
2375 spin_lock(&cli->cl_mod_rpcs_lock);
2376 cli->cl_mod_rpcs_in_flight--;
2378 cli->cl_close_rpcs_in_flight--;
2379 /* release the tag in the bitmap */
2380 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2381 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2382 spin_unlock(&cli->cl_mod_rpcs_lock);
2383 wake_up(&cli->cl_mod_rpcs_waitq);
2385 EXPORT_SYMBOL(obd_put_mod_rpc_slot);