4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
52 static struct kmem_cache *obd_device_cachep;
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59 const char *status, int locks, int debug_level);
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
66 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
83 static void obd_device_free(struct obd_device *obd)
86 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
87 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
88 if (obd->obd_namespace != NULL) {
89 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
90 obd, obd->obd_namespace, obd->obd_force);
93 lu_ref_fini(&obd->obd_reference);
94 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 struct obd_type *class_search_type(const char *name)
99 struct kobject *kobj = kset_find_obj(lustre_kset, name);
101 if (kobj && kobj->ktype == &class_ktype)
102 return container_of(kobj, struct obd_type, typ_kobj);
107 EXPORT_SYMBOL(class_search_type);
109 struct obd_type *class_get_type(const char *name)
111 struct obd_type *type;
113 type = class_search_type(name);
114 #ifdef HAVE_MODULE_LOADING_SUPPORT
116 const char *modname = name;
118 #ifdef HAVE_SERVER_SUPPORT
119 if (strcmp(modname, "obdfilter") == 0)
122 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
123 modname = LUSTRE_OSP_NAME;
125 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
126 modname = LUSTRE_MDT_NAME;
127 #endif /* HAVE_SERVER_SUPPORT */
129 if (!request_module("%s", modname)) {
130 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
131 type = class_search_type(name);
133 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
139 if (try_module_get(type->typ_dt_ops->o_owner)) {
140 atomic_inc(&type->typ_refcnt);
141 /* class_search_type() returned a counted reference,
142 * but we don't need that count any more as
143 * we have one through typ_refcnt.
145 kobject_put(&type->typ_kobj);
147 kobject_put(&type->typ_kobj);
154 void class_put_type(struct obd_type *type)
157 module_put(type->typ_dt_ops->o_owner);
158 atomic_dec(&type->typ_refcnt);
161 static void class_sysfs_release(struct kobject *kobj)
163 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
165 debugfs_remove_recursive(type->typ_debugfs_entry);
166 type->typ_debugfs_entry = NULL;
169 lu_device_type_fini(type->typ_lu);
171 #ifdef CONFIG_PROC_FS
172 if (type->typ_name && type->typ_procroot)
173 remove_proc_subtree(type->typ_name, proc_lustre_root);
175 if (type->typ_md_ops)
176 OBD_FREE_PTR(type->typ_md_ops);
177 if (type->typ_dt_ops)
178 OBD_FREE_PTR(type->typ_dt_ops);
180 OBD_FREE(type, sizeof(*type));
183 static struct kobj_type class_ktype = {
184 .sysfs_ops = &lustre_sysfs_ops,
185 .release = class_sysfs_release,
188 #ifdef HAVE_SERVER_SUPPORT
189 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
191 struct dentry *symlink;
192 struct obd_type *type;
195 type = class_search_type(name);
197 kobject_put(&type->typ_kobj);
198 return ERR_PTR(-EEXIST);
201 OBD_ALLOC(type, sizeof(*type));
203 return ERR_PTR(-ENOMEM);
205 type->typ_kobj.kset = lustre_kset;
206 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
207 &lustre_kset->kobj, "%s", name);
211 symlink = debugfs_create_dir(name, debugfs_lustre_root);
212 if (IS_ERR_OR_NULL(symlink)) {
213 rc = symlink ? PTR_ERR(symlink) : -ENOMEM;
214 kobject_put(&type->typ_kobj);
217 type->typ_debugfs_entry = symlink;
218 type->typ_sym_filter = true;
221 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
223 if (IS_ERR(type->typ_procroot)) {
224 CERROR("%s: can't create compat proc entry: %d\n",
225 name, (int)PTR_ERR(type->typ_procroot));
226 type->typ_procroot = NULL;
232 EXPORT_SYMBOL(class_add_symlinks);
233 #endif /* HAVE_SERVER_SUPPORT */
235 #define CLASS_MAX_NAME 1024
237 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
238 bool enable_proc, struct lprocfs_vars *vars,
239 const char *name, struct lu_device_type *ldt)
241 struct obd_type *type;
246 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
248 type = class_search_type(name);
250 #ifdef HAVE_SERVER_SUPPORT
251 if (type->typ_sym_filter)
253 #endif /* HAVE_SERVER_SUPPORT */
254 kobject_put(&type->typ_kobj);
255 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
259 OBD_ALLOC(type, sizeof(*type));
263 type->typ_kobj.kset = lustre_kset;
264 kobject_init(&type->typ_kobj, &class_ktype);
265 #ifdef HAVE_SERVER_SUPPORT
267 #endif /* HAVE_SERVER_SUPPORT */
268 OBD_ALLOC_PTR(type->typ_dt_ops);
269 OBD_ALLOC_PTR(type->typ_md_ops);
271 if (type->typ_dt_ops == NULL ||
272 type->typ_md_ops == NULL)
273 GOTO (failed, rc = -ENOMEM);
275 *(type->typ_dt_ops) = *dt_ops;
276 /* md_ops is optional */
278 *(type->typ_md_ops) = *md_ops;
280 #ifdef HAVE_SERVER_SUPPORT
281 if (type->typ_sym_filter) {
282 type->typ_sym_filter = false;
283 kobject_put(&type->typ_kobj);
287 #ifdef CONFIG_PROC_FS
288 if (enable_proc && !type->typ_procroot) {
289 type->typ_procroot = lprocfs_register(name,
292 if (IS_ERR(type->typ_procroot)) {
293 rc = PTR_ERR(type->typ_procroot);
294 type->typ_procroot = NULL;
299 type->typ_debugfs_entry = ldebugfs_register(name, debugfs_lustre_root,
301 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
302 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
304 type->typ_debugfs_entry = NULL;
308 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
311 #ifdef HAVE_SERVER_SUPPORT
316 rc = lu_device_type_init(ldt);
324 kobject_put(&type->typ_kobj);
328 EXPORT_SYMBOL(class_register_type);
330 int class_unregister_type(const char *name)
332 struct obd_type *type = class_search_type(name);
337 CERROR("unknown obd type\n");
341 if (atomic_read(&type->typ_refcnt)) {
342 CERROR("type %s has refcount (%d)\n", name,
343 atomic_read(&type->typ_refcnt));
344 /* This is a bad situation, let's make the best of it */
345 /* Remove ops, but leave the name for debugging */
346 OBD_FREE_PTR(type->typ_dt_ops);
347 OBD_FREE_PTR(type->typ_md_ops);
348 GOTO(out_put, rc = -EBUSY);
351 /* Put the final ref */
352 kobject_put(&type->typ_kobj);
354 /* Put the ref returned by class_search_type() */
355 kobject_put(&type->typ_kobj);
358 } /* class_unregister_type */
359 EXPORT_SYMBOL(class_unregister_type);
362 * Create a new obd device.
364 * Allocate the new obd_device and initialize it.
366 * \param[in] type_name obd device type string.
367 * \param[in] name obd device name.
368 * \param[in] uuid obd device UUID
370 * \retval newdev pointer to created obd_device
371 * \retval ERR_PTR(errno) on error
373 struct obd_device *class_newdev(const char *type_name, const char *name,
376 struct obd_device *newdev;
377 struct obd_type *type = NULL;
380 if (strlen(name) >= MAX_OBD_NAME) {
381 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
382 RETURN(ERR_PTR(-EINVAL));
385 type = class_get_type(type_name);
387 CERROR("OBD: unknown type: %s\n", type_name);
388 RETURN(ERR_PTR(-ENODEV));
391 newdev = obd_device_alloc();
392 if (newdev == NULL) {
393 class_put_type(type);
394 RETURN(ERR_PTR(-ENOMEM));
396 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
397 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
398 newdev->obd_type = type;
399 newdev->obd_minor = -1;
401 rwlock_init(&newdev->obd_pool_lock);
402 newdev->obd_pool_limit = 0;
403 newdev->obd_pool_slv = 0;
405 INIT_LIST_HEAD(&newdev->obd_exports);
406 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
407 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
408 INIT_LIST_HEAD(&newdev->obd_exports_timed);
409 INIT_LIST_HEAD(&newdev->obd_nid_stats);
410 spin_lock_init(&newdev->obd_nid_lock);
411 spin_lock_init(&newdev->obd_dev_lock);
412 mutex_init(&newdev->obd_dev_mutex);
413 spin_lock_init(&newdev->obd_osfs_lock);
414 /* newdev->obd_osfs_age must be set to a value in the distant
415 * past to guarantee a fresh statfs is fetched on mount. */
416 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
418 /* XXX belongs in setup not attach */
419 init_rwsem(&newdev->obd_observer_link_sem);
421 spin_lock_init(&newdev->obd_recovery_task_lock);
422 init_waitqueue_head(&newdev->obd_next_transno_waitq);
423 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
424 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
425 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
426 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
427 INIT_LIST_HEAD(&newdev->obd_evict_list);
428 INIT_LIST_HEAD(&newdev->obd_lwp_list);
430 llog_group_init(&newdev->obd_olg);
431 /* Detach drops this */
432 atomic_set(&newdev->obd_refcount, 1);
433 lu_ref_init(&newdev->obd_reference);
434 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
436 newdev->obd_conn_inprogress = 0;
438 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
440 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
441 newdev->obd_name, newdev);
449 * \param[in] obd obd_device to be freed
453 void class_free_dev(struct obd_device *obd)
455 struct obd_type *obd_type = obd->obd_type;
457 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
458 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
460 "obd %p != obd_devs[%d] %p\n",
461 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
462 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
463 "obd_refcount should be 0, not %d\n",
464 atomic_read(&obd->obd_refcount));
465 LASSERT(obd_type != NULL);
467 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
468 obd->obd_name, obd->obd_type->typ_name);
470 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
471 obd->obd_name, obd->obd_uuid.uuid);
472 if (obd->obd_stopping) {
475 /* If we're not stopping, we were never set up */
476 err = obd_cleanup(obd);
478 CERROR("Cleanup %s returned %d\n",
482 obd_device_free(obd);
484 class_put_type(obd_type);
488 * Unregister obd device.
490 * Free slot in obd_dev[] used by \a obd.
492 * \param[in] new_obd obd_device to be unregistered
496 void class_unregister_device(struct obd_device *obd)
498 write_lock(&obd_dev_lock);
499 if (obd->obd_minor >= 0) {
500 LASSERT(obd_devs[obd->obd_minor] == obd);
501 obd_devs[obd->obd_minor] = NULL;
504 write_unlock(&obd_dev_lock);
508 * Register obd device.
510 * Find free slot in obd_devs[], fills it with \a new_obd.
512 * \param[in] new_obd obd_device to be registered
515 * \retval -EEXIST device with this name is registered
516 * \retval -EOVERFLOW obd_devs[] is full
518 int class_register_device(struct obd_device *new_obd)
522 int new_obd_minor = 0;
523 bool minor_assign = false;
524 bool retried = false;
527 write_lock(&obd_dev_lock);
528 for (i = 0; i < class_devno_max(); i++) {
529 struct obd_device *obd = class_num2obd(i);
532 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
535 write_unlock(&obd_dev_lock);
537 /* the obd_device could be waited to be
538 * destroyed by the "obd_zombie_impexp_thread".
540 obd_zombie_barrier();
545 CERROR("%s: already exists, won't add\n",
547 /* in case we found a free slot before duplicate */
548 minor_assign = false;
552 if (!minor_assign && obd == NULL) {
559 new_obd->obd_minor = new_obd_minor;
560 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
561 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
562 obd_devs[new_obd_minor] = new_obd;
566 CERROR("%s: all %u/%u devices used, increase "
567 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
568 i, class_devno_max(), ret);
571 write_unlock(&obd_dev_lock);
576 static int class_name2dev_nolock(const char *name)
583 for (i = 0; i < class_devno_max(); i++) {
584 struct obd_device *obd = class_num2obd(i);
586 if (obd && strcmp(name, obd->obd_name) == 0) {
587 /* Make sure we finished attaching before we give
588 out any references */
589 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
590 if (obd->obd_attached) {
600 int class_name2dev(const char *name)
607 read_lock(&obd_dev_lock);
608 i = class_name2dev_nolock(name);
609 read_unlock(&obd_dev_lock);
613 EXPORT_SYMBOL(class_name2dev);
615 struct obd_device *class_name2obd(const char *name)
617 int dev = class_name2dev(name);
619 if (dev < 0 || dev > class_devno_max())
621 return class_num2obd(dev);
623 EXPORT_SYMBOL(class_name2obd);
625 int class_uuid2dev_nolock(struct obd_uuid *uuid)
629 for (i = 0; i < class_devno_max(); i++) {
630 struct obd_device *obd = class_num2obd(i);
632 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
633 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
641 int class_uuid2dev(struct obd_uuid *uuid)
645 read_lock(&obd_dev_lock);
646 i = class_uuid2dev_nolock(uuid);
647 read_unlock(&obd_dev_lock);
651 EXPORT_SYMBOL(class_uuid2dev);
653 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
655 int dev = class_uuid2dev(uuid);
658 return class_num2obd(dev);
660 EXPORT_SYMBOL(class_uuid2obd);
663 * Get obd device from ::obd_devs[]
665 * \param num [in] array index
667 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
668 * otherwise return the obd device there.
670 struct obd_device *class_num2obd(int num)
672 struct obd_device *obd = NULL;
674 if (num < class_devno_max()) {
679 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
680 "%p obd_magic %08x != %08x\n",
681 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
682 LASSERTF(obd->obd_minor == num,
683 "%p obd_minor %0d != %0d\n",
684 obd, obd->obd_minor, num);
691 * Find obd in obd_dev[] by name or uuid.
693 * Increment obd's refcount if found.
695 * \param[in] str obd name or uuid
697 * \retval NULL if not found
698 * \retval target pointer to found obd_device
700 struct obd_device *class_dev_by_str(const char *str)
702 struct obd_device *target = NULL;
703 struct obd_uuid tgtuuid;
706 obd_str2uuid(&tgtuuid, str);
708 read_lock(&obd_dev_lock);
709 rc = class_uuid2dev_nolock(&tgtuuid);
711 rc = class_name2dev_nolock(str);
714 target = class_num2obd(rc);
717 class_incref(target, "find", current);
718 read_unlock(&obd_dev_lock);
722 EXPORT_SYMBOL(class_dev_by_str);
725 * Get obd devices count. Device in any
727 * \retval obd device count
729 int get_devices_count(void)
731 int index, max_index = class_devno_max(), dev_count = 0;
733 read_lock(&obd_dev_lock);
734 for (index = 0; index <= max_index; index++) {
735 struct obd_device *obd = class_num2obd(index);
739 read_unlock(&obd_dev_lock);
743 EXPORT_SYMBOL(get_devices_count);
745 void class_obd_list(void)
750 read_lock(&obd_dev_lock);
751 for (i = 0; i < class_devno_max(); i++) {
752 struct obd_device *obd = class_num2obd(i);
756 if (obd->obd_stopping)
758 else if (obd->obd_set_up)
760 else if (obd->obd_attached)
764 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
765 i, status, obd->obd_type->typ_name,
766 obd->obd_name, obd->obd_uuid.uuid,
767 atomic_read(&obd->obd_refcount));
769 read_unlock(&obd_dev_lock);
772 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
773 specified, then only the client with that uuid is returned,
774 otherwise any client connected to the tgt is returned. */
775 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
776 const char *type_name,
777 struct obd_uuid *grp_uuid)
781 read_lock(&obd_dev_lock);
782 for (i = 0; i < class_devno_max(); i++) {
783 struct obd_device *obd = class_num2obd(i);
787 if ((strncmp(obd->obd_type->typ_name, type_name,
788 strlen(type_name)) == 0)) {
789 if (obd_uuid_equals(tgt_uuid,
790 &obd->u.cli.cl_target_uuid) &&
791 ((grp_uuid)? obd_uuid_equals(grp_uuid,
792 &obd->obd_uuid) : 1)) {
793 read_unlock(&obd_dev_lock);
798 read_unlock(&obd_dev_lock);
802 EXPORT_SYMBOL(class_find_client_obd);
804 /* Iterate the obd_device list looking devices have grp_uuid. Start
805 searching at *next, and if a device is found, the next index to look
806 at is saved in *next. If next is NULL, then the first matching device
807 will always be returned. */
808 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
814 else if (*next >= 0 && *next < class_devno_max())
819 read_lock(&obd_dev_lock);
820 for (; i < class_devno_max(); i++) {
821 struct obd_device *obd = class_num2obd(i);
825 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
828 read_unlock(&obd_dev_lock);
832 read_unlock(&obd_dev_lock);
836 EXPORT_SYMBOL(class_devices_in_group);
839 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
840 * adjust sptlrpc settings accordingly.
842 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
844 struct obd_device *obd;
848 LASSERT(namelen > 0);
850 read_lock(&obd_dev_lock);
851 for (i = 0; i < class_devno_max(); i++) {
852 obd = class_num2obd(i);
854 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
857 /* only notify mdc, osc, osp, lwp, mdt, ost
858 * because only these have a -sptlrpc llog */
859 type = obd->obd_type->typ_name;
860 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
861 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
862 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
863 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
864 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
865 strcmp(type, LUSTRE_OST_NAME) != 0)
868 if (strncmp(obd->obd_name, fsname, namelen))
871 class_incref(obd, __FUNCTION__, obd);
872 read_unlock(&obd_dev_lock);
873 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
874 sizeof(KEY_SPTLRPC_CONF),
875 KEY_SPTLRPC_CONF, 0, NULL, NULL);
877 class_decref(obd, __FUNCTION__, obd);
878 read_lock(&obd_dev_lock);
880 read_unlock(&obd_dev_lock);
883 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
885 void obd_cleanup_caches(void)
888 if (obd_device_cachep) {
889 kmem_cache_destroy(obd_device_cachep);
890 obd_device_cachep = NULL;
896 int obd_init_caches(void)
901 LASSERT(obd_device_cachep == NULL);
902 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
903 sizeof(struct obd_device),
904 0, 0, 0, sizeof(struct obd_device), NULL);
905 if (!obd_device_cachep)
906 GOTO(out, rc = -ENOMEM);
910 obd_cleanup_caches();
914 static struct portals_handle_ops export_handle_ops;
916 /* map connection to client */
917 struct obd_export *class_conn2export(struct lustre_handle *conn)
919 struct obd_export *export;
923 CDEBUG(D_CACHE, "looking for null handle\n");
927 if (conn->cookie == -1) { /* this means assign a new connection */
928 CDEBUG(D_CACHE, "want a new connection\n");
932 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
933 export = class_handle2object(conn->cookie, &export_handle_ops);
936 EXPORT_SYMBOL(class_conn2export);
938 struct obd_device *class_exp2obd(struct obd_export *exp)
944 EXPORT_SYMBOL(class_exp2obd);
946 struct obd_import *class_exp2cliimp(struct obd_export *exp)
948 struct obd_device *obd = exp->exp_obd;
951 return obd->u.cli.cl_import;
953 EXPORT_SYMBOL(class_exp2cliimp);
955 /* Export management functions */
956 static void class_export_destroy(struct obd_export *exp)
958 struct obd_device *obd = exp->exp_obd;
961 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
962 LASSERT(obd != NULL);
964 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
965 exp->exp_client_uuid.uuid, obd->obd_name);
967 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
968 if (exp->exp_connection)
969 ptlrpc_put_connection_superhack(exp->exp_connection);
971 LASSERT(list_empty(&exp->exp_outstanding_replies));
972 LASSERT(list_empty(&exp->exp_uncommitted_replies));
973 LASSERT(list_empty(&exp->exp_req_replay_queue));
974 LASSERT(list_empty(&exp->exp_hp_rpcs));
975 obd_destroy_export(exp);
976 /* self export doesn't hold a reference to an obd, although it
977 * exists until freeing of the obd */
978 if (exp != obd->obd_self_export)
979 class_decref(obd, "export", exp);
981 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
985 static struct portals_handle_ops export_handle_ops = {
987 .hop_type = "export",
990 struct obd_export *class_export_get(struct obd_export *exp)
992 refcount_inc(&exp->exp_handle.h_ref);
993 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
994 refcount_read(&exp->exp_handle.h_ref));
997 EXPORT_SYMBOL(class_export_get);
999 void class_export_put(struct obd_export *exp)
1001 LASSERT(exp != NULL);
1002 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
1003 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
1004 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1005 refcount_read(&exp->exp_handle.h_ref) - 1);
1007 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
1008 struct obd_device *obd = exp->exp_obd;
1010 CDEBUG(D_IOCTL, "final put %p/%s\n",
1011 exp, exp->exp_client_uuid.uuid);
1013 /* release nid stat refererence */
1014 lprocfs_exp_cleanup(exp);
1016 if (exp == obd->obd_self_export) {
1017 /* self export should be destroyed without
1018 * zombie thread as it doesn't hold a
1019 * reference to obd and doesn't hold any
1021 class_export_destroy(exp);
1022 /* self export is destroyed, no class
1023 * references exist and it is safe to free
1025 class_free_dev(obd);
1027 LASSERT(!list_empty(&exp->exp_obd_chain));
1028 obd_zombie_export_add(exp);
1033 EXPORT_SYMBOL(class_export_put);
1035 static void obd_zombie_exp_cull(struct work_struct *ws)
1037 struct obd_export *export;
1039 export = container_of(ws, struct obd_export, exp_zombie_work);
1040 class_export_destroy(export);
1043 /* Creates a new export, adds it to the hash table, and returns a
1044 * pointer to it. The refcount is 2: one for the hash reference, and
1045 * one for the pointer returned by this function. */
1046 struct obd_export *__class_new_export(struct obd_device *obd,
1047 struct obd_uuid *cluuid, bool is_self)
1049 struct obd_export *export;
1053 OBD_ALLOC_PTR(export);
1055 return ERR_PTR(-ENOMEM);
1057 export->exp_conn_cnt = 0;
1058 export->exp_lock_hash = NULL;
1059 export->exp_flock_hash = NULL;
1060 /* 2 = class_handle_hash + last */
1061 refcount_set(&export->exp_handle.h_ref, 2);
1062 atomic_set(&export->exp_rpc_count, 0);
1063 atomic_set(&export->exp_cb_count, 0);
1064 atomic_set(&export->exp_locks_count, 0);
1065 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1066 INIT_LIST_HEAD(&export->exp_locks_list);
1067 spin_lock_init(&export->exp_locks_list_guard);
1069 atomic_set(&export->exp_replay_count, 0);
1070 export->exp_obd = obd;
1071 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1072 spin_lock_init(&export->exp_uncommitted_replies_lock);
1073 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1074 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1075 INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1076 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1077 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1078 class_handle_hash(&export->exp_handle, &export_handle_ops);
1079 export->exp_last_request_time = ktime_get_real_seconds();
1080 spin_lock_init(&export->exp_lock);
1081 spin_lock_init(&export->exp_rpc_lock);
1082 INIT_HLIST_NODE(&export->exp_nid_hash);
1083 INIT_HLIST_NODE(&export->exp_gen_hash);
1084 spin_lock_init(&export->exp_bl_list_lock);
1085 INIT_LIST_HEAD(&export->exp_bl_list);
1086 INIT_LIST_HEAD(&export->exp_stale_list);
1087 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1089 export->exp_sp_peer = LUSTRE_SP_ANY;
1090 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1091 export->exp_client_uuid = *cluuid;
1092 obd_init_export(export);
1094 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1096 spin_lock(&obd->obd_dev_lock);
1097 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1098 /* shouldn't happen, but might race */
1099 if (obd->obd_stopping)
1100 GOTO(exit_unlock, rc = -ENODEV);
1102 rc = obd_uuid_add(obd, export);
1104 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1105 obd->obd_name, cluuid->uuid, rc);
1106 GOTO(exit_unlock, rc = -EALREADY);
1111 class_incref(obd, "export", export);
1112 list_add_tail(&export->exp_obd_chain_timed,
1113 &obd->obd_exports_timed);
1114 list_add(&export->exp_obd_chain, &obd->obd_exports);
1115 obd->obd_num_exports++;
1117 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1118 INIT_LIST_HEAD(&export->exp_obd_chain);
1120 spin_unlock(&obd->obd_dev_lock);
1124 spin_unlock(&obd->obd_dev_lock);
1125 class_handle_unhash(&export->exp_handle);
1126 obd_destroy_export(export);
1127 OBD_FREE_PTR(export);
1131 struct obd_export *class_new_export(struct obd_device *obd,
1132 struct obd_uuid *uuid)
1134 return __class_new_export(obd, uuid, false);
1136 EXPORT_SYMBOL(class_new_export);
1138 struct obd_export *class_new_export_self(struct obd_device *obd,
1139 struct obd_uuid *uuid)
1141 return __class_new_export(obd, uuid, true);
1144 void class_unlink_export(struct obd_export *exp)
1146 class_handle_unhash(&exp->exp_handle);
1148 if (exp->exp_obd->obd_self_export == exp) {
1149 class_export_put(exp);
1153 spin_lock(&exp->exp_obd->obd_dev_lock);
1154 /* delete an uuid-export hashitem from hashtables */
1155 if (exp != exp->exp_obd->obd_self_export)
1156 obd_uuid_del(exp->exp_obd, exp);
1158 #ifdef HAVE_SERVER_SUPPORT
1159 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1160 struct tg_export_data *ted = &exp->exp_target_data;
1161 struct cfs_hash *hash;
1163 /* Because obd_gen_hash will not be released until
1164 * class_cleanup(), so hash should never be NULL here */
1165 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1166 LASSERT(hash != NULL);
1167 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1168 &exp->exp_gen_hash);
1169 cfs_hash_putref(hash);
1171 #endif /* HAVE_SERVER_SUPPORT */
1173 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1174 list_del_init(&exp->exp_obd_chain_timed);
1175 exp->exp_obd->obd_num_exports--;
1176 spin_unlock(&exp->exp_obd->obd_dev_lock);
1177 atomic_inc(&obd_stale_export_num);
1179 /* A reference is kept by obd_stale_exports list */
1180 obd_stale_export_put(exp);
1182 EXPORT_SYMBOL(class_unlink_export);
1184 /* Import management functions */
1185 static void obd_zombie_import_free(struct obd_import *imp)
1189 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1190 imp->imp_obd->obd_name);
1192 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1194 ptlrpc_put_connection_superhack(imp->imp_connection);
1196 while (!list_empty(&imp->imp_conn_list)) {
1197 struct obd_import_conn *imp_conn;
1199 imp_conn = list_entry(imp->imp_conn_list.next,
1200 struct obd_import_conn, oic_item);
1201 list_del_init(&imp_conn->oic_item);
1202 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1203 OBD_FREE(imp_conn, sizeof(*imp_conn));
1206 LASSERT(imp->imp_sec == NULL);
1207 class_decref(imp->imp_obd, "import", imp);
1212 struct obd_import *class_import_get(struct obd_import *import)
1214 atomic_inc(&import->imp_refcount);
1215 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1216 atomic_read(&import->imp_refcount),
1217 import->imp_obd->obd_name);
1220 EXPORT_SYMBOL(class_import_get);
1222 void class_import_put(struct obd_import *imp)
1226 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1228 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1229 atomic_read(&imp->imp_refcount) - 1,
1230 imp->imp_obd->obd_name);
1232 if (atomic_dec_and_test(&imp->imp_refcount)) {
1233 CDEBUG(D_INFO, "final put import %p\n", imp);
1234 obd_zombie_import_add(imp);
1239 EXPORT_SYMBOL(class_import_put);
1241 static void init_imp_at(struct imp_at *at) {
1243 at_init(&at->iat_net_latency, 0, 0);
1244 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1245 /* max service estimates are tracked on the server side, so
1246 don't use the AT history here, just use the last reported
1247 val. (But keep hist for proc histogram, worst_ever) */
1248 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1253 static void obd_zombie_imp_cull(struct work_struct *ws)
1255 struct obd_import *import;
1257 import = container_of(ws, struct obd_import, imp_zombie_work);
1258 obd_zombie_import_free(import);
1261 struct obd_import *class_new_import(struct obd_device *obd)
1263 struct obd_import *imp;
1264 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1266 OBD_ALLOC(imp, sizeof(*imp));
1270 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1271 INIT_LIST_HEAD(&imp->imp_replay_list);
1272 INIT_LIST_HEAD(&imp->imp_sending_list);
1273 INIT_LIST_HEAD(&imp->imp_delayed_list);
1274 INIT_LIST_HEAD(&imp->imp_committed_list);
1275 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1276 imp->imp_known_replied_xid = 0;
1277 imp->imp_replay_cursor = &imp->imp_committed_list;
1278 spin_lock_init(&imp->imp_lock);
1279 imp->imp_last_success_conn = 0;
1280 imp->imp_state = LUSTRE_IMP_NEW;
1281 imp->imp_obd = class_incref(obd, "import", imp);
1282 rwlock_init(&imp->imp_sec_lock);
1283 init_waitqueue_head(&imp->imp_recovery_waitq);
1284 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1286 if (curr_pid_ns->child_reaper)
1287 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1289 imp->imp_sec_refpid = 1;
1291 atomic_set(&imp->imp_refcount, 2);
1292 atomic_set(&imp->imp_unregistering, 0);
1293 atomic_set(&imp->imp_inflight, 0);
1294 atomic_set(&imp->imp_replay_inflight, 0);
1295 atomic_set(&imp->imp_inval_count, 0);
1296 INIT_LIST_HEAD(&imp->imp_conn_list);
1297 init_imp_at(&imp->imp_at);
1299 /* the default magic is V2, will be used in connect RPC, and
1300 * then adjusted according to the flags in request/reply. */
1301 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1305 EXPORT_SYMBOL(class_new_import);
1307 void class_destroy_import(struct obd_import *import)
1309 LASSERT(import != NULL);
1310 LASSERT(import != LP_POISON);
1312 spin_lock(&import->imp_lock);
1313 import->imp_generation++;
1314 spin_unlock(&import->imp_lock);
1315 class_import_put(import);
1317 EXPORT_SYMBOL(class_destroy_import);
1319 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1321 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1323 spin_lock(&exp->exp_locks_list_guard);
1325 LASSERT(lock->l_exp_refs_nr >= 0);
1327 if (lock->l_exp_refs_target != NULL &&
1328 lock->l_exp_refs_target != exp) {
1329 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1330 exp, lock, lock->l_exp_refs_target);
1332 if ((lock->l_exp_refs_nr ++) == 0) {
1333 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1334 lock->l_exp_refs_target = exp;
1336 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1337 lock, exp, lock->l_exp_refs_nr);
1338 spin_unlock(&exp->exp_locks_list_guard);
1340 EXPORT_SYMBOL(__class_export_add_lock_ref);
1342 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1344 spin_lock(&exp->exp_locks_list_guard);
1345 LASSERT(lock->l_exp_refs_nr > 0);
1346 if (lock->l_exp_refs_target != exp) {
1347 LCONSOLE_WARN("lock %p, "
1348 "mismatching export pointers: %p, %p\n",
1349 lock, lock->l_exp_refs_target, exp);
1351 if (-- lock->l_exp_refs_nr == 0) {
1352 list_del_init(&lock->l_exp_refs_link);
1353 lock->l_exp_refs_target = NULL;
1355 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1356 lock, exp, lock->l_exp_refs_nr);
1357 spin_unlock(&exp->exp_locks_list_guard);
1359 EXPORT_SYMBOL(__class_export_del_lock_ref);
1362 /* A connection defines an export context in which preallocation can
1363 be managed. This releases the export pointer reference, and returns
1364 the export handle, so the export refcount is 1 when this function
1366 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1367 struct obd_uuid *cluuid)
1369 struct obd_export *export;
1370 LASSERT(conn != NULL);
1371 LASSERT(obd != NULL);
1372 LASSERT(cluuid != NULL);
1375 export = class_new_export(obd, cluuid);
1377 RETURN(PTR_ERR(export));
1379 conn->cookie = export->exp_handle.h_cookie;
1380 class_export_put(export);
1382 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1383 cluuid->uuid, conn->cookie);
1386 EXPORT_SYMBOL(class_connect);
1388 /* if export is involved in recovery then clean up related things */
1389 static void class_export_recovery_cleanup(struct obd_export *exp)
1391 struct obd_device *obd = exp->exp_obd;
1393 spin_lock(&obd->obd_recovery_task_lock);
1394 if (obd->obd_recovering) {
1395 if (exp->exp_in_recovery) {
1396 spin_lock(&exp->exp_lock);
1397 exp->exp_in_recovery = 0;
1398 spin_unlock(&exp->exp_lock);
1399 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1400 atomic_dec(&obd->obd_connected_clients);
1403 /* if called during recovery then should update
1404 * obd_stale_clients counter,
1405 * lightweight exports are not counted */
1406 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1407 exp->exp_obd->obd_stale_clients++;
1409 spin_unlock(&obd->obd_recovery_task_lock);
1411 spin_lock(&exp->exp_lock);
1412 /** Cleanup req replay fields */
1413 if (exp->exp_req_replay_needed) {
1414 exp->exp_req_replay_needed = 0;
1416 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1417 atomic_dec(&obd->obd_req_replay_clients);
1420 /** Cleanup lock replay data */
1421 if (exp->exp_lock_replay_needed) {
1422 exp->exp_lock_replay_needed = 0;
1424 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1425 atomic_dec(&obd->obd_lock_replay_clients);
1427 spin_unlock(&exp->exp_lock);
1430 /* This function removes 1-3 references from the export:
1431 * 1 - for export pointer passed
1432 * and if disconnect really need
1433 * 2 - removing from hash
1434 * 3 - in client_unlink_export
1435 * The export pointer passed to this function can destroyed */
1436 int class_disconnect(struct obd_export *export)
1438 int already_disconnected;
1441 if (export == NULL) {
1442 CWARN("attempting to free NULL export %p\n", export);
1446 spin_lock(&export->exp_lock);
1447 already_disconnected = export->exp_disconnected;
1448 export->exp_disconnected = 1;
1449 /* We hold references of export for uuid hash
1450 * and nid_hash and export link at least. So
1451 * it is safe to call cfs_hash_del in there. */
1452 if (!hlist_unhashed(&export->exp_nid_hash))
1453 cfs_hash_del(export->exp_obd->obd_nid_hash,
1454 &export->exp_connection->c_peer.nid,
1455 &export->exp_nid_hash);
1456 spin_unlock(&export->exp_lock);
1458 /* class_cleanup(), abort_recovery(), and class_fail_export()
1459 * all end up in here, and if any of them race we shouldn't
1460 * call extra class_export_puts(). */
1461 if (already_disconnected) {
1462 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1463 GOTO(no_disconn, already_disconnected);
1466 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1467 export->exp_handle.h_cookie);
1469 class_export_recovery_cleanup(export);
1470 class_unlink_export(export);
1472 class_export_put(export);
1475 EXPORT_SYMBOL(class_disconnect);
1477 /* Return non-zero for a fully connected export */
1478 int class_connected_export(struct obd_export *exp)
1483 spin_lock(&exp->exp_lock);
1484 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1485 spin_unlock(&exp->exp_lock);
1489 EXPORT_SYMBOL(class_connected_export);
1491 static void class_disconnect_export_list(struct list_head *list,
1492 enum obd_option flags)
1495 struct obd_export *exp;
1498 /* It's possible that an export may disconnect itself, but
1499 * nothing else will be added to this list. */
1500 while (!list_empty(list)) {
1501 exp = list_entry(list->next, struct obd_export,
1503 /* need for safe call CDEBUG after obd_disconnect */
1504 class_export_get(exp);
1506 spin_lock(&exp->exp_lock);
1507 exp->exp_flags = flags;
1508 spin_unlock(&exp->exp_lock);
1510 if (obd_uuid_equals(&exp->exp_client_uuid,
1511 &exp->exp_obd->obd_uuid)) {
1513 "exp %p export uuid == obd uuid, don't discon\n",
1515 /* Need to delete this now so we don't end up pointing
1516 * to work_list later when this export is cleaned up. */
1517 list_del_init(&exp->exp_obd_chain);
1518 class_export_put(exp);
1522 class_export_get(exp);
1523 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1524 "last request at %lld\n",
1525 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1526 exp, exp->exp_last_request_time);
1527 /* release one export reference anyway */
1528 rc = obd_disconnect(exp);
1530 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1531 obd_export_nid2str(exp), exp, rc);
1532 class_export_put(exp);
1537 void class_disconnect_exports(struct obd_device *obd)
1539 struct list_head work_list;
1542 /* Move all of the exports from obd_exports to a work list, en masse. */
1543 INIT_LIST_HEAD(&work_list);
1544 spin_lock(&obd->obd_dev_lock);
1545 list_splice_init(&obd->obd_exports, &work_list);
1546 list_splice_init(&obd->obd_delayed_exports, &work_list);
1547 spin_unlock(&obd->obd_dev_lock);
1549 if (!list_empty(&work_list)) {
1550 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1551 "disconnecting them\n", obd->obd_minor, obd);
1552 class_disconnect_export_list(&work_list,
1553 exp_flags_from_obd(obd));
1555 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1556 obd->obd_minor, obd);
1559 EXPORT_SYMBOL(class_disconnect_exports);
1561 /* Remove exports that have not completed recovery.
1563 void class_disconnect_stale_exports(struct obd_device *obd,
1564 int (*test_export)(struct obd_export *))
1566 struct list_head work_list;
1567 struct obd_export *exp, *n;
1571 INIT_LIST_HEAD(&work_list);
1572 spin_lock(&obd->obd_dev_lock);
1573 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1575 /* don't count self-export as client */
1576 if (obd_uuid_equals(&exp->exp_client_uuid,
1577 &exp->exp_obd->obd_uuid))
1580 /* don't evict clients which have no slot in last_rcvd
1581 * (e.g. lightweight connection) */
1582 if (exp->exp_target_data.ted_lr_idx == -1)
1585 spin_lock(&exp->exp_lock);
1586 if (exp->exp_failed || test_export(exp)) {
1587 spin_unlock(&exp->exp_lock);
1590 exp->exp_failed = 1;
1591 spin_unlock(&exp->exp_lock);
1593 list_move(&exp->exp_obd_chain, &work_list);
1595 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1596 obd->obd_name, exp->exp_client_uuid.uuid,
1597 obd_export_nid2str(exp));
1598 print_export_data(exp, "EVICTING", 0, D_HA);
1600 spin_unlock(&obd->obd_dev_lock);
1603 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1604 obd->obd_name, evicted);
1606 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1607 OBD_OPT_ABORT_RECOV);
1610 EXPORT_SYMBOL(class_disconnect_stale_exports);
1612 void class_fail_export(struct obd_export *exp)
1614 int rc, already_failed;
1616 spin_lock(&exp->exp_lock);
1617 already_failed = exp->exp_failed;
1618 exp->exp_failed = 1;
1619 spin_unlock(&exp->exp_lock);
1621 if (already_failed) {
1622 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1623 exp, exp->exp_client_uuid.uuid);
1627 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1628 exp, exp->exp_client_uuid.uuid);
1630 if (obd_dump_on_timeout)
1631 libcfs_debug_dumplog();
1633 /* need for safe call CDEBUG after obd_disconnect */
1634 class_export_get(exp);
1636 /* Most callers into obd_disconnect are removing their own reference
1637 * (request, for example) in addition to the one from the hash table.
1638 * We don't have such a reference here, so make one. */
1639 class_export_get(exp);
1640 rc = obd_disconnect(exp);
1642 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1644 CDEBUG(D_HA, "disconnected export %p/%s\n",
1645 exp, exp->exp_client_uuid.uuid);
1646 class_export_put(exp);
1648 EXPORT_SYMBOL(class_fail_export);
1650 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1652 struct cfs_hash *nid_hash;
1653 struct obd_export *doomed_exp = NULL;
1654 int exports_evicted = 0;
1656 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1658 spin_lock(&obd->obd_dev_lock);
1659 /* umount has run already, so evict thread should leave
1660 * its task to umount thread now */
1661 if (obd->obd_stopping) {
1662 spin_unlock(&obd->obd_dev_lock);
1663 return exports_evicted;
1665 nid_hash = obd->obd_nid_hash;
1666 cfs_hash_getref(nid_hash);
1667 spin_unlock(&obd->obd_dev_lock);
1670 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1671 if (doomed_exp == NULL)
1674 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1675 "nid %s found, wanted nid %s, requested nid %s\n",
1676 obd_export_nid2str(doomed_exp),
1677 libcfs_nid2str(nid_key), nid);
1678 LASSERTF(doomed_exp != obd->obd_self_export,
1679 "self-export is hashed by NID?\n");
1681 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1682 "request\n", obd->obd_name,
1683 obd_uuid2str(&doomed_exp->exp_client_uuid),
1684 obd_export_nid2str(doomed_exp));
1685 class_fail_export(doomed_exp);
1686 class_export_put(doomed_exp);
1689 cfs_hash_putref(nid_hash);
1691 if (!exports_evicted)
1692 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1693 obd->obd_name, nid);
1694 return exports_evicted;
1696 EXPORT_SYMBOL(obd_export_evict_by_nid);
1698 #ifdef HAVE_SERVER_SUPPORT
1699 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1701 struct obd_export *doomed_exp = NULL;
1702 struct obd_uuid doomed_uuid;
1703 int exports_evicted = 0;
1705 spin_lock(&obd->obd_dev_lock);
1706 if (obd->obd_stopping) {
1707 spin_unlock(&obd->obd_dev_lock);
1708 return exports_evicted;
1710 spin_unlock(&obd->obd_dev_lock);
1712 obd_str2uuid(&doomed_uuid, uuid);
1713 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1714 CERROR("%s: can't evict myself\n", obd->obd_name);
1715 return exports_evicted;
1718 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1719 if (doomed_exp == NULL) {
1720 CERROR("%s: can't disconnect %s: no exports found\n",
1721 obd->obd_name, uuid);
1723 CWARN("%s: evicting %s at adminstrative request\n",
1724 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1725 class_fail_export(doomed_exp);
1726 class_export_put(doomed_exp);
1727 obd_uuid_del(obd, doomed_exp);
1731 return exports_evicted;
1733 #endif /* HAVE_SERVER_SUPPORT */
1735 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1736 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1737 EXPORT_SYMBOL(class_export_dump_hook);
1740 static void print_export_data(struct obd_export *exp, const char *status,
1741 int locks, int debug_level)
1743 struct ptlrpc_reply_state *rs;
1744 struct ptlrpc_reply_state *first_reply = NULL;
1747 spin_lock(&exp->exp_lock);
1748 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1754 spin_unlock(&exp->exp_lock);
1756 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1757 "%p %s %llu stale:%d\n",
1758 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1759 obd_export_nid2str(exp),
1760 refcount_read(&exp->exp_handle.h_ref),
1761 atomic_read(&exp->exp_rpc_count),
1762 atomic_read(&exp->exp_cb_count),
1763 atomic_read(&exp->exp_locks_count),
1764 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1765 nreplies, first_reply, nreplies > 3 ? "..." : "",
1766 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1767 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1768 if (locks && class_export_dump_hook != NULL)
1769 class_export_dump_hook(exp);
1773 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1775 struct obd_export *exp;
1777 spin_lock(&obd->obd_dev_lock);
1778 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1779 print_export_data(exp, "ACTIVE", locks, debug_level);
1780 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1781 print_export_data(exp, "UNLINKED", locks, debug_level);
1782 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1783 print_export_data(exp, "DELAYED", locks, debug_level);
1784 spin_unlock(&obd->obd_dev_lock);
1787 void obd_exports_barrier(struct obd_device *obd)
1790 LASSERT(list_empty(&obd->obd_exports));
1791 spin_lock(&obd->obd_dev_lock);
1792 while (!list_empty(&obd->obd_unlinked_exports)) {
1793 spin_unlock(&obd->obd_dev_lock);
1794 set_current_state(TASK_UNINTERRUPTIBLE);
1795 schedule_timeout(cfs_time_seconds(waited));
1796 if (waited > 5 && is_power_of_2(waited)) {
1797 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1798 "more than %d seconds. "
1799 "The obd refcount = %d. Is it stuck?\n",
1800 obd->obd_name, waited,
1801 atomic_read(&obd->obd_refcount));
1802 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1805 spin_lock(&obd->obd_dev_lock);
1807 spin_unlock(&obd->obd_dev_lock);
1809 EXPORT_SYMBOL(obd_exports_barrier);
1812 * Add export to the obd_zombe thread and notify it.
1814 static void obd_zombie_export_add(struct obd_export *exp) {
1815 atomic_dec(&obd_stale_export_num);
1816 spin_lock(&exp->exp_obd->obd_dev_lock);
1817 LASSERT(!list_empty(&exp->exp_obd_chain));
1818 list_del_init(&exp->exp_obd_chain);
1819 spin_unlock(&exp->exp_obd->obd_dev_lock);
1821 queue_work(zombie_wq, &exp->exp_zombie_work);
1825 * Add import to the obd_zombe thread and notify it.
1827 static void obd_zombie_import_add(struct obd_import *imp) {
1828 LASSERT(imp->imp_sec == NULL);
1830 queue_work(zombie_wq, &imp->imp_zombie_work);
1834 * wait when obd_zombie import/export queues become empty
1836 void obd_zombie_barrier(void)
1838 flush_workqueue(zombie_wq);
1840 EXPORT_SYMBOL(obd_zombie_barrier);
1843 struct obd_export *obd_stale_export_get(void)
1845 struct obd_export *exp = NULL;
1848 spin_lock(&obd_stale_export_lock);
1849 if (!list_empty(&obd_stale_exports)) {
1850 exp = list_entry(obd_stale_exports.next,
1851 struct obd_export, exp_stale_list);
1852 list_del_init(&exp->exp_stale_list);
1854 spin_unlock(&obd_stale_export_lock);
1857 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1858 atomic_read(&obd_stale_export_num));
1862 EXPORT_SYMBOL(obd_stale_export_get);
1864 void obd_stale_export_put(struct obd_export *exp)
1868 LASSERT(list_empty(&exp->exp_stale_list));
1869 if (exp->exp_lock_hash &&
1870 atomic_read(&exp->exp_lock_hash->hs_count)) {
1871 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1872 atomic_read(&obd_stale_export_num));
1874 spin_lock_bh(&exp->exp_bl_list_lock);
1875 spin_lock(&obd_stale_export_lock);
1876 /* Add to the tail if there is no blocked locks,
1877 * to the head otherwise. */
1878 if (list_empty(&exp->exp_bl_list))
1879 list_add_tail(&exp->exp_stale_list,
1880 &obd_stale_exports);
1882 list_add(&exp->exp_stale_list,
1883 &obd_stale_exports);
1885 spin_unlock(&obd_stale_export_lock);
1886 spin_unlock_bh(&exp->exp_bl_list_lock);
1888 class_export_put(exp);
1892 EXPORT_SYMBOL(obd_stale_export_put);
1895 * Adjust the position of the export in the stale list,
1896 * i.e. move to the head of the list if is needed.
1898 void obd_stale_export_adjust(struct obd_export *exp)
1900 LASSERT(exp != NULL);
1901 spin_lock_bh(&exp->exp_bl_list_lock);
1902 spin_lock(&obd_stale_export_lock);
1904 if (!list_empty(&exp->exp_stale_list) &&
1905 !list_empty(&exp->exp_bl_list))
1906 list_move(&exp->exp_stale_list, &obd_stale_exports);
1908 spin_unlock(&obd_stale_export_lock);
1909 spin_unlock_bh(&exp->exp_bl_list_lock);
1911 EXPORT_SYMBOL(obd_stale_export_adjust);
1914 * start destroy zombie import/export thread
1916 int obd_zombie_impexp_init(void)
1918 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1926 * stop destroy zombie import/export thread
1928 void obd_zombie_impexp_stop(void)
1930 destroy_workqueue(zombie_wq);
1931 LASSERT(list_empty(&obd_stale_exports));
1934 /***** Kernel-userspace comm helpers *******/
1936 /* Get length of entire message, including header */
1937 int kuc_len(int payload_len)
1939 return sizeof(struct kuc_hdr) + payload_len;
1941 EXPORT_SYMBOL(kuc_len);
1943 /* Get a pointer to kuc header, given a ptr to the payload
1944 * @param p Pointer to payload area
1945 * @returns Pointer to kuc header
1947 struct kuc_hdr * kuc_ptr(void *p)
1949 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1950 LASSERT(lh->kuc_magic == KUC_MAGIC);
1953 EXPORT_SYMBOL(kuc_ptr);
1955 /* Alloc space for a message, and fill in header
1956 * @return Pointer to payload area
1958 void *kuc_alloc(int payload_len, int transport, int type)
1961 int len = kuc_len(payload_len);
1965 return ERR_PTR(-ENOMEM);
1967 lh->kuc_magic = KUC_MAGIC;
1968 lh->kuc_transport = transport;
1969 lh->kuc_msgtype = type;
1970 lh->kuc_msglen = len;
1972 return (void *)(lh + 1);
1974 EXPORT_SYMBOL(kuc_alloc);
1976 /* Takes pointer to payload area */
1977 void kuc_free(void *p, int payload_len)
1979 struct kuc_hdr *lh = kuc_ptr(p);
1980 OBD_FREE(lh, kuc_len(payload_len));
1982 EXPORT_SYMBOL(kuc_free);
1984 struct obd_request_slot_waiter {
1985 struct list_head orsw_entry;
1986 wait_queue_head_t orsw_waitq;
1990 static bool obd_request_slot_avail(struct client_obd *cli,
1991 struct obd_request_slot_waiter *orsw)
1995 spin_lock(&cli->cl_loi_list_lock);
1996 avail = !!list_empty(&orsw->orsw_entry);
1997 spin_unlock(&cli->cl_loi_list_lock);
2003 * For network flow control, the RPC sponsor needs to acquire a credit
2004 * before sending the RPC. The credits count for a connection is defined
2005 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2006 * the subsequent RPC sponsors need to wait until others released their
2007 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2009 int obd_get_request_slot(struct client_obd *cli)
2011 struct obd_request_slot_waiter orsw;
2012 struct l_wait_info lwi;
2015 spin_lock(&cli->cl_loi_list_lock);
2016 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2017 cli->cl_rpcs_in_flight++;
2018 spin_unlock(&cli->cl_loi_list_lock);
2022 init_waitqueue_head(&orsw.orsw_waitq);
2023 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2024 orsw.orsw_signaled = false;
2025 spin_unlock(&cli->cl_loi_list_lock);
2027 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2028 rc = l_wait_event(orsw.orsw_waitq,
2029 obd_request_slot_avail(cli, &orsw) ||
2033 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2034 * freed but other (such as obd_put_request_slot) is using it. */
2035 spin_lock(&cli->cl_loi_list_lock);
2037 if (!orsw.orsw_signaled) {
2038 if (list_empty(&orsw.orsw_entry))
2039 cli->cl_rpcs_in_flight--;
2041 list_del(&orsw.orsw_entry);
2045 if (orsw.orsw_signaled) {
2046 LASSERT(list_empty(&orsw.orsw_entry));
2050 spin_unlock(&cli->cl_loi_list_lock);
2054 EXPORT_SYMBOL(obd_get_request_slot);
2056 void obd_put_request_slot(struct client_obd *cli)
2058 struct obd_request_slot_waiter *orsw;
2060 spin_lock(&cli->cl_loi_list_lock);
2061 cli->cl_rpcs_in_flight--;
2063 /* If there is free slot, wakeup the first waiter. */
2064 if (!list_empty(&cli->cl_flight_waiters) &&
2065 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2066 orsw = list_entry(cli->cl_flight_waiters.next,
2067 struct obd_request_slot_waiter, orsw_entry);
2068 list_del_init(&orsw->orsw_entry);
2069 cli->cl_rpcs_in_flight++;
2070 wake_up(&orsw->orsw_waitq);
2072 spin_unlock(&cli->cl_loi_list_lock);
2074 EXPORT_SYMBOL(obd_put_request_slot);
2076 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2078 return cli->cl_max_rpcs_in_flight;
2080 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2082 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2084 struct obd_request_slot_waiter *orsw;
2088 const char *type_name;
2091 if (max > OBD_MAX_RIF_MAX || max < 1)
2094 type_name = cli->cl_import->imp_obd->obd_type->typ_name;
2095 if (strcmp(type_name, LUSTRE_MDC_NAME) == 0) {
2096 /* adjust max_mod_rpcs_in_flight to ensure it is always
2097 * strictly lower that max_rpcs_in_flight */
2099 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2100 "because it must be higher than "
2101 "max_mod_rpcs_in_flight value",
2102 cli->cl_import->imp_obd->obd_name);
2105 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2106 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2112 spin_lock(&cli->cl_loi_list_lock);
2113 old = cli->cl_max_rpcs_in_flight;
2114 cli->cl_max_rpcs_in_flight = max;
2115 client_adjust_max_dirty(cli);
2119 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2120 for (i = 0; i < diff; i++) {
2121 if (list_empty(&cli->cl_flight_waiters))
2124 orsw = list_entry(cli->cl_flight_waiters.next,
2125 struct obd_request_slot_waiter, orsw_entry);
2126 list_del_init(&orsw->orsw_entry);
2127 cli->cl_rpcs_in_flight++;
2128 wake_up(&orsw->orsw_waitq);
2130 spin_unlock(&cli->cl_loi_list_lock);
2134 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2136 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2138 return cli->cl_max_mod_rpcs_in_flight;
2140 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2142 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2144 struct obd_connect_data *ocd;
2148 if (max > OBD_MAX_RIF_MAX || max < 1)
2151 /* cannot exceed or equal max_rpcs_in_flight */
2152 if (max >= cli->cl_max_rpcs_in_flight) {
2153 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2154 "higher or equal to max_rpcs_in_flight value (%u)\n",
2155 cli->cl_import->imp_obd->obd_name,
2156 max, cli->cl_max_rpcs_in_flight);
2160 /* cannot exceed max modify RPCs in flight supported by the server */
2161 ocd = &cli->cl_import->imp_connect_data;
2162 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2163 maxmodrpcs = ocd->ocd_maxmodrpcs;
2166 if (max > maxmodrpcs) {
2167 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2168 "higher than max_mod_rpcs_per_client value (%hu) "
2169 "returned by the server at connection\n",
2170 cli->cl_import->imp_obd->obd_name,
2175 spin_lock(&cli->cl_mod_rpcs_lock);
2177 prev = cli->cl_max_mod_rpcs_in_flight;
2178 cli->cl_max_mod_rpcs_in_flight = max;
2180 /* wakeup waiters if limit has been increased */
2181 if (cli->cl_max_mod_rpcs_in_flight > prev)
2182 wake_up(&cli->cl_mod_rpcs_waitq);
2184 spin_unlock(&cli->cl_mod_rpcs_lock);
2188 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2190 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2191 struct seq_file *seq)
2193 unsigned long mod_tot = 0, mod_cum;
2194 struct timespec64 now;
2197 ktime_get_real_ts64(&now);
2199 spin_lock(&cli->cl_mod_rpcs_lock);
2201 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2202 (s64)now.tv_sec, now.tv_nsec);
2203 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2204 cli->cl_mod_rpcs_in_flight);
2206 seq_printf(seq, "\n\t\t\tmodify\n");
2207 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2209 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2212 for (i = 0; i < OBD_HIST_MAX; i++) {
2213 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2215 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2216 i, mod, pct(mod, mod_tot),
2217 pct(mod_cum, mod_tot));
2218 if (mod_cum == mod_tot)
2222 spin_unlock(&cli->cl_mod_rpcs_lock);
2226 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2228 /* The number of modify RPCs sent in parallel is limited
2229 * because the server has a finite number of slots per client to
2230 * store request result and ensure reply reconstruction when needed.
2231 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2232 * that takes into account server limit and cl_max_rpcs_in_flight
2234 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2235 * one close request is allowed above the maximum.
2237 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2242 /* A slot is available if
2243 * - number of modify RPCs in flight is less than the max
2244 * - it's a close RPC and no other close request is in flight
2246 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2247 (close_req && cli->cl_close_rpcs_in_flight == 0);
2252 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2257 spin_lock(&cli->cl_mod_rpcs_lock);
2258 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2259 spin_unlock(&cli->cl_mod_rpcs_lock);
2264 /* Get a modify RPC slot from the obd client @cli according
2265 * to the kind of operation @opc that is going to be sent
2266 * and the intent @it of the operation if it applies.
2267 * If the maximum number of modify RPCs in flight is reached
2268 * the thread is put to sleep.
2269 * Returns the tag to be set in the request message. Tag 0
2270 * is reserved for non-modifying requests.
2272 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2274 bool close_req = false;
2277 if (opc == MDS_CLOSE)
2281 spin_lock(&cli->cl_mod_rpcs_lock);
2282 max = cli->cl_max_mod_rpcs_in_flight;
2283 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2284 /* there is a slot available */
2285 cli->cl_mod_rpcs_in_flight++;
2287 cli->cl_close_rpcs_in_flight++;
2288 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2289 cli->cl_mod_rpcs_in_flight);
2290 /* find a free tag */
2291 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2293 LASSERT(i < OBD_MAX_RIF_MAX);
2294 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2295 spin_unlock(&cli->cl_mod_rpcs_lock);
2296 /* tag 0 is reserved for non-modify RPCs */
2298 CDEBUG(D_RPCTRACE, "%s: modify RPC slot %u is allocated"
2299 "opc %u, max %hu\n",
2300 cli->cl_import->imp_obd->obd_name,
2305 spin_unlock(&cli->cl_mod_rpcs_lock);
2307 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2308 "opc %u, max %hu\n",
2309 cli->cl_import->imp_obd->obd_name, opc, max);
2311 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2312 obd_mod_rpc_slot_avail(cli,
2316 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2318 /* Put a modify RPC slot from the obd client @cli according
2319 * to the kind of operation @opc that has been sent.
2321 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2323 bool close_req = false;
2328 if (opc == MDS_CLOSE)
2331 spin_lock(&cli->cl_mod_rpcs_lock);
2332 cli->cl_mod_rpcs_in_flight--;
2334 cli->cl_close_rpcs_in_flight--;
2335 /* release the tag in the bitmap */
2336 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2337 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2338 spin_unlock(&cli->cl_mod_rpcs_lock);
2339 wake_up(&cli->cl_mod_rpcs_waitq);
2341 EXPORT_SYMBOL(obd_put_mod_rpc_slot);