4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions; they provide some generic
35 * infrastructure for managing object devices.
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* Lock taken by the lookup and (un)registration helpers in this file around accesses to obd_devs[]. */
49 DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices; a device's slot index is stored in its obd_minor. */
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache that backs struct obd_device allocations (see obd_init_caches()). */
52 static struct kmem_cache *obd_device_cachep;
/* kobject type shared by every obd_type; its initializer appears further down. */
53 static struct kobj_type class_ktype;
/* NOTE(review): presumably runs the zombie export/import work items; its users are outside this view. */
54 static struct workqueue_struct *zombie_wq;
/* Forward declarations for helpers that are used before their definitions. */
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59 const char *status, int locks, int debug_level);
/* Stale-export list, its lock, and a counter that class_unlink_export() increments. */
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Function-pointer hook called here to drop a ptlrpc_connection; where it is assigned is not visible in this file view. */
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
/*
 * Allocate a struct obd_device from obd_device_cachep (GFP_NOFS) and stamp
 * it with OBD_DEVICE_MAGIC.
 * NOTE(review): the NULL check and the return statement are not visible in
 * this listing; class_newdev() treats a NULL result as -ENOMEM.
 */
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts the magic value first, reports (but does not clean up) a namespace
 * that is still attached, and finalises the lu_ref tracker before freeing.
 */
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A leftover namespace is only logged here. */
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90 obd, obd->obd_namespace, obd->obd_force);
93 lu_ref_fini(&obd->obd_reference);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up an obd_type by name in lustre_kset.
 * A kobject found under that name is accepted only when its ktype is
 * class_ktype; the enclosing obd_type is then returned. Callers in this file
 * drop the kobject reference with kobject_put() when done with it.
 * NOTE(review): the handling of a kobject with a different ktype and the
 * not-found return are not visible in this listing.
 */
97 struct obd_type *class_search_type(const char *name)
99 struct kobject *kobj = kset_find_obj(lustre_kset, name);
101 if (kobj && kobj->ktype == &class_ktype)
102 return container_of(kobj, struct obd_type, typ_kobj);
107 EXPORT_SYMBOL(class_search_type);
/*
 * Find the obd_type called \a name and take a module reference on it.
 * With HAVE_MODULE_LOADING_SUPPORT the providing module is requested via
 * request_module() and the lookup is repeated; some type names are mapped to
 * a different module name first (lwp -> osp, names starting with the mds
 * name -> mdt).
 * NOTE(review): the condition guarding the module-load attempt, the
 * typ_refcnt update and the return statement are not visible in this listing.
 */
