4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59 const char *status, int locks, int debug_level);
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90 obd, obd->obd_namespace, obd->obd_force);
93 lu_ref_fini(&obd->obd_reference);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 struct obd_type *class_search_type(const char *name)
99 struct kobject *kobj = kset_find_obj(lustre_kset, name);
101 if (kobj && kobj->ktype == &class_ktype)
102 return container_of(kobj, struct obd_type, typ_kobj);
107 EXPORT_SYMBOL(class_search_type);
109 struct obd_type *class_get_type(const char *name)
111 struct obd_type *type;
113 type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
116 const char *modname = name;
118 #ifdef HAVE_SERVER_SUPPORT
119 if (strcmp(modname, "obdfilter") == 0)
122 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 modname = LUSTRE_OSP_NAME;
125 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 if (try_module_get(type->typ_dt_ops->o_owner)) {
140 atomic_inc(&type->typ_refcnt);
141 /* class_search_type() returned a counted reference,
142 * but we don't need that count any more as
143 * we have one through typ_refcnt.
145 kobject_put(&type->typ_kobj);
147 kobject_put(&type->typ_kobj);
154 void class_put_type(struct obd_type *type)
157 module_put(type->typ_dt_ops->o_owner);
158 atomic_dec(&type->typ_refcnt);
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 debugfs_remove_recursive(type->typ_debugfs_entry);
166 type->typ_debugfs_entry = NULL;
169 lu_device_type_fini(type->typ_lu);
171 #ifdef CONFIG_PROC_FS
172 if (type->typ_name && type->typ_procroot)
173 remove_proc_subtree(type->typ_name, proc_lustre_root);
175 if (type->typ_md_ops)
176 OBD_FREE_PTR(type->typ_md_ops);
177 if (type->typ_dt_ops)
178 OBD_FREE_PTR(type->typ_dt_ops);
180 OBD_FREE(type, sizeof(*type));
183 static struct kobj_type class_ktype = {
184 .sysfs_ops = &lustre_sysfs_ops,
185 .release = class_sysfs_release,
188 #ifdef HAVE_SERVER_SUPPORT
189 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
191 struct dentry *symlink;
192 struct obd_type *type;
195 type = class_search_type(name);
197 kobject_put(&type->typ_kobj);
198 return ERR_PTR(-EEXIST);
201 OBD_ALLOC(type, sizeof(*type));
203 return ERR_PTR(-ENOMEM);
205 type->typ_kobj.kset = lustre_kset;
206 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
207 &lustre_kset->kobj, "%s", name);
211 symlink = debugfs_create_dir(name, debugfs_lustre_root);
212 if (IS_ERR_OR_NULL(symlink)) {
213 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
214 kobject_put(&type->typ_kobj);
217 type->typ_debugfs_entry = symlink;
218 type->typ_sym_filter = true;
221 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
223 if (IS_ERR(type->typ_procroot)) {
224 CERROR("%s: can't create compat proc entry: %d\n",
225 name, (int)PTR_ERR(type->typ_procroot));
226 type->typ_procroot = NULL;
232 EXPORT_SYMBOL(class_add_symlinks);
233 #endif /* HAVE_SERVER_SUPPORT */
235 #define CLASS_MAX_NAME 1024
237 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
238 bool enable_proc, struct lprocfs_vars *vars,
239 const char *name, struct lu_device_type *ldt)
241 struct obd_type *type;
246 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
248 type = class_search_type(name);
250 #ifdef HAVE_SERVER_SUPPORT
251 if (type->typ_sym_filter)
253 #endif /* HAVE_SERVER_SUPPORT */
254 kobject_put(&type->typ_kobj);
255 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
259 OBD_ALLOC(type, sizeof(*type));
263 type->typ_kobj.kset = lustre_kset;
264 kobject_init(&type->typ_kobj, &class_ktype);
265 #ifdef HAVE_SERVER_SUPPORT
267 #endif /* HAVE_SERVER_SUPPORT */
268 OBD_ALLOC_PTR(type->typ_dt_ops);
269 OBD_ALLOC_PTR(type->typ_md_ops);
271 if (type->typ_dt_ops == NULL ||
272 type->typ_md_ops == NULL)
273 GOTO (failed, rc = -ENOMEM);
275 *(type->typ_dt_ops) = *dt_ops;
276 /* md_ops is optional */
278 *(type->typ_md_ops) = *md_ops;
280 #ifdef HAVE_SERVER_SUPPORT
281 if (type->typ_sym_filter) {
282 type->typ_sym_filter = false;
283 kobject_put(&type->typ_kobj);
287 #ifdef CONFIG_PROC_FS
288 if (enable_proc && !type->typ_procroot) {
289 type->typ_procroot = lprocfs_register(name,
292 if (IS_ERR(type->typ_procroot)) {
293 rc = PTR_ERR(type->typ_procroot);
294 type->typ_procroot = NULL;
299 type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
301 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
302 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
304 type->typ_debugfs_entry = NULL;
308 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
311 #ifdef HAVE_SERVER_SUPPORT
316 rc = lu_device_type_init(ldt);
324 kobject_put(&type->typ_kobj);
328 EXPORT_SYMBOL(class_register_type);
330 int class_unregister_type(const char *name)
332 struct obd_type *type = class_search_type(name);
337 CERROR("unknown obd type\n");
341 if (atomic_read(&type->typ_refcnt)) {
342 CERROR("type %s has refcount (%d)\n", name,
343 atomic_read(&type->typ_refcnt));
344 /* This is a bad situation, let's make the best of it */
345 /* Remove ops, but leave the name for debugging */
346 OBD_FREE_PTR(type->typ_dt_ops);
347 OBD_FREE_PTR(type->typ_md_ops);
348 GOTO(out_put, rc = -EBUSY);
351 /* Put the final ref */
352 kobject_put(&type->typ_kobj);
354 /* Put the ref returned by class_search_type() */
355 kobject_put(&type->typ_kobj);
358 } /* class_unregister_type */
359 EXPORT_SYMBOL(class_unregister_type);
362 * Create a new obd device.
364 * Allocate the new obd_device and initialize it.
366 * \param[in] type_name obd device type string.
367 * \param[in] name obd device name.
368 * \param[in] uuid obd device UUID
370 * \retval newdev pointer to created obd_device
371 * \retval ERR_PTR(errno) on error
373 struct obd_device *class_newdev(const char *type_name, const char *name,
376 struct obd_device *newdev;
377 struct obd_type *type = NULL;
380 if (strlen(name) >= MAX_OBD_NAME) {
381 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
382 RETURN(ERR_PTR(-EINVAL));
385 type = class_get_type(type_name);
387 CERROR("OBD: unknown type: %s\n", type_name);
388 RETURN(ERR_PTR(-ENODEV));
391 newdev = obd_device_alloc();
392 if (newdev == NULL) {
393 class_put_type(type);
394 RETURN(ERR_PTR(-ENOMEM));
396 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
397 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
398 newdev->obd_type = type;
399 newdev->obd_minor = -1;
401 rwlock_init(&newdev->obd_pool_lock);
402 newdev->obd_pool_limit = 0;
403 newdev->obd_pool_slv = 0;
405 INIT_LIST_HEAD(&newdev->obd_exports);
406 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
407 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
408 INIT_LIST_HEAD(&newdev->obd_exports_timed);
409 INIT_LIST_HEAD(&newdev->obd_nid_stats);
410 spin_lock_init(&newdev->obd_nid_lock);
411 spin_lock_init(&newdev->obd_dev_lock);
412 mutex_init(&newdev->obd_dev_mutex);
413 spin_lock_init(&newdev->obd_osfs_lock);
414 /* newdev->obd_osfs_age must be set to a value in the distant
415 * past to guarantee a fresh statfs is fetched on mount. */
416 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
418 /* XXX belongs in setup not attach */
419 init_rwsem(&newdev->obd_observer_link_sem);
421 spin_lock_init(&newdev->obd_recovery_task_lock);
422 init_waitqueue_head(&newdev->obd_next_transno_waitq);
423 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
424 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
425 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
426 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
427 INIT_LIST_HEAD(&newdev->obd_evict_list);
428 INIT_LIST_HEAD(&newdev->obd_lwp_list);
430 llog_group_init(&newdev->obd_olg);
431 /* Detach drops this */
432 atomic_set(&newdev->obd_refcount, 1);
433 lu_ref_init(&newdev->obd_reference);
434 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
436 newdev->obd_conn_inprogress = 0;
438 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
440 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
441 newdev->obd_name, newdev);
449 * \param[in] obd obd_device to be freed
453 void class_free_dev(struct obd_device *obd)
455 struct obd_type *obd_type = obd->obd_type;
457 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
458 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
460 "obd %p != obd_devs[%d] %p\n",
461 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
462 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
463 "obd_refcount should be 0, not %d\n",
464 atomic_read(&obd->obd_refcount));
465 LASSERT(obd_type != NULL);
467 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
468 obd->obd_name, obd->obd_type->typ_name);
470 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
471 obd->obd_name, obd->obd_uuid.uuid);
472 if (obd->obd_stopping) {
475 /* If we're not stopping, we were never set up */
476 err = obd_cleanup(obd);
478 CERROR("Cleanup %s returned %d\n",
482 obd_device_free(obd);
484 class_put_type(obd_type);
488 * Unregister obd device.
490 * Free slot in obd_dev[] used by \a obd.
492 * \param[in] new_obd obd_device to be unregistered
496 void class_unregister_device(struct obd_device *obd)
498 write_lock(&obd_dev_lock);
499 if (obd->obd_minor >= 0) {
500 LASSERT(obd_devs[obd->obd_minor] == obd);
501 obd_devs[obd->obd_minor] = NULL;
504 write_unlock(&obd_dev_lock);
508 * Register obd device.
510 * Find free slot in obd_devs[], fills it with \a new_obd.
512 * \param[in] new_obd obd_device to be registered
515 * \retval -EEXIST device with this name is registered
516 * \retval -EOVERFLOW obd_devs[] is full
518 int class_register_device(struct obd_device *new_obd)
522 int new_obd_minor = 0;
523 bool minor_assign = false;
524 bool retried = false;
527 write_lock(&obd_dev_lock);
528 for (i = 0; i < class_devno_max(); i++) {
529 struct obd_device *obd = class_num2obd(i);
532 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
535 write_unlock(&obd_dev_lock);
537 /* the obd_device could be waited to be
538 * destroyed by the "obd_zombie_impexp_thread".
540 obd_zombie_barrier();
545 CERROR("%s: already exists, won't add\n",
547 /* in case we found a free slot before duplicate */
548 minor_assign = false;
552 if (!minor_assign && obd == NULL) {
559 new_obd->obd_minor = new_obd_minor;
560 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
561 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
562 obd_devs[new_obd_minor] = new_obd;
566 CERROR("%s: all %u/%u devices used, increase "
567 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
568 i, class_devno_max(), ret);
571 write_unlock(&obd_dev_lock);
576 static int class_name2dev_nolock(const char *name)
583 for (i = 0; i < class_devno_max(); i++) {
584 struct obd_device *obd = class_num2obd(i);
586 if (obd && strcmp(name, obd->obd_name) == 0) {
587 /* Make sure we finished attaching before we give
588 out any references */
589 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
590 if (obd->obd_attached) {
600 int class_name2dev(const char *name)
607 read_lock(&obd_dev_lock);
608 i = class_name2dev_nolock(name);
609 read_unlock(&obd_dev_lock);
613 EXPORT_SYMBOL(class_name2dev);
615 struct obd_device *class_name2obd(const char *name)
617 int dev = class_name2dev(name);
619 if (dev < 0 || dev > class_devno_max())
621 return class_num2obd(dev);
623 EXPORT_SYMBOL(class_name2obd);
625 int class_uuid2dev_nolock(struct obd_uuid *uuid)
629 for (i = 0; i < class_devno_max(); i++) {
630 struct obd_device *obd = class_num2obd(i);
632 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
633 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
641 int class_uuid2dev(struct obd_uuid *uuid)
645 read_lock(&obd_dev_lock);
646 i = class_uuid2dev_nolock(uuid);
647 read_unlock(&obd_dev_lock);
651 EXPORT_SYMBOL(class_uuid2dev);
653 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
655 int dev = class_uuid2dev(uuid);
658 return class_num2obd(dev);
660 EXPORT_SYMBOL(class_uuid2obd);
663 * Get obd device from ::obd_devs[]
665 * \param num [in] array index
667 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
668 * otherwise return the obd device there.
670 struct obd_device *class_num2obd(int num)
672 struct obd_device *obd = NULL;
674 if (num < class_devno_max()) {
679 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
680 "%p obd_magic %08x != %08x\n",
681 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
682 LASSERTF(obd->obd_minor == num,
683 "%p obd_minor %0d != %0d\n",
684 obd, obd->obd_minor, num);
691 * Find obd in obd_dev[] by name or uuid.
693 * Increment obd's refcount if found.
695 * \param[in] str obd name or uuid
697 * \retval NULL if not found
698 * \retval target pointer to found obd_device
700 struct obd_device *class_dev_by_str(const char *str)
702 struct obd_device *target = NULL;
703 struct obd_uuid tgtuuid;
706 obd_str2uuid(&tgtuuid, str);
708 read_lock(&obd_dev_lock);
709 rc = class_uuid2dev_nolock(&tgtuuid);
711 rc = class_name2dev_nolock(str);
714 target = class_num2obd(rc);
717 class_incref(target, "find", current);
718 read_unlock(&obd_dev_lock);
722 EXPORT_SYMBOL(class_dev_by_str);
725 * Get obd devices count. Device in any
727 * \retval obd device count
729 int get_devices_count(void)
731 int index, max_index = class_devno_max(), dev_count = 0;
733 read_lock(&obd_dev_lock);
734 for (index = 0; index <= max_index; index++) {
735 struct obd_device *obd = class_num2obd(index);
739 read_unlock(&obd_dev_lock);
743 EXPORT_SYMBOL(get_devices_count);
745 void class_obd_list(void)
750 read_lock(&obd_dev_lock);
751 for (i = 0; i < class_devno_max(); i++) {
752 struct obd_device *obd = class_num2obd(i);
756 if (obd->obd_stopping)
758 else if (obd->obd_set_up)
760 else if (obd->obd_attached)
764 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
765 i, status, obd->obd_type->typ_name,
766 obd->obd_name, obd->obd_uuid.uuid,
767 atomic_read(&obd->obd_refcount));
769 read_unlock(&obd_dev_lock);
772 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
773 specified, then only the client with that uuid is returned,
774 otherwise any client connected to the tgt is returned. */
775 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
776 const char *type_name,
777 struct obd_uuid *grp_uuid)
781 read_lock(&obd_dev_lock);
782 for (i = 0; i < class_devno_max(); i++) {
783 struct obd_device *obd = class_num2obd(i);
787 if ((strncmp(obd->obd_type->typ_name, type_name,
788 strlen(type_name)) == 0)) {
789 if (obd_uuid_equals(tgt_uuid,
790 &obd->u.cli.cl_target_uuid) &&
791 ((grp_uuid)? obd_uuid_equals(grp_uuid,
792 &obd->obd_uuid) : 1)) {
793 read_unlock(&obd_dev_lock);
798 read_unlock(&obd_dev_lock);
802 EXPORT_SYMBOL(class_find_client_obd);
804 /* Iterate the obd_device list looking devices have grp_uuid. Start
805 searching at *next, and if a device is found, the next index to look
806 at is saved in *next. If next is NULL, then the first matching device
807 will always be returned. */
808 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
814 else if (*next >= 0 && *next < class_devno_max())
819 read_lock(&obd_dev_lock);
820 for (; i < class_devno_max(); i++) {
821 struct obd_device *obd = class_num2obd(i);
825 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
828 read_unlock(&obd_dev_lock);
832 read_unlock(&obd_dev_lock);
836 EXPORT_SYMBOL(class_devices_in_group);
839 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
840 * adjust sptlrpc settings accordingly.
842 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
844 struct obd_device *obd;
848 LASSERT(namelen > 0);
850 read_lock(&obd_dev_lock);
851 for (i = 0; i < class_devno_max(); i++) {
852 obd = class_num2obd(i);
854 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
857 /* only notify mdc, osc, osp, lwp, mdt, ost
858 * because only these have a -sptlrpc llog */
859 type = obd->obd_type->typ_name;
860 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
861 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
862 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
863 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
864 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
865 strcmp(type, LUSTRE_OST_NAME) != 0)
868 if (strncmp(obd->obd_name, fsname, namelen))
871 class_incref(obd, __FUNCTION__, obd);
872 read_unlock(&obd_dev_lock);
873 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
874 sizeof(KEY_SPTLRPC_CONF),
875 KEY_SPTLRPC_CONF, 0, NULL, NULL);
877 class_decref(obd, __FUNCTION__, obd);
878 read_lock(&obd_dev_lock);
880 read_unlock(&obd_dev_lock);
883 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
885 void obd_cleanup_caches(void)
888 if (obd_device_cachep) {
889 kmem_cache_destroy(obd_device_cachep);
890 obd_device_cachep = NULL;
896 int obd_init_caches(void)
901 LASSERT(obd_device_cachep == NULL);
902 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
903 sizeof(struct obd_device),
904 0, 0, 0, sizeof(struct obd_device), NULL);
905 if (!obd_device_cachep)
906 GOTO(out, rc = -ENOMEM);
910 obd_cleanup_caches();
914 static struct portals_handle_ops export_handle_ops;
916 /* map connection to client */
917 struct obd_export *class_conn2export(struct lustre_handle *conn)
919 struct obd_export *export;
923 CDEBUG(D_CACHE, "looking for null handle\n");
927 if (conn->cookie == -1) { /* this means assign a new connection */
928 CDEBUG(D_CACHE, "want a new connection\n");
932 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
933 export = class_handle2object(conn->cookie, &export_handle_ops);
936 EXPORT_SYMBOL(class_conn2export);
938 struct obd_device *class_exp2obd(struct obd_export *exp)
944 EXPORT_SYMBOL(class_exp2obd);
946 struct obd_import *class_exp2cliimp(struct obd_export *exp)
948 struct obd_device *obd = exp->exp_obd;
951 return obd->u.cli.cl_import;
953 EXPORT_SYMBOL(class_exp2cliimp);
955 /* Export management functions */
956 static void class_export_destroy(struct obd_export *exp)
958 struct obd_device *obd = exp->exp_obd;
961 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
962 LASSERT(obd != NULL);
964 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
965 exp->exp_client_uuid.uuid, obd->obd_name);
967 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
968 if (exp->exp_connection)
969 ptlrpc_put_connection_superhack(exp->exp_connection);
971 LASSERT(list_empty(&exp->exp_outstanding_replies));
972 LASSERT(list_empty(&exp->exp_uncommitted_replies));
973 LASSERT(list_empty(&exp->exp_req_replay_queue));
974 LASSERT(list_empty(&exp->exp_hp_rpcs));
975 obd_destroy_export(exp);
976 /* self export doesn't hold a reference to an obd, although it
977 * exists until freeing of the obd */
978 if (exp != obd->obd_self_export)
979 class_decref(obd, "export", exp);
981 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
985 static struct portals_handle_ops export_handle_ops = {
987 .hop_type = "export",
990 struct obd_export *class_export_get(struct obd_export *exp)
992 refcount_inc(&exp->exp_handle.h_ref);
993 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
994 refcount_read(&exp->exp_handle.h_ref));
997 EXPORT_SYMBOL(class_export_get);
999 void class_export_put(struct obd_export *exp)
1001 LASSERT(exp != NULL);
1002 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
1003 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
1004 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1005 refcount_read(&exp->exp_handle.h_ref) - 1);
1007 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
1008 struct obd_device *obd = exp->exp_obd;
1010 CDEBUG(D_IOCTL, "final put %p/%s\n",
1011 exp, exp->exp_client_uuid.uuid);
1013 /* release nid stat refererence */
1014 lprocfs_exp_cleanup(exp);
1016 if (exp == obd->obd_self_export) {
1017 /* self export should be destroyed without
1018 * zombie thread as it doesn't hold a
1019 * reference to obd and doesn't hold any
1021 class_export_destroy(exp);
1022 /* self export is destroyed, no class
1023 * references exist and it is safe to free
1025 class_free_dev(obd);
1027 LASSERT(!list_empty(&exp->exp_obd_chain));
1028 obd_zombie_export_add(exp);
1033 EXPORT_SYMBOL(class_export_put);
1035 static void obd_zombie_exp_cull(struct work_struct *ws)
1037 struct obd_export *export;
1039 export = container_of(ws, struct obd_export, exp_zombie_work);
1040 class_export_destroy(export);
1043 /* Creates a new export, adds it to the hash table, and returns a
1044 * pointer to it. The refcount is 2: one for the hash reference, and
1045 * one for the pointer returned by this function. */
1046 struct obd_export *__class_new_export(struct obd_device *obd,
1047 struct obd_uuid *cluuid, bool is_self)
1049 struct obd_export *export;
1050 struct cfs_hash *hash = NULL;
1054 OBD_ALLOC_PTR(export);
1056 return ERR_PTR(-ENOMEM);
1058 export->exp_conn_cnt = 0;
1059 export->exp_lock_hash = NULL;
1060 export->exp_flock_hash = NULL;
1061 /* 2 = class_handle_hash + last */
1062 refcount_set(&export->exp_handle.h_ref, 2);
1063 atomic_set(&export->exp_rpc_count, 0);
1064 atomic_set(&export->exp_cb_count, 0);
1065 atomic_set(&export->exp_locks_count, 0);
1066 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1067 INIT_LIST_HEAD(&export->exp_locks_list);
1068 spin_lock_init(&export->exp_locks_list_guard);
1070 atomic_set(&export->exp_replay_count, 0);
1071 export->exp_obd = obd;
1072 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1073 spin_lock_init(&export->exp_uncommitted_replies_lock);
1074 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1075 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1076 INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1077 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1078 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1079 class_handle_hash(&export->exp_handle, &export_handle_ops);
1080 export->exp_last_request_time = ktime_get_real_seconds();
1081 spin_lock_init(&export->exp_lock);
1082 spin_lock_init(&export->exp_rpc_lock);
1083 INIT_HLIST_NODE(&export->exp_uuid_hash);
1084 INIT_HLIST_NODE(&export->exp_nid_hash);
1085 INIT_HLIST_NODE(&export->exp_gen_hash);
1086 spin_lock_init(&export->exp_bl_list_lock);
1087 INIT_LIST_HEAD(&export->exp_bl_list);
1088 INIT_LIST_HEAD(&export->exp_stale_list);
1089 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1091 export->exp_sp_peer = LUSTRE_SP_ANY;
1092 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1093 export->exp_client_uuid = *cluuid;
1094 obd_init_export(export);
1096 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1097 spin_lock(&obd->obd_dev_lock);
1098 /* shouldn't happen, but might race */
1099 if (obd->obd_stopping)
1100 GOTO(exit_unlock, rc = -ENODEV);
1102 hash = cfs_hash_getref(obd->obd_uuid_hash);
1104 GOTO(exit_unlock, rc = -ENODEV);
1105 spin_unlock(&obd->obd_dev_lock);
1107 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1109 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1110 obd->obd_name, cluuid->uuid, rc);
1111 GOTO(exit_err, rc = -EALREADY);
1115 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1116 spin_lock(&obd->obd_dev_lock);
1117 if (obd->obd_stopping) {
1119 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1120 GOTO(exit_unlock, rc = -ESHUTDOWN);
1124 class_incref(obd, "export", export);
1125 list_add_tail(&export->exp_obd_chain_timed,
1126 &obd->obd_exports_timed);
1127 list_add(&export->exp_obd_chain, &obd->obd_exports);
1128 obd->obd_num_exports++;
1130 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1131 INIT_LIST_HEAD(&export->exp_obd_chain);
1133 spin_unlock(&obd->obd_dev_lock);
1135 cfs_hash_putref(hash);
1139 spin_unlock(&obd->obd_dev_lock);
1142 cfs_hash_putref(hash);
1143 class_handle_unhash(&export->exp_handle);
1144 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1145 obd_destroy_export(export);
1146 OBD_FREE_PTR(export);
1150 struct obd_export *class_new_export(struct obd_device *obd,
1151 struct obd_uuid *uuid)
1153 return __class_new_export(obd, uuid, false);
1155 EXPORT_SYMBOL(class_new_export);
1157 struct obd_export *class_new_export_self(struct obd_device *obd,
1158 struct obd_uuid *uuid)
1160 return __class_new_export(obd, uuid, true);
1163 void class_unlink_export(struct obd_export *exp)
1165 class_handle_unhash(&exp->exp_handle);
1167 if (exp->exp_obd->obd_self_export == exp) {
1168 class_export_put(exp);
1172 spin_lock(&exp->exp_obd->obd_dev_lock);
1173 /* delete an uuid-export hashitem from hashtables */
1174 if (!hlist_unhashed(&exp->exp_uuid_hash))
1175 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1176 &exp->exp_client_uuid,
1177 &exp->exp_uuid_hash);
1179 #ifdef HAVE_SERVER_SUPPORT
1180 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1181 struct tg_export_data *ted = &exp->exp_target_data;
1182 struct cfs_hash *hash;
1184 /* Because obd_gen_hash will not be released until
1185 * class_cleanup(), so hash should never be NULL here */
1186 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1187 LASSERT(hash != NULL);
1188 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1189 &exp->exp_gen_hash);
1190 cfs_hash_putref(hash);
1192 #endif /* HAVE_SERVER_SUPPORT */
1194 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1195 list_del_init(&exp->exp_obd_chain_timed);
1196 exp->exp_obd->obd_num_exports--;
1197 spin_unlock(&exp->exp_obd->obd_dev_lock);
1198 atomic_inc(&obd_stale_export_num);
1200 /* A reference is kept by obd_stale_exports list */
1201 obd_stale_export_put(exp);
1203 EXPORT_SYMBOL(class_unlink_export);
1205 /* Import management functions */
1206 static void obd_zombie_import_free(struct obd_import *imp)
1210 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1211 imp->imp_obd->obd_name);
1213 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1215 ptlrpc_put_connection_superhack(imp->imp_connection);
1217 while (!list_empty(&imp->imp_conn_list)) {
1218 struct obd_import_conn *imp_conn;
1220 imp_conn = list_entry(imp->imp_conn_list.next,
1221 struct obd_import_conn, oic_item);
1222 list_del_init(&imp_conn->oic_item);
1223 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1224 OBD_FREE(imp_conn, sizeof(*imp_conn));
1227 LASSERT(imp->imp_sec == NULL);
1228 class_decref(imp->imp_obd, "import", imp);
1233 struct obd_import *class_import_get(struct obd_import *import)
1235 atomic_inc(&import->imp_refcount);
1236 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1237 atomic_read(&import->imp_refcount),
1238 import->imp_obd->obd_name);
1241 EXPORT_SYMBOL(class_import_get);
1243 void class_import_put(struct obd_import *imp)
1247 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1249 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1250 atomic_read(&imp->imp_refcount) - 1,
1251 imp->imp_obd->obd_name);
1253 if (atomic_dec_and_test(&imp->imp_refcount)) {
1254 CDEBUG(D_INFO, "final put import %p\n", imp);
1255 obd_zombie_import_add(imp);
1260 EXPORT_SYMBOL(class_import_put);
1262 static void init_imp_at(struct imp_at *at) {
1264 at_init(&at->iat_net_latency, 0, 0);
1265 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1266 /* max service estimates are tracked on the server side, so
1267 don't use the AT history here, just use the last reported
1268 val. (But keep hist for proc histogram, worst_ever) */
1269 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1274 static void obd_zombie_imp_cull(struct work_struct *ws)
1276 struct obd_import *import;
1278 import = container_of(ws, struct obd_import, imp_zombie_work);
1279 obd_zombie_import_free(import);
1282 struct obd_import *class_new_import(struct obd_device *obd)
1284 struct obd_import *imp;
1285 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1287 OBD_ALLOC(imp, sizeof(*imp));
1291 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1292 INIT_LIST_HEAD(&imp->imp_replay_list);
1293 INIT_LIST_HEAD(&imp->imp_sending_list);
1294 INIT_LIST_HEAD(&imp->imp_delayed_list);
1295 INIT_LIST_HEAD(&imp->imp_committed_list);
1296 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1297 imp->imp_known_replied_xid = 0;
1298 imp->imp_replay_cursor = &imp->imp_committed_list;
1299 spin_lock_init(&imp->imp_lock);
1300 imp->imp_last_success_conn = 0;
1301 imp->imp_state = LUSTRE_IMP_NEW;
1302 imp->imp_obd = class_incref(obd, "import", imp);
1303 rwlock_init(&imp->imp_sec_lock);
1304 init_waitqueue_head(&imp->imp_recovery_waitq);
1305 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1307 if (curr_pid_ns->child_reaper)
1308 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1310 imp->imp_sec_refpid = 1;
1312 atomic_set(&imp->imp_refcount, 2);
1313 atomic_set(&imp->imp_unregistering, 0);
1314 atomic_set(&imp->imp_inflight, 0);
1315 atomic_set(&imp->imp_replay_inflight, 0);
1316 atomic_set(&imp->imp_inval_count, 0);
1317 INIT_LIST_HEAD(&imp->imp_conn_list);
1318 init_imp_at(&imp->imp_at);
1320 /* the default magic is V2, will be used in connect RPC, and
1321 * then adjusted according to the flags in request/reply. */
1322 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1326 EXPORT_SYMBOL(class_new_import);
1328 void class_destroy_import(struct obd_import *import)
1330 LASSERT(import != NULL);
1331 LASSERT(import != LP_POISON);
1333 spin_lock(&import->imp_lock);
1334 import->imp_generation++;
1335 spin_unlock(&import->imp_lock);
1336 class_import_put(import);
1338 EXPORT_SYMBOL(class_destroy_import);
1340 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1342 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1344 spin_lock(&exp->exp_locks_list_guard);
1346 LASSERT(lock->l_exp_refs_nr >= 0);
1348 if (lock->l_exp_refs_target != NULL &&
1349 lock->l_exp_refs_target != exp) {
1350 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1351 exp, lock, lock->l_exp_refs_target);
1353 if ((lock->l_exp_refs_nr ++) == 0) {
1354 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1355 lock->l_exp_refs_target = exp;
1357 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1358 lock, exp, lock->l_exp_refs_nr);
1359 spin_unlock(&exp->exp_locks_list_guard);
1361 EXPORT_SYMBOL(__class_export_add_lock_ref);
1363 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1365 spin_lock(&exp->exp_locks_list_guard);
1366 LASSERT(lock->l_exp_refs_nr > 0);
1367 if (lock->l_exp_refs_target != exp) {
1368 LCONSOLE_WARN("lock %p, "
1369 "mismatching export pointers: %p, %p\n",
1370 lock, lock->l_exp_refs_target, exp);
1372 if (-- lock->l_exp_refs_nr == 0) {
1373 list_del_init(&lock->l_exp_refs_link);
1374 lock->l_exp_refs_target = NULL;
1376 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1377 lock, exp, lock->l_exp_refs_nr);
1378 spin_unlock(&exp->exp_locks_list_guard);
1380 EXPORT_SYMBOL(__class_export_del_lock_ref);
1383 /* A connection defines an export context in which preallocation can
1384 be managed. This releases the export pointer reference, and returns
1385 the export handle, so the export refcount is 1 when this function
1387 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1388 struct obd_uuid *cluuid)
1390 struct obd_export *export;
1391 LASSERT(conn != NULL);
1392 LASSERT(obd != NULL);
1393 LASSERT(cluuid != NULL);
1396 export = class_new_export(obd, cluuid);
1398 RETURN(PTR_ERR(export));
1400 conn->cookie = export->exp_handle.h_cookie;
1401 class_export_put(export);
1403 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1404 cluuid->uuid, conn->cookie);
1407 EXPORT_SYMBOL(class_connect);
1409 /* if export is involved in recovery then clean up related things */
1410 static void class_export_recovery_cleanup(struct obd_export *exp)
1412 struct obd_device *obd = exp->exp_obd;
1414 spin_lock(&obd->obd_recovery_task_lock);
1415 if (obd->obd_recovering) {
1416 if (exp->exp_in_recovery) {
1417 spin_lock(&exp->exp_lock);
1418 exp->exp_in_recovery = 0;
1419 spin_unlock(&exp->exp_lock);
1420 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1421 atomic_dec(&obd->obd_connected_clients);
1424 /* if called during recovery then should update
1425 * obd_stale_clients counter,
1426 * lightweight exports are not counted */
1427 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1428 exp->exp_obd->obd_stale_clients++;
1430 spin_unlock(&obd->obd_recovery_task_lock);
1432 spin_lock(&exp->exp_lock);
1433 /** Cleanup req replay fields */
1434 if (exp->exp_req_replay_needed) {
1435 exp->exp_req_replay_needed = 0;
1437 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1438 atomic_dec(&obd->obd_req_replay_clients);
1441 /** Cleanup lock replay data */
1442 if (exp->exp_lock_replay_needed) {
1443 exp->exp_lock_replay_needed = 0;
1445 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1446 atomic_dec(&obd->obd_lock_replay_clients);
1448 spin_unlock(&exp->exp_lock);
1451 /* This function removes 1-3 references from the export:
1452 * 1 - for export pointer passed
1453 * and if disconnect really need
1454 * 2 - removing from hash
1455 * 3 - in client_unlink_export
1456 * The export pointer passed to this function can destroyed */
1457 int class_disconnect(struct obd_export *export)
1459 int already_disconnected;
1462 if (export == NULL) {
1463 CWARN("attempting to free NULL export %p\n", export);
1467 spin_lock(&export->exp_lock);
1468 already_disconnected = export->exp_disconnected;
1469 export->exp_disconnected = 1;
1470 /* We hold references of export for uuid hash
1471 * and nid_hash and export link at least. So
1472 * it is safe to call cfs_hash_del in there. */
1473 if (!hlist_unhashed(&export->exp_nid_hash))
1474 cfs_hash_del(export->exp_obd->obd_nid_hash,
1475 &export->exp_connection->c_peer.nid,
1476 &export->exp_nid_hash);
1477 spin_unlock(&export->exp_lock);
1479 /* class_cleanup(), abort_recovery(), and class_fail_export()
1480 * all end up in here, and if any of them race we shouldn't
1481 * call extra class_export_puts(). */
1482 if (already_disconnected) {
1483 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1484 GOTO(no_disconn, already_disconnected);
1487 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1488 export->exp_handle.h_cookie);
1490 class_export_recovery_cleanup(export);
1491 class_unlink_export(export);
1493 class_export_put(export);
1496 EXPORT_SYMBOL(class_disconnect);
1498 /* Return non-zero for a fully connected export */
1499 int class_connected_export(struct obd_export *exp)
1504 spin_lock(&exp->exp_lock);
1505 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1506 spin_unlock(&exp->exp_lock);
1510 EXPORT_SYMBOL(class_connected_export);
1512 static void class_disconnect_export_list(struct list_head *list,
1513 enum obd_option flags)
1516 struct obd_export *exp;
1519 /* It's possible that an export may disconnect itself, but
1520 * nothing else will be added to this list. */
1521 while (!list_empty(list)) {
1522 exp = list_entry(list->next, struct obd_export,
1524 /* need for safe call CDEBUG after obd_disconnect */
1525 class_export_get(exp);
1527 spin_lock(&exp->exp_lock);
1528 exp->exp_flags = flags;
1529 spin_unlock(&exp->exp_lock);
1531 if (obd_uuid_equals(&exp->exp_client_uuid,
1532 &exp->exp_obd->obd_uuid)) {
1534 "exp %p export uuid == obd uuid, don't discon\n",
1536 /* Need to delete this now so we don't end up pointing
1537 * to work_list later when this export is cleaned up. */
1538 list_del_init(&exp->exp_obd_chain);
1539 class_export_put(exp);
1543 class_export_get(exp);
1544 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1545 "last request at %lld\n",
1546 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1547 exp, exp->exp_last_request_time);
1548 /* release one export reference anyway */
1549 rc = obd_disconnect(exp);
1551 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1552 obd_export_nid2str(exp), exp, rc);
1553 class_export_put(exp);
1558 void class_disconnect_exports(struct obd_device *obd)
1560 struct list_head work_list;
1563 /* Move all of the exports from obd_exports to a work list, en masse. */
1564 INIT_LIST_HEAD(&work_list);
1565 spin_lock(&obd->obd_dev_lock);
1566 list_splice_init(&obd->obd_exports, &work_list);
1567 list_splice_init(&obd->obd_delayed_exports, &work_list);
1568 spin_unlock(&obd->obd_dev_lock);
1570 if (!list_empty(&work_list)) {
1571 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1572 "disconnecting them\n", obd->obd_minor, obd);
1573 class_disconnect_export_list(&work_list,
1574 exp_flags_from_obd(obd));
1576 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1577 obd->obd_minor, obd);
1580 EXPORT_SYMBOL(class_disconnect_exports);
1582 /* Remove exports that have not completed recovery.
1584 void class_disconnect_stale_exports(struct obd_device *obd,
1585 int (*test_export)(struct obd_export *))
1587 struct list_head work_list;
1588 struct obd_export *exp, *n;
1592 INIT_LIST_HEAD(&work_list);
1593 spin_lock(&obd->obd_dev_lock);
1594 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1596 /* don't count self-export as client */
1597 if (obd_uuid_equals(&exp->exp_client_uuid,
1598 &exp->exp_obd->obd_uuid))
1601 /* don't evict clients which have no slot in last_rcvd
1602 * (e.g. lightweight connection) */
1603 if (exp->exp_target_data.ted_lr_idx == -1)
1606 spin_lock(&exp->exp_lock);
1607 if (exp->exp_failed || test_export(exp)) {
1608 spin_unlock(&exp->exp_lock);
1611 exp->exp_failed = 1;
1612 spin_unlock(&exp->exp_lock);
1614 list_move(&exp->exp_obd_chain, &work_list);
1616 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1617 obd->obd_name, exp->exp_client_uuid.uuid,
1618 obd_export_nid2str(exp));
1619 print_export_data(exp, "EVICTING", 0, D_HA);
1621 spin_unlock(&obd->obd_dev_lock);
1624 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1625 obd->obd_name, evicted);
1627 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1628 OBD_OPT_ABORT_RECOV);
1631 EXPORT_SYMBOL(class_disconnect_stale_exports);
1633 void class_fail_export(struct obd_export *exp)
1635 int rc, already_failed;
1637 spin_lock(&exp->exp_lock);
1638 already_failed = exp->exp_failed;
1639 exp->exp_failed = 1;
1640 spin_unlock(&exp->exp_lock);
1642 if (already_failed) {
1643 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1644 exp, exp->exp_client_uuid.uuid);
1648 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1649 exp, exp->exp_client_uuid.uuid);
1651 if (obd_dump_on_timeout)
1652 libcfs_debug_dumplog();
1654 /* need for safe call CDEBUG after obd_disconnect */
1655 class_export_get(exp);
1657 /* Most callers into obd_disconnect are removing their own reference
1658 * (request, for example) in addition to the one from the hash table.
1659 * We don't have such a reference here, so make one. */
1660 class_export_get(exp);
1661 rc = obd_disconnect(exp);
1663 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1665 CDEBUG(D_HA, "disconnected export %p/%s\n",
1666 exp, exp->exp_client_uuid.uuid);
1667 class_export_put(exp);
1669 EXPORT_SYMBOL(class_fail_export);
1671 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1673 struct cfs_hash *nid_hash;
1674 struct obd_export *doomed_exp = NULL;
1675 int exports_evicted = 0;
1677 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1679 spin_lock(&obd->obd_dev_lock);
1680 /* umount has run already, so evict thread should leave
1681 * its task to umount thread now */
1682 if (obd->obd_stopping) {
1683 spin_unlock(&obd->obd_dev_lock);
1684 return exports_evicted;
1686 nid_hash = obd->obd_nid_hash;
1687 cfs_hash_getref(nid_hash);
1688 spin_unlock(&obd->obd_dev_lock);
1691 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1692 if (doomed_exp == NULL)
1695 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1696 "nid %s found, wanted nid %s, requested nid %s\n",
1697 obd_export_nid2str(doomed_exp),
1698 libcfs_nid2str(nid_key), nid);
1699 LASSERTF(doomed_exp != obd->obd_self_export,
1700 "self-export is hashed by NID?\n");
1702 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1703 "request\n", obd->obd_name,
1704 obd_uuid2str(&doomed_exp->exp_client_uuid),
1705 obd_export_nid2str(doomed_exp));
1706 class_fail_export(doomed_exp);
1707 class_export_put(doomed_exp);
1710 cfs_hash_putref(nid_hash);
1712 if (!exports_evicted)
1713 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1714 obd->obd_name, nid);
1715 return exports_evicted;
1717 EXPORT_SYMBOL(obd_export_evict_by_nid);
1719 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1721 struct cfs_hash *uuid_hash;
1722 struct obd_export *doomed_exp = NULL;
1723 struct obd_uuid doomed_uuid;
1724 int exports_evicted = 0;
1726 spin_lock(&obd->obd_dev_lock);
1727 if (obd->obd_stopping) {
1728 spin_unlock(&obd->obd_dev_lock);
1729 return exports_evicted;
1731 uuid_hash = obd->obd_uuid_hash;
1732 cfs_hash_getref(uuid_hash);
1733 spin_unlock(&obd->obd_dev_lock);
1735 obd_str2uuid(&doomed_uuid, uuid);
1736 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1737 CERROR("%s: can't evict myself\n", obd->obd_name);
1738 cfs_hash_putref(uuid_hash);
1739 return exports_evicted;
1742 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1744 if (doomed_exp == NULL) {
1745 CERROR("%s: can't disconnect %s: no exports found\n",
1746 obd->obd_name, uuid);
1748 CWARN("%s: evicting %s at adminstrative request\n",
1749 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1750 class_fail_export(doomed_exp);
1751 class_export_put(doomed_exp);
1754 cfs_hash_putref(uuid_hash);
1756 return exports_evicted;
1759 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1760 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1761 EXPORT_SYMBOL(class_export_dump_hook);
1764 static void print_export_data(struct obd_export *exp, const char *status,
1765 int locks, int debug_level)
1767 struct ptlrpc_reply_state *rs;
1768 struct ptlrpc_reply_state *first_reply = NULL;
1771 spin_lock(&exp->exp_lock);
1772 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1778 spin_unlock(&exp->exp_lock);
1780 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1781 "%p %s %llu stale:%d\n",
1782 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1783 obd_export_nid2str(exp),
1784 refcount_read(&exp->exp_handle.h_ref),
1785 atomic_read(&exp->exp_rpc_count),
1786 atomic_read(&exp->exp_cb_count),
1787 atomic_read(&exp->exp_locks_count),
1788 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1789 nreplies, first_reply, nreplies > 3 ? "..." : "",
1790 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1791 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1792 if (locks && class_export_dump_hook != NULL)
1793 class_export_dump_hook(exp);
1797 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1799 struct obd_export *exp;
1801 spin_lock(&obd->obd_dev_lock);
1802 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1803 print_export_data(exp, "ACTIVE", locks, debug_level);
1804 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1805 print_export_data(exp, "UNLINKED", locks, debug_level);
1806 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1807 print_export_data(exp, "DELAYED", locks, debug_level);
1808 spin_unlock(&obd->obd_dev_lock);
1811 void obd_exports_barrier(struct obd_device *obd)
1814 LASSERT(list_empty(&obd->obd_exports));
1815 spin_lock(&obd->obd_dev_lock);
1816 while (!list_empty(&obd->obd_unlinked_exports)) {
1817 spin_unlock(&obd->obd_dev_lock);
1818 set_current_state(TASK_UNINTERRUPTIBLE);
1819 schedule_timeout(cfs_time_seconds(waited));
1820 if (waited > 5 && is_power_of_2(waited)) {
1821 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1822 "more than %d seconds. "
1823 "The obd refcount = %d. Is it stuck?\n",
1824 obd->obd_name, waited,
1825 atomic_read(&obd->obd_refcount));
1826 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1829 spin_lock(&obd->obd_dev_lock);
1831 spin_unlock(&obd->obd_dev_lock);
1833 EXPORT_SYMBOL(obd_exports_barrier);
1836 * Add export to the obd_zombe thread and notify it.
1838 static void obd_zombie_export_add(struct obd_export *exp) {
1839 atomic_dec(&obd_stale_export_num);
1840 spin_lock(&exp->exp_obd->obd_dev_lock);
1841 LASSERT(!list_empty(&exp->exp_obd_chain));
1842 list_del_init(&exp->exp_obd_chain);
1843 spin_unlock(&exp->exp_obd->obd_dev_lock);
1845 queue_work(zombie_wq, &exp->exp_zombie_work);
1849 * Add import to the obd_zombe thread and notify it.
1851 static void obd_zombie_import_add(struct obd_import *imp) {
1852 LASSERT(imp->imp_sec == NULL);
1854 queue_work(zombie_wq, &imp->imp_zombie_work);
1858 * wait when obd_zombie import/export queues become empty
1860 void obd_zombie_barrier(void)
1862 flush_workqueue(zombie_wq);
1864 EXPORT_SYMBOL(obd_zombie_barrier);
1867 struct obd_export *obd_stale_export_get(void)
1869 struct obd_export *exp = NULL;
1872 spin_lock(&obd_stale_export_lock);
1873 if (!list_empty(&obd_stale_exports)) {
1874 exp = list_entry(obd_stale_exports.next,
1875 struct obd_export, exp_stale_list);
1876 list_del_init(&exp->exp_stale_list);
1878 spin_unlock(&obd_stale_export_lock);
1881 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1882 atomic_read(&obd_stale_export_num));
1886 EXPORT_SYMBOL(obd_stale_export_get);
1888 void obd_stale_export_put(struct obd_export *exp)
1892 LASSERT(list_empty(&exp->exp_stale_list));
1893 if (exp->exp_lock_hash &&
1894 atomic_read(&exp->exp_lock_hash->hs_count)) {
1895 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1896 atomic_read(&obd_stale_export_num));
1898 spin_lock_bh(&exp->exp_bl_list_lock);
1899 spin_lock(&obd_stale_export_lock);
1900 /* Add to the tail if there is no blocked locks,
1901 * to the head otherwise. */
1902 if (list_empty(&exp->exp_bl_list))
1903 list_add_tail(&exp->exp_stale_list,
1904 &obd_stale_exports);
1906 list_add(&exp->exp_stale_list,
1907 &obd_stale_exports);
1909 spin_unlock(&obd_stale_export_lock);
1910 spin_unlock_bh(&exp->exp_bl_list_lock);
1912 class_export_put(exp);
1916 EXPORT_SYMBOL(obd_stale_export_put);
1919 * Adjust the position of the export in the stale list,
1920 * i.e. move to the head of the list if is needed.
1922 void obd_stale_export_adjust(struct obd_export *exp)
1924 LASSERT(exp != NULL);
1925 spin_lock_bh(&exp->exp_bl_list_lock);
1926 spin_lock(&obd_stale_export_lock);
1928 if (!list_empty(&exp->exp_stale_list) &&
1929 !list_empty(&exp->exp_bl_list))
1930 list_move(&exp->exp_stale_list, &obd_stale_exports);
1932 spin_unlock(&obd_stale_export_lock);
1933 spin_unlock_bh(&exp->exp_bl_list_lock);
1935 EXPORT_SYMBOL(obd_stale_export_adjust);
1938 * start destroy zombie import/export thread
1940 int obd_zombie_impexp_init(void)
1942 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1950 * stop destroy zombie import/export thread
1952 void obd_zombie_impexp_stop(void)
1954 destroy_workqueue(zombie_wq);
1955 LASSERT(list_empty(&obd_stale_exports));
1958 /***** Kernel-userspace comm helpers *******/
1960 /* Get length of entire message, including header */
1961 int kuc_len(int payload_len)
1963 return sizeof(struct kuc_hdr) + payload_len;
1965 EXPORT_SYMBOL(kuc_len);
1967 /* Get a pointer to kuc header, given a ptr to the payload
1968 * @param p Pointer to payload area
1969 * @returns Pointer to kuc header
1971 struct kuc_hdr * kuc_ptr(void *p)
1973 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1974 LASSERT(lh->kuc_magic == KUC_MAGIC);
1977 EXPORT_SYMBOL(kuc_ptr);
1979 /* Alloc space for a message, and fill in header
1980 * @return Pointer to payload area
1982 void *kuc_alloc(int payload_len, int transport, int type)
1985 int len = kuc_len(payload_len);
1989 return ERR_PTR(-ENOMEM);
1991 lh->kuc_magic = KUC_MAGIC;
1992 lh->kuc_transport = transport;
1993 lh->kuc_msgtype = type;
1994 lh->kuc_msglen = len;
1996 return (void *)(lh + 1);
1998 EXPORT_SYMBOL(kuc_alloc);
2000 /* Takes pointer to payload area */
2001 void kuc_free(void *p, int payload_len)
2003 struct kuc_hdr *lh = kuc_ptr(p);
2004 OBD_FREE(lh, kuc_len(payload_len));
2006 EXPORT_SYMBOL(kuc_free);
2008 struct obd_request_slot_waiter {
2009 struct list_head orsw_entry;
2010 wait_queue_head_t orsw_waitq;
2014 static bool obd_request_slot_avail(struct client_obd *cli,
2015 struct obd_request_slot_waiter *orsw)
2019 spin_lock(&cli->cl_loi_list_lock);
2020 avail = !!list_empty(&orsw->orsw_entry);
2021 spin_unlock(&cli->cl_loi_list_lock);
2027 * For network flow control, the RPC sponsor needs to acquire a credit
2028 * before sending the RPC. The credits count for a connection is defined
2029 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2030 * the subsequent RPC sponsors need to wait until others released their
2031 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2033 int obd_get_request_slot(struct client_obd *cli)
2035 struct obd_request_slot_waiter orsw;
2036 struct l_wait_info lwi;
2039 spin_lock(&cli->cl_loi_list_lock);
2040 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2041 cli->cl_rpcs_in_flight++;
2042 spin_unlock(&cli->cl_loi_list_lock);
2046 init_waitqueue_head(&orsw.orsw_waitq);
2047 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2048 orsw.orsw_signaled = false;
2049 spin_unlock(&cli->cl_loi_list_lock);
2051 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2052 rc = l_wait_event(orsw.orsw_waitq,
2053 obd_request_slot_avail(cli, &orsw) ||
2057 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2058 * freed but other (such as obd_put_request_slot) is using it. */
2059 spin_lock(&cli->cl_loi_list_lock);
2061 if (!orsw.orsw_signaled) {
2062 if (list_empty(&orsw.orsw_entry))
2063 cli->cl_rpcs_in_flight--;
2065 list_del(&orsw.orsw_entry);
2069 if (orsw.orsw_signaled) {
2070 LASSERT(list_empty(&orsw.orsw_entry));
2074 spin_unlock(&cli->cl_loi_list_lock);
2078 EXPORT_SYMBOL(obd_get_request_slot);
2080 void obd_put_request_slot(struct client_obd *cli)
2082 struct obd_request_slot_waiter *orsw;
2084 spin_lock(&cli->cl_loi_list_lock);
2085 cli->cl_rpcs_in_flight--;
2087 /* If there is free slot, wakeup the first waiter. */
2088 if (!list_empty(&cli->cl_flight_waiters) &&
2089 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2090 orsw = list_entry(cli->cl_flight_waiters.next,
2091 struct obd_request_slot_waiter, orsw_entry);
2092 list_del_init(&orsw->orsw_entry);
2093 cli->cl_rpcs_in_flight++;
2094 wake_up(&orsw->orsw_waitq);
2096 spin_unlock(&cli->cl_loi_list_lock);
2098 EXPORT_SYMBOL(obd_put_request_slot);
2100 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2102 return cli->cl_max_rpcs_in_flight;
2104 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2106 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2108 struct obd_request_slot_waiter *orsw;
2112 const char *type_name;
2115 if (max > OBD_MAX_RIF_MAX || max < 1)
2118 type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2119 if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2120 /* adjust max_mod_rpcs_in_flight to ensure it is always
2121 * strictly lower that max_rpcs_in_flight */
2123 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2124 "because it must be higher than "
2125 "max_mod_rpcs_in_flight value",
2126 cli->cl_import->imp_obd->obd_name);
2129 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2130 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2136 spin_lock(&cli->cl_loi_list_lock);
2137 old = cli->cl_max_rpcs_in_flight;
2138 cli->cl_max_rpcs_in_flight = max;
2139 client_adjust_max_dirty(cli);
2143 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2144 for (i = 0; i < diff; i++) {
2145 if (list_empty(&cli->cl_flight_waiters))
2148 orsw = list_entry(cli->cl_flight_waiters.next,
2149 struct obd_request_slot_waiter, orsw_entry);
2150 list_del_init(&orsw->orsw_entry);
2151 cli->cl_rpcs_in_flight++;
2152 wake_up(&orsw->orsw_waitq);
2154 spin_unlock(&cli->cl_loi_list_lock);
2158 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2160 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2162 return cli->cl_max_mod_rpcs_in_flight;
2164 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2166 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2168 struct obd_connect_data *ocd;
2172 if (max > OBD_MAX_RIF_MAX || max < 1)
2175 /* cannot exceed or equal max_rpcs_in_flight */
2176 if (max >= cli->cl_max_rpcs_in_flight) {
2177 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2178 "higher or equal to max_rpcs_in_flight value (%u)\n",
2179 cli->cl_import->imp_obd->obd_name,
2180 max, cli->cl_max_rpcs_in_flight);
2184 /* cannot exceed max modify RPCs in flight supported by the server */
2185 ocd = &cli->cl_import->imp_connect_data;
2186 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2187 maxmodrpcs = ocd->ocd_maxmodrpcs;
2190 if (max > maxmodrpcs) {
2191 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2192 "higher than max_mod_rpcs_per_client value (%hu) "
2193 "returned by the server at connection\n",
2194 cli->cl_import->imp_obd->obd_name,
2199 spin_lock(&cli->cl_mod_rpcs_lock);
2201 prev = cli->cl_max_mod_rpcs_in_flight;
2202 cli->cl_max_mod_rpcs_in_flight = max;
2204 /* wakeup waiters if limit has been increased */
2205 if (cli->cl_max_mod_rpcs_in_flight > prev)
2206 wake_up(&cli->cl_mod_rpcs_waitq);
2208 spin_unlock(&cli->cl_mod_rpcs_lock);
2212 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2214 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2215 struct seq_file *seq)
2217 unsigned long mod_tot = 0, mod_cum;
2218 struct timespec64 now;
2221 ktime_get_real_ts64(&now);
2223 spin_lock(&cli->cl_mod_rpcs_lock);
2225 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2226 (s64)now.tv_sec, now.tv_nsec);
2227 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2228 cli->cl_mod_rpcs_in_flight);
2230 seq_printf(seq, "\n\t\t\tmodify\n");
2231 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2233 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2236 for (i = 0; i < OBD_HIST_MAX; i++) {
2237 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2239 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2240 i, mod, pct(mod, mod_tot),
2241 pct(mod_cum, mod_tot));
2242 if (mod_cum == mod_tot)
2246 spin_unlock(&cli->cl_mod_rpcs_lock);
2250 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2252 /* The number of modify RPCs sent in parallel is limited
2253 * because the server has a finite number of slots per client to
2254 * store request result and ensure reply reconstruction when needed.
2255 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2256 * that takes into account server limit and cl_max_rpcs_in_flight
2258 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2259 * one close request is allowed above the maximum.
2261 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2266 /* A slot is available if
2267 * - number of modify RPCs in flight is less than the max
2268 * - it's a close RPC and no other close request is in flight
2270 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2271 (close_req && cli->cl_close_rpcs_in_flight == 0);
2276 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2281 spin_lock(&cli->cl_mod_rpcs_lock);
2282 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2283 spin_unlock(&cli->cl_mod_rpcs_lock);
2288 /* Get a modify RPC slot from the obd client @cli according
2289 * to the kind of operation @opc that is going to be sent
2290 * and the intent @it of the operation if it applies.
2291 * If the maximum number of modify RPCs in flight is reached
2292 * the thread is put to sleep.
2293 * Returns the tag to be set in the request message. Tag 0
2294 * is reserved for non-modifying requests.
2296 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2298 bool close_req = false;
2301 if (opc == MDS_CLOSE)
2305 spin_lock(&cli->cl_mod_rpcs_lock);
2306 max = cli->cl_max_mod_rpcs_in_flight;
2307 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2308 /* there is a slot available */
2309 cli->cl_mod_rpcs_in_flight++;
2311 cli->cl_close_rpcs_in_flight++;
2312 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2313 cli->cl_mod_rpcs_in_flight);
2314 /* find a free tag */
2315 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2317 LASSERT(i < OBD_MAX_RIF_MAX);
2318 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2319 spin_unlock(&cli->cl_mod_rpcs_lock);
2320 /* tag 0 is reserved for non-modify RPCs */
2322 CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2323 "opc %u, max %hu\n",
2324 cli->cl_import->imp_obd->obd_name,
2329 spin_unlock(&cli->cl_mod_rpcs_lock);
2331 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2332 "opc %u, max %hu\n",
2333 cli->cl_import->imp_obd->obd_name, opc, max);
2335 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2336 obd_mod_rpc_slot_avail(cli,
2340 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2342 /* Put a modify RPC slot from the obd client @cli according
2343 * to the kind of operation @opc that has been sent.
2345 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2347 bool close_req = false;
2352 if (opc == MDS_CLOSE)
2355 spin_lock(&cli->cl_mod_rpcs_lock);
2356 cli->cl_mod_rpcs_in_flight--;
2358 cli->cl_close_rpcs_in_flight--;
2359 /* release the tag in the bitmap */
2360 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2361 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2362 spin_unlock(&cli->cl_mod_rpcs_lock);
2363 wake_up(&cli->cl_mod_rpcs_waitq);
2365 EXPORT_SYMBOL(obd_put_mod_rpc_slot);