109 struct obd_type *class_get_type(const char *name)
111 struct obd_type *type;
113 type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
116 const char *modname = name;
118 #ifdef HAVE_SERVER_SUPPORT
119 if (strcmp(modname, "obdfilter") == 0)
122 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 modname = LUSTRE_OSP_NAME;
125 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
/* request_module() returns 0 on success, so a zero result means the module was loaded. */
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not checked here; confirm that is intended. */
141 try_module_get(type->typ_dt_ops->o_owner);
142 spin_unlock(&type->obd_type_lock);
143 /* class_search_type() returned a counted reference,
144 * but we don't need that count any more as
145 * we have one through typ_refcnt.
147 kobject_put(&type->typ_kobj);
/*
 * Release a type reference obtained with class_get_type(): under
 * obd_type_lock, drop the module reference on the type's dt ops owner.
 * NOTE(review): the matching typ_refcnt update is not visible in this listing.
 */
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
/*
 * kobject release callback for an obd_type (installed through class_ktype).
 * Removes the type's debugfs tree, finalises its lu_device_type, removes its
 * proc subtree, and frees both ops tables and the obd_type itself.
 */
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 debugfs_remove_recursive(type->typ_debugfs_entry);
166 type->typ_debugfs_entry = NULL;
/* NOTE(review): any guard around this call (e.g. typ_lu being NULL) is not visible in this listing. */
169 lu_device_type_fini(type->typ_lu);
171 #ifdef CONFIG_PROC_FS
172 if (type->typ_name && type->typ_procroot)
173 remove_proc_subtree(type->typ_name, proc_lustre_root);
/* Both ops tables are freed only when non-NULL. */
175 if (type->typ_md_ops)
176 OBD_FREE_PTR(type->typ_md_ops);
177 if (type->typ_dt_ops)
178 OBD_FREE_PTR(type->typ_dt_ops);
180 OBD_FREE(type, sizeof(*type));
/* kobject type for obd_type objects: Lustre sysfs ops plus class_sysfs_release() as destructor. */
183 static struct kobj_type class_ktype = {
184 .sysfs_ops = &lustre_sysfs_ops,
185 .release = class_sysfs_release,
/*
 * Server-only: create a placeholder obd_type called \a name that carries
 * only a kobject, a debugfs directory and a proc directory, and mark it with
 * typ_sym_filter.
 * Returns ERR_PTR(-EEXIST) when a type of that name is already registered and
 * ERR_PTR(-ENOMEM) when the obd_type cannot be allocated.
 * NOTE(review): several guards and the success return are not visible in
 * this listing; whether \a enable_proc gates the proc registration needs
 * confirming against the full file.
 */
188 #ifdef HAVE_SERVER_SUPPORT
189 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
191 struct dentry *symlink;
192 struct obd_type *type;
195 type = class_search_type(name);
/* Drop the reference the lookup returned before reporting the clash. */
197 kobject_put(&type->typ_kobj);
198 return ERR_PTR(-EEXIST);
201 OBD_ALLOC(type, sizeof(*type));
203 return ERR_PTR(-ENOMEM);
205 type->typ_kobj.kset = lustre_kset;
206 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
207 &lustre_kset->kobj, "%s", name);
211 symlink = debugfs_create_dir(name, debugfs_lustre_root);
212 if (IS_ERR_OR_NULL(symlink)) {
213 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
214 kobject_put(&type->typ_kobj);
217 type->typ_debugfs_entry = symlink;
218 type->typ_sym_filter = true;
221 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
/* A failed proc registration is only logged; typ_procroot is reset to NULL. */
223 if (IS_ERR(type->typ_procroot)) {
224 CERROR("%s: can't create compat proc entry: %d\n",
225 name, (int)PTR_ERR(type->typ_procroot));
226 type->typ_procroot = NULL;
232 EXPORT_SYMBOL(class_add_symlinks);
233 #endif /* HAVE_SERVER_SUPPORT */
/* Upper bound (exclusive) on the length of a type name accepted by class_register_type(). */
235 #define CLASS_MAX_NAME 1024
/*
 * Register an obd type called \a name.
 * Private copies of \a dt_ops and of the optional \a md_ops are stored in the
 * new obd_type. A proc directory is registered when \a enable_proc is set and
 * none exists yet, then a debugfs directory; the kobject is added to
 * lustre_kset and \a ldt is passed to lu_device_type_init().
 * On server builds an existing type flagged typ_sym_filter (created by
 * class_add_symlinks()) is treated specially.
 * NOTE(review): the branch bodies for the already-registered case, the
 * "failed" label and the return statements are not visible in this listing.
 */
237 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
238 bool enable_proc, struct lprocfs_vars *vars,
239 const char *name, struct lu_device_type *ldt)
241 struct obd_type *type;
246 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
248 type = class_search_type(name);
250 #ifdef HAVE_SERVER_SUPPORT
251 if (type->typ_sym_filter)
253 #endif /* HAVE_SERVER_SUPPORT */
254 kobject_put(&type->typ_kobj);
255 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
259 OBD_ALLOC(type, sizeof(*type));
263 type->typ_kobj.kset = lustre_kset;
264 kobject_init(&type->typ_kobj, &class_ktype);
265 #ifdef HAVE_SERVER_SUPPORT
267 #endif /* HAVE_SERVER_SUPPORT */
/* Both ops tables are always allocated, even though md_ops may be absent. */
268 OBD_ALLOC_PTR(type->typ_dt_ops);
269 OBD_ALLOC_PTR(type->typ_md_ops);
271 if (type->typ_dt_ops == NULL ||
272 type->typ_md_ops == NULL)
273 GOTO (failed, rc = -ENOMEM);
275 *(type->typ_dt_ops) = *dt_ops;
276 /* md_ops is optional */
278 *(type->typ_md_ops) = *md_ops;
279 spin_lock_init(&type->obd_type_lock);
281 #ifdef HAVE_SERVER_SUPPORT
282 if (type->typ_sym_filter) {
283 type->typ_sym_filter = false;
284 kobject_put(&type->typ_kobj);
288 #ifdef CONFIG_PROC_FS
289 if (enable_proc && !type->typ_procroot) {
290 type->typ_procroot = lprocfs_register(name,
293 if (IS_ERR(type->typ_procroot)) {
294 rc = PTR_ERR(type->typ_procroot);
295 type->typ_procroot = NULL;
300 type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
302 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
303 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
305 type->typ_debugfs_entry = NULL;
309 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
312 #ifdef HAVE_SERVER_SUPPORT
317 rc = lu_device_type_init(ldt);
/* Dropping the kobject reference hands cleanup to class_sysfs_release() once it was the last one. */
325 kobject_put(&type->typ_kobj);
329 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 * An unknown name is logged. A type that still has users (typ_refcnt != 0)
 * gets its ops tables freed and -EBUSY as the result; otherwise the final
 * kobject reference is dropped. In both visible paths the reference returned
 * by class_search_type() is dropped as well.
 * NOTE(review): the error code for the unknown-type case and the return
 * statement are not visible in this listing.
 */
331 int class_unregister_type(const char *name)
333 struct obd_type *type = class_search_type(name);
338 CERROR("unknown obd type\n");
342 if (type->typ_refcnt) {
343 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
344 /* This is a bad situation, let's make the best of it */
345 /* Remove ops, but leave the name for debugging */
/* NOTE(review): the pointers are not visibly reset after these frees, while class_sysfs_release() frees any non-NULL typ_dt_ops/typ_md_ops; confirm this cannot double-free when the last kobject reference is dropped later. */
346 OBD_FREE_PTR(type->typ_dt_ops);
347 OBD_FREE_PTR(type->typ_md_ops);
348 GOTO(out_put, rc = -EBUSY);
351 /* Put the final ref */
352 kobject_put(&type->typ_kobj);
354 /* Put the ref returned by class_search_type() */
355 kobject_put(&type->typ_kobj);
358 } /* class_unregister_type */
359 EXPORT_SYMBOL(class_unregister_type);
362 * Create a new obd device.
364 * Allocate the new obd_device and initialize it.
366 * \param[in] type_name obd device type string.
367 * \param[in] name obd device name.
368 * \param[in] uuid obd device UUID
370 * \retval newdev pointer to created obd_device
371 * \retval ERR_PTR(errno) on error
/*
 * Allocate and initialise a new obd_device of type \a type_name.
 * Fails with ERR_PTR(-EINVAL) when \a name is MAX_OBD_NAME bytes or longer,
 * ERR_PTR(-ENODEV) when the type is unknown, and ERR_PTR(-ENOMEM) when the
 * device cannot be allocated (the type reference is released in that case).
 * The new device starts unregistered (obd_minor == -1) with obd_refcount 1.
 * NOTE(review): the remaining parameters of the signature and the success
 * return are not visible in this listing.
 */
373 struct obd_device *class_newdev(const char *type_name, const char *name,
376 struct obd_device *newdev;
377 struct obd_type *type = NULL;
/* NOTE(review): only the length of name is checked although the message also mentions the uuid. */
380 if (strlen(name) >= MAX_OBD_NAME) {
381 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
382 RETURN(ERR_PTR(-EINVAL));
385 type = class_get_type(type_name);
387 CERROR("OBD: unknown type: %s\n", type_name);
388 RETURN(ERR_PTR(-ENODEV));
391 newdev = obd_device_alloc();
392 if (newdev == NULL) {
393 class_put_type(type);
394 RETURN(ERR_PTR(-ENOMEM));
396 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* Copies at most sizeof(obd_name) - 1 bytes; presumably the allocation is zeroed so the name stays terminated - TODO confirm. */
397 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
398 newdev->obd_type = type;
/* -1 marks the device as not yet placed in obd_devs[]. */
399 newdev->obd_minor = -1;
401 rwlock_init(&newdev->obd_pool_lock);
402 newdev->obd_pool_limit = 0;
403 newdev->obd_pool_slv = 0;
405 INIT_LIST_HEAD(&newdev->obd_exports);
406 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
407 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
408 INIT_LIST_HEAD(&newdev->obd_exports_timed);
409 INIT_LIST_HEAD(&newdev->obd_nid_stats);
410 spin_lock_init(&newdev->obd_nid_lock);
411 spin_lock_init(&newdev->obd_dev_lock);
412 mutex_init(&newdev->obd_dev_mutex);
413 spin_lock_init(&newdev->obd_osfs_lock);
414 /* newdev->obd_osfs_age must be set to a value in the distant
415 * past to guarantee a fresh statfs is fetched on mount. */
416 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
418 /* XXX belongs in setup not attach */
419 init_rwsem(&newdev->obd_observer_link_sem);
421 spin_lock_init(&newdev->obd_recovery_task_lock);
422 init_waitqueue_head(&newdev->obd_next_transno_waitq);
423 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
424 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
425 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
426 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
427 INIT_LIST_HEAD(&newdev->obd_evict_list);
428 INIT_LIST_HEAD(&newdev->obd_lwp_list);
430 llog_group_init(&newdev->obd_olg);
431 /* Detach drops this */
432 atomic_set(&newdev->obd_refcount, 1);
433 lu_ref_init(&newdev->obd_reference);
434 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
436 newdev->obd_conn_inprogress = 0;
/* NOTE(review): strncpy() does not terminate the destination when the source has UUID_MAX or more bytes; confirm callers bound the uuid length. */
438 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
440 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
441 newdev->obd_name, newdev);
449 * \param[in] obd obd_device to be freed
/*
 * Final teardown of an obd_device whose reference count has reached zero.
 * Asserts the magic value, that the device is either unregistered
 * (obd_minor == -1) or still sits in its obd_devs[] slot, and that
 * obd_refcount is 0. If the device is stopping, obd_cleanup() is called and
 * an error return is logged. The device memory is then freed and the type
 * reference is released.
 */
453 void class_free_dev(struct obd_device *obd)
/* Saved up front because obd is freed before class_put_type() is called. */
455 struct obd_type *obd_type = obd->obd_type;
457 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
458 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
460 "obd %p != obd_devs[%d] %p\n",
461 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
462 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
463 "obd_refcount should be 0, not %d\n",
464 atomic_read(&obd->obd_refcount));
465 LASSERT(obd_type != NULL);
467 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
468 obd->obd_name, obd->obd_type->typ_name);
470 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
471 obd->obd_name, obd->obd_uuid.uuid);
472 if (obd->obd_stopping) {
475 /* If we're not stopping, we were never set up */
476 err = obd_cleanup(obd);
478 CERROR("Cleanup %s returned %d\n",
482 obd_device_free(obd);
484 class_put_type(obd_type);
488 * Unregister obd device.
490 * Free slot in obd_devs[] used by \a obd.
492 * \param[in] obd obd_device to be unregistered
/*
 * Remove \a obd from obd_devs[] under the obd_dev_lock write lock.
 * Only a device with a non-negative obd_minor occupies a slot; the slot is
 * asserted to point at \a obd before it is cleared.
 */
496 void class_unregister_device(struct obd_device *obd)
498 write_lock(&obd_dev_lock);
499 if (obd->obd_minor >= 0) {
500 LASSERT(obd_devs[obd->obd_minor] == obd);
501 obd_devs[obd->obd_minor] = NULL;
504 write_unlock(&obd_dev_lock);
508 * Register obd device.
510 * Find free slot in obd_devs[], fills it with \a new_obd.
512 * \param[in] new_obd obd_device to be registered
515 * \retval -EEXIST device with this name is registered
516 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Place \a new_obd into a free obd_devs[] slot and record the slot index in
 * new_obd->obd_minor.
 * The table is scanned under the obd_dev_lock write lock. A device with the
 * same name is reported as already existing; before that, the lock is dropped
 * and obd_zombie_barrier() is called in case the old device is still waiting
 * to be destroyed. When no slot is free an error naming MAX_OBD_DEVICES is
 * logged.
 * NOTE(review): the retry control flow around `retried`, the assignment of
 * the return code and the return statement are not visible in this listing.
 */
518 int class_register_device(struct obd_device *new_obd)
522 int new_obd_minor = 0;
523 bool minor_assign = false;
524 bool retried = false;
527 write_lock(&obd_dev_lock);
528 for (i = 0; i < class_devno_max(); i++) {
529 struct obd_device *obd = class_num2obd(i);
532 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
535 write_unlock(&obd_dev_lock);
537 /* the obd_device could be waited to be
538 * destroyed by the "obd_zombie_impexp_thread".
540 obd_zombie_barrier();
545 CERROR("%s: already exists, won't add\n",
547 /* in case we found a free slot before duplicate */
548 minor_assign = false;
/* Only the first empty slot is considered as a candidate. */
552 if (!minor_assign && obd == NULL) {
559 new_obd->obd_minor = new_obd_minor;
560 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
561 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
562 obd_devs[new_obd_minor] = new_obd;
566 CERROR("%s: all %u/%u devices used, increase "
567 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
568 i, class_devno_max(), ret);
571 write_unlock(&obd_dev_lock);
/*
 * Scan obd_devs[] for a device named \a name; the caller is expected to hold
 * obd_dev_lock (class_name2dev() and class_dev_by_str() take it for reading).
 * A match is only honoured once the device has obd_attached set.
 * NOTE(review): the return statements are not visible in this listing;
 * callers treat a negative result as "not found".
 */
576 static int class_name2dev_nolock(const char *name)
583 for (i = 0; i < class_devno_max(); i++) {
584 struct obd_device *obd = class_num2obd(i);
586 if (obd && strcmp(name, obd->obd_name) == 0) {
587 /* Make sure we finished attaching before we give
588 out any references */
589 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
590 if (obd->obd_attached) {
/* Locked wrapper: run class_name2dev_nolock() while holding obd_dev_lock for reading. */
600 int class_name2dev(const char *name)
607 read_lock(&obd_dev_lock);
608 i = class_name2dev_nolock(name);
609 read_unlock(&obd_dev_lock);
613 EXPORT_SYMBOL(class_name2dev);
/*
 * Resolve a device name to its obd_device via class_name2dev() and
 * class_num2obd(). No reference is taken on the result in the visible code.
 * NOTE(review): the range test uses `>` so dev == class_devno_max() passes
 * it; class_num2obd() only looks up num < class_devno_max(), so this looks
 * harmless, but `>=` would state the intended bound.
 */
615 struct obd_device *class_name2obd(const char *name)
617 int dev = class_name2dev(name);
619 if (dev < 0 || dev > class_devno_max())
621 return class_num2obd(dev);
623 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] for a device whose obd_uuid equals \a uuid; the callers in
 * this file hold obd_dev_lock for reading around the call.
 * NOTE(review): the return statements are not visible in this listing.
 */
625 int class_uuid2dev_nolock(struct obd_uuid *uuid)
629 for (i = 0; i < class_devno_max(); i++) {
630 struct obd_device *obd = class_num2obd(i);
632 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
633 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/* Locked wrapper: run class_uuid2dev_nolock() while holding obd_dev_lock for reading. */
641 int class_uuid2dev(struct obd_uuid *uuid)
645 read_lock(&obd_dev_lock);
646 i = class_uuid2dev_nolock(uuid);
647 read_unlock(&obd_dev_lock);
651 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Resolve a device UUID to its obd_device via class_uuid2dev() and
 * class_num2obd().
 * NOTE(review): the check of a failed lookup is not visible in this listing.
 */
653 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
655 int dev = class_uuid2dev(uuid);
658 return class_num2obd(dev);
660 EXPORT_SYMBOL(class_uuid2obd);
663 * Get obd device from ::obd_devs[]
665 * \param num [in] array index
667 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
668 * otherwise return the obd device there.
/*
 * Return the device stored at obd_devs[\a num].
 * Only indices below class_devno_max() are looked up. A device that is found
 * is asserted to carry the magic value and to have obd_minor equal to \a num.
 * NOTE(review): no lower-bound check on num is visible here; callers in this
 * file filter negative values in most, but not visibly all, cases.
 */
670 struct obd_device *class_num2obd(int num)
672 struct obd_device *obd = NULL;
674 if (num < class_devno_max()) {
679 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
680 "%p obd_magic %08x != %08x\n",
681 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
682 LASSERTF(obd->obd_minor == num,
683 "%p obd_minor %0d != %0d\n",
684 obd, obd->obd_minor, num);
691 * Find obd in obd_devs[] by name or uuid.
693 * Increment obd's refcount if found.
695 * \param[in] str obd name or uuid
697 * \retval NULL if not found
698 * \retval target pointer to found obd_device
/*
 * Look a device up by \a str, tried first as a UUID and then as a name, all
 * under the obd_dev_lock read lock. A device that is found gets an extra
 * reference (scope "find", source `current`) before the lock is released.
 * NOTE(review): the conditions that select the fallback and guard the
 * reference are not visible in this listing.
 */
700 struct obd_device *class_dev_by_str(const char *str)
702 struct obd_device *target = NULL;
703 struct obd_uuid tgtuuid;
706 obd_str2uuid(&tgtuuid, str);
708 read_lock(&obd_dev_lock);
709 rc = class_uuid2dev_nolock(&tgtuuid);
711 rc = class_name2dev_nolock(str);
714 target = class_num2obd(rc);
717 class_incref(target, "find", current);
718 read_unlock(&obd_dev_lock);
722 EXPORT_SYMBOL(class_dev_by_str);
725 * Get obd devices count. Devices in any state are counted.
727 * \retval obd device count
/*
 * Count the devices present in obd_devs[] while holding obd_dev_lock for
 * reading.
 * NOTE(review): the loop bound is `<=` and so also probes index
 * class_devno_max(); class_num2obd() does not look that index up, so it
 * appears harmless but is inconsistent with the other scans in this file.
 */
729 int get_devices_count(void)
731 int index, max_index = class_devno_max(), dev_count = 0;
733 read_lock(&obd_dev_lock);
734 for (index = 0; index <= max_index; index++) {
735 struct obd_device *obd = class_num2obd(index);
739 read_unlock(&obd_dev_lock);
743 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per registered device: index, status, type name,
 * device name, UUID and current reference count. The status is chosen from
 * the obd_stopping, obd_set_up and obd_attached flags, tested in that order.
 * The whole scan runs under the obd_dev_lock read lock.
 * NOTE(review): the status strings themselves are not visible in this listing.
 */
745 void class_obd_list(void)
750 read_lock(&obd_dev_lock);
751 for (i = 0; i < class_devno_max(); i++) {
752 struct obd_device *obd = class_num2obd(i);
756 if (obd->obd_stopping)
758 else if (obd->obd_set_up)
760 else if (obd->obd_attached)
764 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
765 i, status, obd->obd_type->typ_name,
766 obd->obd_name, obd->obd_uuid.uuid,
767 atomic_read(&obd->obd_refcount));
769 read_unlock(&obd_dev_lock);
/*
 * The type is matched with strncmp() over strlen(type_name) bytes, i.e. as a
 * prefix of the device's type name. The read lock is released before a match
 * is returned; no reference is taken in the visible code.
 */
773 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
774 specified, then only the client with that uuid is returned,
775 otherwise any client connected to the tgt is returned. */
776 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
777 const char *type_name,
778 struct obd_uuid *grp_uuid)
782 read_lock(&obd_dev_lock);
783 for (i = 0; i < class_devno_max(); i++) {
784 struct obd_device *obd = class_num2obd(i);
788 if ((strncmp(obd->obd_type->typ_name, type_name,
789 strlen(type_name)) == 0)) {
790 if (obd_uuid_equals(tgt_uuid,
791 &obd->u.cli.cl_target_uuid) &&
792 ((grp_uuid)? obd_uuid_equals(grp_uuid,
793 &obd->obd_uuid) : 1)) {
794 read_unlock(&obd_dev_lock);
799 read_unlock(&obd_dev_lock);
803 EXPORT_SYMBOL(class_find_client_obd);
/*
 * A start index taken from *next is only accepted when it lies within
 * [0, class_devno_max()). The scan runs under the obd_dev_lock read lock,
 * which is released on both the match and the no-match path.
 * NOTE(review): the handling of an out-of-range *next and the return
 * statements are not visible in this listing.
 */
805 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
806 searching at *next, and if a device is found, the next index to look
807 at is saved in *next. If next is NULL, then the first matching device
808 will always be returned. */
809 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
815 else if (*next >= 0 && *next < class_devno_max())
820 read_lock(&obd_dev_lock);
821 for (; i < class_devno_max(); i++) {
822 struct obd_device *obd = class_num2obd(i);
826 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
829 read_unlock(&obd_dev_lock);
833 read_unlock(&obd_dev_lock);
837 EXPORT_SYMBOL(class_devices_in_group);
840 * Notify that the sptlrpc log for \a fsname has changed, so that every
841 * relevant OBD adjusts its sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to every device that is
 * set up, not stopping, of one of the listed types, and whose name starts
 * with the first \a namelen bytes of \a fsname.
 * NOTE(review): how the per-device result rc2 is folded into the return
 * value is not visible in this listing.
 */
843 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
845 struct obd_device *obd;
849 LASSERT(namelen > 0);
851 read_lock(&obd_dev_lock);
852 for (i = 0; i < class_devno_max(); i++) {
853 obd = class_num2obd(i);
855 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
858 /* only notify mdc, osc, osp, lwp, mdt, ost
859 * because only these have a -sptlrpc llog */
860 type = obd->obd_type->typ_name;
861 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
862 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
863 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
864 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
865 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
866 strcmp(type, LUSTRE_OST_NAME) != 0)
869 if (strncmp(obd->obd_name, fsname, namelen))
/* Take a reference so the read lock can be dropped for the duration of the call; the lock is re-taken afterwards to continue the scan. */
872 class_incref(obd, __FUNCTION__, obd);
873 read_unlock(&obd_dev_lock);
874 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
875 sizeof(KEY_SPTLRPC_CONF),
876 KEY_SPTLRPC_CONF, 0, NULL, NULL);
878 class_decref(obd, __FUNCTION__, obd);
879 read_lock(&obd_dev_lock);
881 read_unlock(&obd_dev_lock);
884 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/* Destroy the obd_device slab cache if it exists and reset the pointer, so a repeated call does nothing. */
886 void obd_cleanup_caches(void)
889 if (obd_device_cachep) {
890 kmem_cache_destroy(obd_device_cachep);
891 obd_device_cachep = NULL;
/*
 * Create the slab cache for struct obd_device; the cache must not exist yet.
 * The usercopy arguments (offset 0, size sizeof(struct obd_device)) cover
 * the whole object. Creation failure yields -ENOMEM.
 * NOTE(review): the "out" label and the return statement are not visible in
 * this listing; obd_cleanup_caches() is presumably the failure path.
 */
897 int obd_init_caches(void)
902 LASSERT(obd_device_cachep == NULL);
903 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
904 sizeof(struct obd_device),
905 0, 0, 0, sizeof(struct obd_device), NULL);
906 if (!obd_device_cachep)
907 GOTO(out, rc = -ENOMEM);
911 obd_cleanup_caches();
/* Forward declaration; the initializer follows export_handle_addref() below. */
915 static struct portals_handle_ops export_handle_ops;
/*
 * Translate a connection handle into its export by looking conn->cookie up
 * with class_handle2object(). A NULL handle and the cookie value -1 ("assign
 * a new connection") are logged and handled before the lookup.
 * NOTE(review): the returns for those two special cases are not visible in
 * this listing.
 */
917 /* map connection to client */
918 struct obd_export *class_conn2export(struct lustre_handle *conn)
920 struct obd_export *export;
924 CDEBUG(D_CACHE, "looking for null handle\n");
928 if (conn->cookie == -1) { /* this means assign a new connection */
929 CDEBUG(D_CACHE, "want a new connection\n");
933 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
934 export = class_handle2object(conn->cookie, &export_handle_ops);
937 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd_device.
 * NOTE(review): the body is not visible in this listing; presumably it
 * returns exp->exp_obd - TODO confirm against the full file.
 */
939 struct obd_device *class_exp2obd(struct obd_export *exp)
945 EXPORT_SYMBOL(class_exp2obd);
/*
 * Return the client import (u.cli.cl_import) of the device behind \a exp.
 * NOTE(review): any NULL check on the device is not visible in this listing.
 */
947 struct obd_import *class_exp2cliimp(struct obd_export *exp)
949 struct obd_device *obd = exp->exp_obd;
952 return obd->u.cli.cl_import;
954 EXPORT_SYMBOL(class_exp2cliimp);
956 /* Export management functions */
/*
 * Free an export whose reference count is zero.
 * Drops the export's connection if it has one, asserts that its reply and
 * request lists are empty, calls obd_destroy_export(), releases the device
 * reference held by a non-self export, and frees the export through
 * OBD_FREE_RCU().
 */
957 static void class_export_destroy(struct obd_export *exp)
959 struct obd_device *obd = exp->exp_obd;
962 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
963 LASSERT(obd != NULL);
965 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
966 exp->exp_client_uuid.uuid, obd->obd_name);
968 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
969 if (exp->exp_connection)
970 ptlrpc_put_connection_superhack(exp->exp_connection);
972 LASSERT(list_empty(&exp->exp_outstanding_replies));
973 LASSERT(list_empty(&exp->exp_uncommitted_replies));
974 LASSERT(list_empty(&exp->exp_req_replay_queue));
975 LASSERT(list_empty(&exp->exp_hp_rpcs));
976 obd_destroy_export(exp);
977 /* self export doesn't hold a reference to an obd, although it
978 * exists until freeing of the obd */
979 if (exp != obd->obd_self_export)
980 class_decref(obd, "export", exp);
982 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle-table callback (hop_addref of export_handle_ops): take a reference on the export. */
986 static void export_handle_addref(void *export)
988 class_export_get(export);
/* Handle operations passed to class_handle_hash() and class_handle2object() for exports. */
991 static struct portals_handle_ops export_handle_ops = {
992 .hop_addref = export_handle_addref,
/*
 * Take a reference on \a exp and log the new count.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably \a exp is returned.
 */
996 struct obd_export *class_export_get(struct obd_export *exp)
998 atomic_inc(&exp->exp_refcount);
999 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1000 atomic_read(&exp->exp_refcount));
1003 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp.
 * On the final put the export's nid statistics reference is released. A
 * device's self export is then destroyed synchronously and the device itself
 * is freed with class_free_dev(); any other export must still be on an
 * exp_obd_chain list and is handed to obd_zombie_export_add().
 */
1005 void class_export_put(struct obd_export *exp)
1007 LASSERT(exp != NULL);
1008 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1009 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1010 atomic_read(&exp->exp_refcount) - 1);
1012 if (atomic_dec_and_test(&exp->exp_refcount)) {
1013 struct obd_device *obd = exp->exp_obd;
1015 CDEBUG(D_IOCTL, "final put %p/%s\n",
1016 exp, exp->exp_client_uuid.uuid);
1018 /* release nid stat reference */
1019 lprocfs_exp_cleanup(exp);
1021 if (exp == obd->obd_self_export) {
1022 /* self export should be destroyed without
1023 * zombie thread as it doesn't hold a
1024 * reference to obd and doesn't hold any
1026 class_export_destroy(exp);
1027 /* self export is destroyed, no class
1028 * references exist and it is safe to free
1030 class_free_dev(obd);
1032 LASSERT(!list_empty(&exp->exp_obd_chain));
1033 obd_zombie_export_add(exp);
1038 EXPORT_SYMBOL(class_export_put);
/* Work handler installed on exp_zombie_work: recover the export from the work item and destroy it. */
1040 static void obd_zombie_exp_cull(struct work_struct *ws)
1042 struct obd_export *export;
1044 export = container_of(ws, struct obd_export, exp_zombie_work);
1045 class_export_destroy(export);
/*
 * Common constructor behind class_new_export() (is_self == false) and
 * class_new_export_self() (is_self == true).
 * Returns ERR_PTR(-ENOMEM) when the export cannot be allocated. Error codes
 * set on the visible failure paths: -ENODEV (device stopping, or no uuid
 * hash), -EALREADY (duplicate client uuid) and -ESHUTDOWN (device began
 * stopping after the uuid was hashed). The failure path unhashes the handle,
 * calls obd_destroy_export() and frees the export.
 * NOTE(review): the tests on is_self, the error labels and the return
 * statements are not visible in this listing.
 */
1048 /* Creates a new export, adds it to the hash table, and returns a
1049 * pointer to it. The refcount is 2: one for the hash reference, and
1050 * one for the pointer returned by this function. */
1051 struct obd_export *__class_new_export(struct obd_device *obd,
1052 struct obd_uuid *cluuid, bool is_self)
1054 struct obd_export *export;
1055 struct cfs_hash *hash = NULL;
1059 OBD_ALLOC_PTR(export);
1061 return ERR_PTR(-ENOMEM);
1063 export->exp_conn_cnt = 0;
1064 export->exp_lock_hash = NULL;
1065 export->exp_flock_hash = NULL;
1066 /* 2 = class_handle_hash + last */
1067 atomic_set(&export->exp_refcount, 2);
1068 atomic_set(&export->exp_rpc_count, 0);
1069 atomic_set(&export->exp_cb_count, 0);
1070 atomic_set(&export->exp_locks_count, 0);
1071 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1072 INIT_LIST_HEAD(&export->exp_locks_list);
1073 spin_lock_init(&export->exp_locks_list_guard);
1075 atomic_set(&export->exp_replay_count, 0);
1076 export->exp_obd = obd;
1077 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1078 spin_lock_init(&export->exp_uncommitted_replies_lock);
1079 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1080 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1081 INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1082 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1083 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1084 class_handle_hash(&export->exp_handle, &export_handle_ops);
1085 export->exp_last_request_time = ktime_get_real_seconds();
1086 spin_lock_init(&export->exp_lock);
1087 spin_lock_init(&export->exp_rpc_lock);
1088 INIT_HLIST_NODE(&export->exp_uuid_hash);
1089 INIT_HLIST_NODE(&export->exp_nid_hash);
1090 INIT_HLIST_NODE(&export->exp_gen_hash);
1091 spin_lock_init(&export->exp_bl_list_lock);
1092 INIT_LIST_HEAD(&export->exp_bl_list);
1093 INIT_LIST_HEAD(&export->exp_stale_list);
1094 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1096 export->exp_sp_peer = LUSTRE_SP_ANY;
1097 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1098 export->exp_client_uuid = *cluuid;
1099 obd_init_export(export);
/* An export whose client uuid equals the device's own uuid is not added to the uuid hash. */
1101 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1102 spin_lock(&obd->obd_dev_lock);
1103 /* shouldn't happen, but might race */
1104 if (obd->obd_stopping)
1105 GOTO(exit_unlock, rc = -ENODEV);
1107 hash = cfs_hash_getref(obd->obd_uuid_hash);
1109 GOTO(exit_unlock, rc = -ENODEV);
1110 spin_unlock(&obd->obd_dev_lock);
1112 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1114 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1115 obd->obd_name, cluuid->uuid, rc);
1116 GOTO(exit_err, rc = -EALREADY);
1120 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* obd_stopping is checked again under the lock because the lock was dropped for the hash insertion. */
1121 spin_lock(&obd->obd_dev_lock);
1122 if (obd->obd_stopping) {
1124 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1125 GOTO(exit_unlock, rc = -ESHUTDOWN);
1129 class_incref(obd, "export", export);
1130 list_add_tail(&export->exp_obd_chain_timed,
1131 &obd->obd_exports_timed);
1132 list_add(&export->exp_obd_chain, &obd->obd_exports);
1133 obd->obd_num_exports++;
1135 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1136 INIT_LIST_HEAD(&export->exp_obd_chain);
1138 spin_unlock(&obd->obd_dev_lock);
1140 cfs_hash_putref(hash);
1144 spin_unlock(&obd->obd_dev_lock);
1147 cfs_hash_putref(hash);
1148 class_handle_unhash(&export->exp_handle);
1149 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1150 obd_destroy_export(export);
1151 OBD_FREE_PTR(export);
/* Create a regular (non-self) export of \a obd for client \a uuid; see __class_new_export(). */
1155 struct obd_export *class_new_export(struct obd_device *obd,
1156 struct obd_uuid *uuid)
1158 return __class_new_export(obd, uuid, false);
1160 EXPORT_SYMBOL(class_new_export);
/* Create the self export of \a obd (is_self == true); see __class_new_export(). */
1162 struct obd_export *class_new_export_self(struct obd_device *obd,
1163 struct obd_uuid *uuid)
1165 return __class_new_export(obd, uuid, true);
/*
 * Detach \a exp from its device.
 * The handle is unhashed first. A self export is simply put. Any other
 * export is removed from the uuid hash (and, on servers, the generation
 * hash), moved to the device's obd_unlinked_exports list, taken off the
 * timed list, and obd_num_exports is decremented, all under obd_dev_lock.
 * The stale-export counter is then incremented and obd_stale_export_put()
 * is called.
 * NOTE(review): the early return after the self-export put is not visible
 * in this listing.
 */
1168 void class_unlink_export(struct obd_export *exp)
1170 class_handle_unhash(&exp->exp_handle);
1172 if (exp->exp_obd->obd_self_export == exp) {
1173 class_export_put(exp);
1177 spin_lock(&exp->exp_obd->obd_dev_lock);
1178 /* delete an uuid-export hashitem from hashtables */
1179 if (!hlist_unhashed(&exp->exp_uuid_hash))
1180 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1181 &exp->exp_client_uuid,
1182 &exp->exp_uuid_hash);
1184 #ifdef HAVE_SERVER_SUPPORT
1185 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1186 struct tg_export_data *ted = &exp->exp_target_data;
1187 struct cfs_hash *hash;
1189 /* Because obd_gen_hash will not be released until
1190 * class_cleanup(), so hash should never be NULL here */
1191 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1192 LASSERT(hash != NULL);
1193 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1194 &exp->exp_gen_hash);
1195 cfs_hash_putref(hash);
1197 #endif /* HAVE_SERVER_SUPPORT */
1199 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1200 list_del_init(&exp->exp_obd_chain_timed);
1201 exp->exp_obd->obd_num_exports--;
1202 spin_unlock(&exp->exp_obd->obd_dev_lock);
1203 atomic_inc(&obd_stale_export_num);
1205 /* A reference is kept by obd_stale_exports list */
1206 obd_stale_export_put(exp);
1208 EXPORT_SYMBOL(class_unlink_export);
1210 /* Import management functions */
/*
 * Release an import whose reference count is zero.
 * Drops the import's main connection, then empties imp_conn_list, dropping
 * each entry's connection and freeing the entry. Asserts that imp_sec is
 * NULL and releases the device reference taken in class_new_import().
 * NOTE(review): the final free of the import itself is not visible in this
 * listing.
 */
1211 static void obd_zombie_import_free(struct obd_import *imp)
1215 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1216 imp->imp_obd->obd_name);
1218 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1220 ptlrpc_put_connection_superhack(imp->imp_connection);
1222 while (!list_empty(&imp->imp_conn_list)) {
1223 struct obd_import_conn *imp_conn;
1225 imp_conn = list_entry(imp->imp_conn_list.next,
1226 struct obd_import_conn, oic_item);
1227 list_del_init(&imp_conn->oic_item);
1228 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1229 OBD_FREE(imp_conn, sizeof(*imp_conn));
1232 LASSERT(imp->imp_sec == NULL);
1233 class_decref(imp->imp_obd, "import", imp);
/*
 * Take a reference on \a import and log the new count.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably \a import is returned.
 */
1238 struct obd_import *class_import_get(struct obd_import *import)
1240 atomic_inc(&import->imp_refcount);
1241 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1242 atomic_read(&import->imp_refcount),
1243 import->imp_obd->obd_name);
1246 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a imp. The final put hands the import to
 * obd_zombie_import_add() rather than freeing it here.
 * NOTE(review): the refcount assertion after the put reads \a imp; whether
 * it can run after the import was queued for destruction depends on control
 * flow that is not visible in this listing.
 */
1248 void class_import_put(struct obd_import *imp)
1252 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1254 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1255 atomic_read(&imp->imp_refcount) - 1,
1256 imp->imp_obd->obd_name);
1258 if (atomic_dec_and_test(&imp->imp_refcount)) {
1259 CDEBUG(D_INFO, "final put import %p\n", imp);
1260 obd_zombie_import_add(imp);
1263 /* catch possible import put race */
1264 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1267 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0, and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the per-portal at_init() call is not
 * visible in this listing.
 */
1269 static void init_imp_at(struct imp_at *at) {
1271 at_init(&at->iat_net_latency, 0, 0);
1272 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1273 /* max service estimates are tracked on the server side, so
1274 don't use the AT history here, just use the last reported
1275 val. (But keep hist for proc histogram, worst_ever) */
1276 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Work handler installed on imp_zombie_work: recover the import from the work item and free it. */
1281 static void obd_zombie_imp_cull(struct work_struct *ws)
1283 struct obd_import *import;
1285 import = container_of(ws, struct obd_import, imp_zombie_work);
1286 obd_zombie_import_free(import);
/*
 * Allocate and initialise an import for \a obd.
 * The import takes a device reference (scope "import"), starts in state
 * LUSTRE_IMP_NEW with a reference count of 2, and uses LUSTRE_MSG_MAGIC_V2
 * as its initial message magic. imp_sec_refpid is the pid of the current pid
 * namespace's child reaper when there is one, otherwise 1.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing.
 */
1289 struct obd_import *class_new_import(struct obd_device *obd)
1291 struct obd_import *imp;
1292 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1294 OBD_ALLOC(imp, sizeof(*imp));
1298 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1299 INIT_LIST_HEAD(&imp->imp_replay_list);
1300 INIT_LIST_HEAD(&imp->imp_sending_list);
1301 INIT_LIST_HEAD(&imp->imp_delayed_list);
1302 INIT_LIST_HEAD(&imp->imp_committed_list);
1303 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1304 imp->imp_known_replied_xid = 0;
/* The replay cursor starts at the head of the (empty) committed list. */
1305 imp->imp_replay_cursor = &imp->imp_committed_list;
1306 spin_lock_init(&imp->imp_lock);
1307 imp->imp_last_success_conn = 0;
1308 imp->imp_state = LUSTRE_IMP_NEW;
1309 imp->imp_obd = class_incref(obd, "import", imp);
1310 rwlock_init(&imp->imp_sec_lock);
1311 init_waitqueue_head(&imp->imp_recovery_waitq);
1312 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1314 if (curr_pid_ns->child_reaper)
1315 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1317 imp->imp_sec_refpid = 1;
1319 atomic_set(&imp->imp_refcount, 2);
1320 atomic_set(&imp->imp_unregistering, 0);
1321 atomic_set(&imp->imp_inflight, 0);
1322 atomic_set(&imp->imp_replay_inflight, 0);
1323 atomic_set(&imp->imp_inval_count, 0);
1324 INIT_LIST_HEAD(&imp->imp_conn_list);
1325 init_imp_at(&imp->imp_at);
1327 /* the default magic is V2, will be used in connect RPC, and
1328 * then adjusted according to the flags in request/reply. */
1329 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1333 EXPORT_SYMBOL(class_new_import);
1335 void class_destroy_import(struct obd_import *import)
1337 LASSERT(import != NULL);
1338 LASSERT(import != LP_POISON);
1340 spin_lock(&import->imp_lock);
1341 import->imp_generation++;
1342 spin_unlock(&import->imp_lock);
1343 class_import_put(import);
1345 EXPORT_SYMBOL(class_destroy_import);
1347 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1349 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1351 spin_lock(&exp->exp_locks_list_guard);
1353 LASSERT(lock->l_exp_refs_nr >= 0);
1355 if (lock->l_exp_refs_target != NULL &&
1356 lock->l_exp_refs_target != exp) {
1357 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1358 exp, lock, lock->l_exp_refs_target);
1360 if ((lock->l_exp_refs_nr ++) == 0) {
1361 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1362 lock->l_exp_refs_target = exp;
1364 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1365 lock, exp, lock->l_exp_refs_nr);
1366 spin_unlock(&exp->exp_locks_list_guard);
1368 EXPORT_SYMBOL(__class_export_add_lock_ref);
1370 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1372 spin_lock(&exp->exp_locks_list_guard);
1373 LASSERT(lock->l_exp_refs_nr > 0);
1374 if (lock->l_exp_refs_target != exp) {
1375 LCONSOLE_WARN("lock %p, "
1376 "mismatching export pointers: %p, %p\n",
1377 lock, lock->l_exp_refs_target, exp);
1379 if (-- lock->l_exp_refs_nr == 0) {
1380 list_del_init(&lock->l_exp_refs_link);
1381 lock->l_exp_refs_target = NULL;
1383 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1384 lock, exp, lock->l_exp_refs_nr);
1385 spin_unlock(&exp->exp_locks_list_guard);
1387 EXPORT_SYMBOL(__class_export_del_lock_ref);
1390 /* A connection defines an export context in which preallocation can
1391 be managed. This releases the export pointer reference, and returns
1392 the export handle, so the export refcount is 1 when this function
1394 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1395 struct obd_uuid *cluuid)
1397 struct obd_export *export;
1398 LASSERT(conn != NULL);
1399 LASSERT(obd != NULL);
1400 LASSERT(cluuid != NULL);
1403 export = class_new_export(obd, cluuid);
1405 RETURN(PTR_ERR(export));
1407 conn->cookie = export->exp_handle.h_cookie;
1408 class_export_put(export);
1410 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1411 cluuid->uuid, conn->cookie);
1414 EXPORT_SYMBOL(class_connect);
1416 /* if export is involved in recovery then clean up related things */
1417 static void class_export_recovery_cleanup(struct obd_export *exp)
1419 struct obd_device *obd = exp->exp_obd;
1421 spin_lock(&obd->obd_recovery_task_lock);
1422 if (obd->obd_recovering) {
1423 if (exp->exp_in_recovery) {
1424 spin_lock(&exp->exp_lock);
1425 exp->exp_in_recovery = 0;
1426 spin_unlock(&exp->exp_lock);
1427 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1428 atomic_dec(&obd->obd_connected_clients);
1431 /* if called during recovery then should update
1432 * obd_stale_clients counter,
1433 * lightweight exports are not counted */
1434 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1435 exp->exp_obd->obd_stale_clients++;
1437 spin_unlock(&obd->obd_recovery_task_lock);
1439 spin_lock(&exp->exp_lock);
1440 /** Cleanup req replay fields */
1441 if (exp->exp_req_replay_needed) {
1442 exp->exp_req_replay_needed = 0;
1444 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1445 atomic_dec(&obd->obd_req_replay_clients);
1448 /** Cleanup lock replay data */
1449 if (exp->exp_lock_replay_needed) {
1450 exp->exp_lock_replay_needed = 0;
1452 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1453 atomic_dec(&obd->obd_lock_replay_clients);
1455 spin_unlock(&exp->exp_lock);
1458 /* This function removes 1-3 references from the export:
1459 * 1 - for export pointer passed
1460 * and if disconnect really need
1461 * 2 - removing from hash
1462 * 3 - in client_unlink_export
1463 * The export pointer passed to this function can destroyed */
1464 int class_disconnect(struct obd_export *export)
1466 int already_disconnected;
1469 if (export == NULL) {
1470 CWARN("attempting to free NULL export %p\n", export);
1474 spin_lock(&export->exp_lock);
1475 already_disconnected = export->exp_disconnected;
1476 export->exp_disconnected = 1;
1477 /* We hold references of export for uuid hash
1478 * and nid_hash and export link at least. So
1479 * it is safe to call cfs_hash_del in there. */
1480 if (!hlist_unhashed(&export->exp_nid_hash))
1481 cfs_hash_del(export->exp_obd->obd_nid_hash,
1482 &export->exp_connection->c_peer.nid,
1483 &export->exp_nid_hash);
1484 spin_unlock(&export->exp_lock);
1486 /* class_cleanup(), abort_recovery(), and class_fail_export()
1487 * all end up in here, and if any of them race we shouldn't
1488 * call extra class_export_puts(). */
1489 if (already_disconnected) {
1490 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1491 GOTO(no_disconn, already_disconnected);
1494 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1495 export->exp_handle.h_cookie);
1497 class_export_recovery_cleanup(export);
1498 class_unlink_export(export);
1500 class_export_put(export);
1503 EXPORT_SYMBOL(class_disconnect);
1505 /* Return non-zero for a fully connected export */
1506 int class_connected_export(struct obd_export *exp)
1511 spin_lock(&exp->exp_lock);
1512 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1513 spin_unlock(&exp->exp_lock);
1517 EXPORT_SYMBOL(class_connected_export);
1519 static void class_disconnect_export_list(struct list_head *list,
1520 enum obd_option flags)
1523 struct obd_export *exp;
1526 /* It's possible that an export may disconnect itself, but
1527 * nothing else will be added to this list. */
1528 while (!list_empty(list)) {
1529 exp = list_entry(list->next, struct obd_export,
1531 /* need for safe call CDEBUG after obd_disconnect */
1532 class_export_get(exp);
1534 spin_lock(&exp->exp_lock);
1535 exp->exp_flags = flags;
1536 spin_unlock(&exp->exp_lock);
1538 if (obd_uuid_equals(&exp->exp_client_uuid,
1539 &exp->exp_obd->obd_uuid)) {
1541 "exp %p export uuid == obd uuid, don't discon\n",
1543 /* Need to delete this now so we don't end up pointing
1544 * to work_list later when this export is cleaned up. */
1545 list_del_init(&exp->exp_obd_chain);
1546 class_export_put(exp);
1550 class_export_get(exp);
1551 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1552 "last request at %lld\n",
1553 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1554 exp, exp->exp_last_request_time);
1555 /* release one export reference anyway */
1556 rc = obd_disconnect(exp);
1558 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1559 obd_export_nid2str(exp), exp, rc);
1560 class_export_put(exp);
1565 void class_disconnect_exports(struct obd_device *obd)
1567 struct list_head work_list;
1570 /* Move all of the exports from obd_exports to a work list, en masse. */
1571 INIT_LIST_HEAD(&work_list);
1572 spin_lock(&obd->obd_dev_lock);
1573 list_splice_init(&obd->obd_exports, &work_list);
1574 list_splice_init(&obd->obd_delayed_exports, &work_list);
1575 spin_unlock(&obd->obd_dev_lock);
1577 if (!list_empty(&work_list)) {
1578 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1579 "disconnecting them\n", obd->obd_minor, obd);
1580 class_disconnect_export_list(&work_list,
1581 exp_flags_from_obd(obd));
1583 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1584 obd->obd_minor, obd);
1587 EXPORT_SYMBOL(class_disconnect_exports);
1589 /* Remove exports that have not completed recovery.
1591 void class_disconnect_stale_exports(struct obd_device *obd,
1592 int (*test_export)(struct obd_export *))
1594 struct list_head work_list;
1595 struct obd_export *exp, *n;
1599 INIT_LIST_HEAD(&work_list);
1600 spin_lock(&obd->obd_dev_lock);
1601 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1603 /* don't count self-export as client */
1604 if (obd_uuid_equals(&exp->exp_client_uuid,
1605 &exp->exp_obd->obd_uuid))
1608 /* don't evict clients which have no slot in last_rcvd
1609 * (e.g. lightweight connection) */
1610 if (exp->exp_target_data.ted_lr_idx == -1)
1613 spin_lock(&exp->exp_lock);
1614 if (exp->exp_failed || test_export(exp)) {
1615 spin_unlock(&exp->exp_lock);
1618 exp->exp_failed = 1;
1619 spin_unlock(&exp->exp_lock);
1621 list_move(&exp->exp_obd_chain, &work_list);
1623 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1624 obd->obd_name, exp->exp_client_uuid.uuid,
1625 obd_export_nid2str(exp));
1626 print_export_data(exp, "EVICTING", 0, D_HA);
1628 spin_unlock(&obd->obd_dev_lock);
1631 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1632 obd->obd_name, evicted);
1634 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1635 OBD_OPT_ABORT_RECOV);
1638 EXPORT_SYMBOL(class_disconnect_stale_exports);
1640 void class_fail_export(struct obd_export *exp)
1642 int rc, already_failed;
1644 spin_lock(&exp->exp_lock);
1645 already_failed = exp->exp_failed;
1646 exp->exp_failed = 1;
1647 spin_unlock(&exp->exp_lock);
1649 if (already_failed) {
1650 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1651 exp, exp->exp_client_uuid.uuid);
1655 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1656 exp, exp->exp_client_uuid.uuid);
1658 if (obd_dump_on_timeout)
1659 libcfs_debug_dumplog();
1661 /* need for safe call CDEBUG after obd_disconnect */
1662 class_export_get(exp);
1664 /* Most callers into obd_disconnect are removing their own reference
1665 * (request, for example) in addition to the one from the hash table.
1666 * We don't have such a reference here, so make one. */
1667 class_export_get(exp);
1668 rc = obd_disconnect(exp);
1670 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1672 CDEBUG(D_HA, "disconnected export %p/%s\n",
1673 exp, exp->exp_client_uuid.uuid);
1674 class_export_put(exp);
1676 EXPORT_SYMBOL(class_fail_export);
1678 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1680 struct cfs_hash *nid_hash;
1681 struct obd_export *doomed_exp = NULL;
1682 int exports_evicted = 0;
1684 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1686 spin_lock(&obd->obd_dev_lock);
1687 /* umount has run already, so evict thread should leave
1688 * its task to umount thread now */
1689 if (obd->obd_stopping) {
1690 spin_unlock(&obd->obd_dev_lock);
1691 return exports_evicted;
1693 nid_hash = obd->obd_nid_hash;
1694 cfs_hash_getref(nid_hash);
1695 spin_unlock(&obd->obd_dev_lock);
1698 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1699 if (doomed_exp == NULL)
1702 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1703 "nid %s found, wanted nid %s, requested nid %s\n",
1704 obd_export_nid2str(doomed_exp),
1705 libcfs_nid2str(nid_key), nid);
1706 LASSERTF(doomed_exp != obd->obd_self_export,
1707 "self-export is hashed by NID?\n");
1709 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1710 "request\n", obd->obd_name,
1711 obd_uuid2str(&doomed_exp->exp_client_uuid),
1712 obd_export_nid2str(doomed_exp));
1713 class_fail_export(doomed_exp);
1714 class_export_put(doomed_exp);
1717 cfs_hash_putref(nid_hash);
1719 if (!exports_evicted)
1720 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1721 obd->obd_name, nid);
1722 return exports_evicted;
1724 EXPORT_SYMBOL(obd_export_evict_by_nid);
1726 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1728 struct cfs_hash *uuid_hash;
1729 struct obd_export *doomed_exp = NULL;
1730 struct obd_uuid doomed_uuid;
1731 int exports_evicted = 0;
1733 spin_lock(&obd->obd_dev_lock);
1734 if (obd->obd_stopping) {
1735 spin_unlock(&obd->obd_dev_lock);
1736 return exports_evicted;
1738 uuid_hash = obd->obd_uuid_hash;
1739 cfs_hash_getref(uuid_hash);
1740 spin_unlock(&obd->obd_dev_lock);
1742 obd_str2uuid(&doomed_uuid, uuid);
1743 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1744 CERROR("%s: can't evict myself\n", obd->obd_name);
1745 cfs_hash_putref(uuid_hash);
1746 return exports_evicted;
1749 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1751 if (doomed_exp == NULL) {
1752 CERROR("%s: can't disconnect %s: no exports found\n",
1753 obd->obd_name, uuid);
1755 CWARN("%s: evicting %s at adminstrative request\n",
1756 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1757 class_fail_export(doomed_exp);
1758 class_export_put(doomed_exp);
1761 cfs_hash_putref(uuid_hash);
1763 return exports_evicted;
1766 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() when lock dumping is
 * requested; file-scope storage, so it is NULL until assigned. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
1771 static void print_export_data(struct obd_export *exp, const char *status,
1772 int locks, int debug_level)
1774 struct ptlrpc_reply_state *rs;
1775 struct ptlrpc_reply_state *first_reply = NULL;
1778 spin_lock(&exp->exp_lock);
1779 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1785 spin_unlock(&exp->exp_lock);
1787 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1788 "%p %s %llu stale:%d\n",
1789 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1790 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1791 atomic_read(&exp->exp_rpc_count),
1792 atomic_read(&exp->exp_cb_count),
1793 atomic_read(&exp->exp_locks_count),
1794 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1795 nreplies, first_reply, nreplies > 3 ? "..." : "",
1796 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1797 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1798 if (locks && class_export_dump_hook != NULL)
1799 class_export_dump_hook(exp);
1803 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1805 struct obd_export *exp;
1807 spin_lock(&obd->obd_dev_lock);
1808 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1809 print_export_data(exp, "ACTIVE", locks, debug_level);
1810 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1811 print_export_data(exp, "UNLINKED", locks, debug_level);
1812 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1813 print_export_data(exp, "DELAYED", locks, debug_level);
1814 spin_unlock(&obd->obd_dev_lock);
1817 void obd_exports_barrier(struct obd_device *obd)
1820 LASSERT(list_empty(&obd->obd_exports));
1821 spin_lock(&obd->obd_dev_lock);
1822 while (!list_empty(&obd->obd_unlinked_exports)) {
1823 spin_unlock(&obd->obd_dev_lock);
1824 set_current_state(TASK_UNINTERRUPTIBLE);
1825 schedule_timeout(cfs_time_seconds(waited));
1826 if (waited > 5 && is_power_of_2(waited)) {
1827 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1828 "more than %d seconds. "
1829 "The obd refcount = %d. Is it stuck?\n",
1830 obd->obd_name, waited,
1831 atomic_read(&obd->obd_refcount));
1832 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1835 spin_lock(&obd->obd_dev_lock);
1837 spin_unlock(&obd->obd_dev_lock);
1839 EXPORT_SYMBOL(obd_exports_barrier);
1842 * Add export to the obd_zombe thread and notify it.
1844 static void obd_zombie_export_add(struct obd_export *exp) {
1845 atomic_dec(&obd_stale_export_num);
1846 spin_lock(&exp->exp_obd->obd_dev_lock);
1847 LASSERT(!list_empty(&exp->exp_obd_chain));
1848 list_del_init(&exp->exp_obd_chain);
1849 spin_unlock(&exp->exp_obd->obd_dev_lock);
1851 queue_work(zombie_wq, &exp->exp_zombie_work);
1855 * Add import to the obd_zombe thread and notify it.
1857 static void obd_zombie_import_add(struct obd_import *imp) {
1858 LASSERT(imp->imp_sec == NULL);
1860 queue_work(zombie_wq, &imp->imp_zombie_work);
1864 * wait when obd_zombie import/export queues become empty
1866 void obd_zombie_barrier(void)
1868 flush_workqueue(zombie_wq);
1870 EXPORT_SYMBOL(obd_zombie_barrier);
1873 struct obd_export *obd_stale_export_get(void)
1875 struct obd_export *exp = NULL;
1878 spin_lock(&obd_stale_export_lock);
1879 if (!list_empty(&obd_stale_exports)) {
1880 exp = list_entry(obd_stale_exports.next,
1881 struct obd_export, exp_stale_list);
1882 list_del_init(&exp->exp_stale_list);
1884 spin_unlock(&obd_stale_export_lock);
1887 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1888 atomic_read(&obd_stale_export_num));
1892 EXPORT_SYMBOL(obd_stale_export_get);
1894 void obd_stale_export_put(struct obd_export *exp)
1898 LASSERT(list_empty(&exp->exp_stale_list));
1899 if (exp->exp_lock_hash &&
1900 atomic_read(&exp->exp_lock_hash->hs_count)) {
1901 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1902 atomic_read(&obd_stale_export_num));
1904 spin_lock_bh(&exp->exp_bl_list_lock);
1905 spin_lock(&obd_stale_export_lock);
1906 /* Add to the tail if there is no blocked locks,
1907 * to the head otherwise. */
1908 if (list_empty(&exp->exp_bl_list))
1909 list_add_tail(&exp->exp_stale_list,
1910 &obd_stale_exports);
1912 list_add(&exp->exp_stale_list,
1913 &obd_stale_exports);
1915 spin_unlock(&obd_stale_export_lock);
1916 spin_unlock_bh(&exp->exp_bl_list_lock);
1918 class_export_put(exp);
1922 EXPORT_SYMBOL(obd_stale_export_put);
1925 * Adjust the position of the export in the stale list,
1926 * i.e. move to the head of the list if is needed.
1928 void obd_stale_export_adjust(struct obd_export *exp)
1930 LASSERT(exp != NULL);
1931 spin_lock_bh(&exp->exp_bl_list_lock);
1932 spin_lock(&obd_stale_export_lock);
1934 if (!list_empty(&exp->exp_stale_list) &&
1935 !list_empty(&exp->exp_bl_list))
1936 list_move(&exp->exp_stale_list, &obd_stale_exports);
1938 spin_unlock(&obd_stale_export_lock);
1939 spin_unlock_bh(&exp->exp_bl_list_lock);
1941 EXPORT_SYMBOL(obd_stale_export_adjust);
1944 * start destroy zombie import/export thread
1946 int obd_zombie_impexp_init(void)
1948 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1956 * stop destroy zombie import/export thread
1958 void obd_zombie_impexp_stop(void)
1960 destroy_workqueue(zombie_wq);
1961 LASSERT(list_empty(&obd_stale_exports));
1964 /***** Kernel-userspace comm helpers *******/
1966 /* Get length of entire message, including header */
1967 int kuc_len(int payload_len)
1969 return sizeof(struct kuc_hdr) + payload_len;
1971 EXPORT_SYMBOL(kuc_len);
1973 /* Get a pointer to kuc header, given a ptr to the payload
1974 * @param p Pointer to payload area
1975 * @returns Pointer to kuc header
1977 struct kuc_hdr * kuc_ptr(void *p)
1979 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1980 LASSERT(lh->kuc_magic == KUC_MAGIC);
1983 EXPORT_SYMBOL(kuc_ptr);
1985 /* Alloc space for a message, and fill in header
1986 * @return Pointer to payload area
1988 void *kuc_alloc(int payload_len, int transport, int type)
1991 int len = kuc_len(payload_len);
1995 return ERR_PTR(-ENOMEM);
1997 lh->kuc_magic = KUC_MAGIC;
1998 lh->kuc_transport = transport;
1999 lh->kuc_msgtype = type;
2000 lh->kuc_msglen = len;
2002 return (void *)(lh + 1);
2004 EXPORT_SYMBOL(kuc_alloc);
/* Free a message; takes the pointer to the payload area and the
 * payload length that was used to allocate it. */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
2014 struct obd_request_slot_waiter {
2015 struct list_head orsw_entry;
2016 wait_queue_head_t orsw_waitq;
2020 static bool obd_request_slot_avail(struct client_obd *cli,
2021 struct obd_request_slot_waiter *orsw)
2025 spin_lock(&cli->cl_loi_list_lock);
2026 avail = !!list_empty(&orsw->orsw_entry);
2027 spin_unlock(&cli->cl_loi_list_lock);
2033 * For network flow control, the RPC sponsor needs to acquire a credit
2034 * before sending the RPC. The credits count for a connection is defined
2035 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2036 * the subsequent RPC sponsors need to wait until others released their
2037 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2039 int obd_get_request_slot(struct client_obd *cli)
2041 struct obd_request_slot_waiter orsw;
2042 struct l_wait_info lwi;
2045 spin_lock(&cli->cl_loi_list_lock);
2046 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2047 cli->cl_rpcs_in_flight++;
2048 spin_unlock(&cli->cl_loi_list_lock);
2052 init_waitqueue_head(&orsw.orsw_waitq);
2053 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2054 orsw.orsw_signaled = false;
2055 spin_unlock(&cli->cl_loi_list_lock);
2057 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2058 rc = l_wait_event(orsw.orsw_waitq,
2059 obd_request_slot_avail(cli, &orsw) ||
2063 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2064 * freed but other (such as obd_put_request_slot) is using it. */
2065 spin_lock(&cli->cl_loi_list_lock);
2067 if (!orsw.orsw_signaled) {
2068 if (list_empty(&orsw.orsw_entry))
2069 cli->cl_rpcs_in_flight--;
2071 list_del(&orsw.orsw_entry);
2075 if (orsw.orsw_signaled) {
2076 LASSERT(list_empty(&orsw.orsw_entry));
2080 spin_unlock(&cli->cl_loi_list_lock);
2084 EXPORT_SYMBOL(obd_get_request_slot);
2086 void obd_put_request_slot(struct client_obd *cli)
2088 struct obd_request_slot_waiter *orsw;
2090 spin_lock(&cli->cl_loi_list_lock);
2091 cli->cl_rpcs_in_flight--;
2093 /* If there is free slot, wakeup the first waiter. */
2094 if (!list_empty(&cli->cl_flight_waiters) &&
2095 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2096 orsw = list_entry(cli->cl_flight_waiters.next,
2097 struct obd_request_slot_waiter, orsw_entry);
2098 list_del_init(&orsw->orsw_entry);
2099 cli->cl_rpcs_in_flight++;
2100 wake_up(&orsw->orsw_waitq);
2102 spin_unlock(&cli->cl_loi_list_lock);
2104 EXPORT_SYMBOL(obd_put_request_slot);
2106 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2108 return cli->cl_max_rpcs_in_flight;
2110 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2112 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2114 struct obd_request_slot_waiter *orsw;
2118 const char *type_name;
2121 if (max > OBD_MAX_RIF_MAX || max < 1)
2124 type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2125 if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2126 /* adjust max_mod_rpcs_in_flight to ensure it is always
2127 * strictly lower that max_rpcs_in_flight */
2129 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2130 "because it must be higher than "
2131 "max_mod_rpcs_in_flight value",
2132 cli->cl_import->imp_obd->obd_name);
2135 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2136 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2142 spin_lock(&cli->cl_loi_list_lock);
2143 old = cli->cl_max_rpcs_in_flight;
2144 cli->cl_max_rpcs_in_flight = max;
2145 client_adjust_max_dirty(cli);
2149 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2150 for (i = 0; i < diff; i++) {
2151 if (list_empty(&cli->cl_flight_waiters))
2154 orsw = list_entry(cli->cl_flight_waiters.next,
2155 struct obd_request_slot_waiter, orsw_entry);
2156 list_del_init(&orsw->orsw_entry);
2157 cli->cl_rpcs_in_flight++;
2158 wake_up(&orsw->orsw_waitq);
2160 spin_unlock(&cli->cl_loi_list_lock);
2164 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2166 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2168 return cli->cl_max_mod_rpcs_in_flight;
2170 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2172 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2174 struct obd_connect_data *ocd;
2178 if (max > OBD_MAX_RIF_MAX || max < 1)
2181 /* cannot exceed or equal max_rpcs_in_flight */
2182 if (max >= cli->cl_max_rpcs_in_flight) {
2183 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2184 "higher or equal to max_rpcs_in_flight value (%u)\n",
2185 cli->cl_import->imp_obd->obd_name,
2186 max, cli->cl_max_rpcs_in_flight);
2190 /* cannot exceed max modify RPCs in flight supported by the server */
2191 ocd = &cli->cl_import->imp_connect_data;
2192 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2193 maxmodrpcs = ocd->ocd_maxmodrpcs;
2196 if (max > maxmodrpcs) {
2197 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2198 "higher than max_mod_rpcs_per_client value (%hu) "
2199 "returned by the server at connection\n",
2200 cli->cl_import->imp_obd->obd_name,
2205 spin_lock(&cli->cl_mod_rpcs_lock);
2207 prev = cli->cl_max_mod_rpcs_in_flight;
2208 cli->cl_max_mod_rpcs_in_flight = max;
2210 /* wakeup waiters if limit has been increased */
2211 if (cli->cl_max_mod_rpcs_in_flight > prev)
2212 wake_up(&cli->cl_mod_rpcs_waitq);
2214 spin_unlock(&cli->cl_mod_rpcs_lock);
2218 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2220 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2221 struct seq_file *seq)
2223 unsigned long mod_tot = 0, mod_cum;
2224 struct timespec64 now;
2227 ktime_get_real_ts64(&now);
2229 spin_lock(&cli->cl_mod_rpcs_lock);
2231 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2232 (s64)now.tv_sec, now.tv_nsec);
2233 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2234 cli->cl_mod_rpcs_in_flight);
2236 seq_printf(seq, "\n\t\t\tmodify\n");
2237 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2239 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2242 for (i = 0; i < OBD_HIST_MAX; i++) {
2243 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2245 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2246 i, mod, pct(mod, mod_tot),
2247 pct(mod_cum, mod_tot));
2248 if (mod_cum == mod_tot)
2252 spin_unlock(&cli->cl_mod_rpcs_lock);
2256 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2258 /* The number of modify RPCs sent in parallel is limited
2259 * because the server has a finite number of slots per client to
2260 * store request result and ensure reply reconstruction when needed.
2261 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2262 * that takes into account server limit and cl_max_rpcs_in_flight
2264 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2265 * one close request is allowed above the maximum.
2267 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2272 /* A slot is available if
2273 * - number of modify RPCs in flight is less than the max
2274 * - it's a close RPC and no other close request is in flight
2276 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2277 (close_req && cli->cl_close_rpcs_in_flight == 0);
2282 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2287 spin_lock(&cli->cl_mod_rpcs_lock);
2288 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2289 spin_unlock(&cli->cl_mod_rpcs_lock);
2293 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2296 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2297 it->it_op == IT_READDIR ||
2298 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2303 /* Get a modify RPC slot from the obd client @cli according
2304 * to the kind of operation @opc that is going to be sent
2305 * and the intent @it of the operation if it applies.
2306 * If the maximum number of modify RPCs in flight is reached
2307 * the thread is put to sleep.
2308 * Returns the tag to be set in the request message. Tag 0
2309 * is reserved for non-modifying requests.
2311 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2312 struct lookup_intent *it)
2314 bool close_req = false;
2317 /* read-only metadata RPCs don't consume a slot on MDT
2318 * for reply reconstruction
2320 if (obd_skip_mod_rpc_slot(it))
2323 if (opc == MDS_CLOSE)
2327 spin_lock(&cli->cl_mod_rpcs_lock);
2328 max = cli->cl_max_mod_rpcs_in_flight;
2329 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2330 /* there is a slot available */
2331 cli->cl_mod_rpcs_in_flight++;
2333 cli->cl_close_rpcs_in_flight++;
2334 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2335 cli->cl_mod_rpcs_in_flight);
2336 /* find a free tag */
2337 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2339 LASSERT(i < OBD_MAX_RIF_MAX);
2340 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2341 spin_unlock(&cli->cl_mod_rpcs_lock);
2342 /* tag 0 is reserved for non-modify RPCs */
2344 CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2345 "opc %u, max %hu\n",
2346 cli->cl_import->imp_obd->obd_name,
2351 spin_unlock(&cli->cl_mod_rpcs_lock);
2353 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2354 "opc %u, max %hu\n",
2355 cli->cl_import->imp_obd->obd_name, opc, max);
2357 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2358 obd_mod_rpc_slot_avail(cli,
2362 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2364 /* Put a modify RPC slot from the obd client @cli according
2365 * to the kind of operation @opc that has been sent and the
2366 * intent @it of the operation if it applies.
2368 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2369 struct lookup_intent *it, __u16 tag)
2371 bool close_req = false;
2373 if (obd_skip_mod_rpc_slot(it))
2376 if (opc == MDS_CLOSE)
2379 spin_lock(&cli->cl_mod_rpcs_lock);
2380 cli->cl_mod_rpcs_in_flight--;
2382 cli->cl_close_rpcs_in_flight--;
2383 /* release the tag in the bitmap */
2384 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2385 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2386 spin_unlock(&cli->cl_mod_rpcs_lock);
2387 wake_up(&cli->cl_mod_rpcs_waitq);
2389 EXPORT_SYMBOL(obd_put_mod_rpc_slot);