4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/obdclass/genops.c
33 * These are the only exported functions, they provide some generic
34 * infrastructure for managing object devices
37 #define DEBUG_SUBSYSTEM S_CLASS
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
48 DEFINE_RWLOCK(obd_dev_lock);
49 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
51 static struct kmem_cache *obd_device_cachep;
52 static struct kobj_type class_ktype;
53 static struct workqueue_struct *zombie_wq;
55 static void obd_zombie_export_add(struct obd_export *exp);
56 static void obd_zombie_import_add(struct obd_import *imp);
57 static void print_export_data(struct obd_export *exp,
58 const char *status, int locks, int debug_level);
60 static LIST_HEAD(obd_stale_exports);
61 static DEFINE_SPINLOCK(obd_stale_export_lock);
62 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65 * support functions: we could use inter-module communication, but this
66 * is more portable to other OS's
68 static struct obd_device *obd_device_alloc(void)
70 struct obd_device *obd;
72 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74 obd->obd_magic = OBD_DEVICE_MAGIC;
79 static void obd_device_free(struct obd_device *obd)
82 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84 if (obd->obd_namespace != NULL) {
85 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86 obd, obd->obd_namespace, obd->obd_force);
89 lu_ref_fini(&obd->obd_reference);
90 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93 struct obd_type *class_search_type(const char *name)
95 struct kobject *kobj = kset_find_obj(lustre_kset, name);
97 if (kobj && kobj->ktype == &class_ktype)
98 return container_of(kobj, struct obd_type, typ_kobj);
103 EXPORT_SYMBOL(class_search_type);
105 struct obd_type *class_get_type(const char *name)
107 struct obd_type *type;
109 type = class_search_type(name);
110 #ifdef HAVE_MODULE_LOADING_SUPPORT
112 const char *modname = name;
114 #ifdef HAVE_SERVER_SUPPORT
115 if (strcmp(modname, "obdfilter") == 0)
118 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119 modname = LUSTRE_OSP_NAME;
121 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122 modname = LUSTRE_MDT_NAME;
123 #endif /* HAVE_SERVER_SUPPORT */
125 if (!request_module("%s", modname)) {
126 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127 type = class_search_type(name);
129 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135 if (try_module_get(type->typ_dt_ops->o_owner)) {
136 atomic_inc(&type->typ_refcnt);
137 /* class_search_type() returned a counted reference,
138 * but we don't need that count any more as
139 * we have one through typ_refcnt.
141 kobject_put(&type->typ_kobj);
143 kobject_put(&type->typ_kobj);
149 EXPORT_SYMBOL(class_get_type);
151 void class_put_type(struct obd_type *type)
154 module_put(type->typ_dt_ops->o_owner);
155 atomic_dec(&type->typ_refcnt);
157 EXPORT_SYMBOL(class_put_type);
159 static void class_sysfs_release(struct kobject *kobj)
161 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
163 debugfs_remove_recursive(type->typ_debugfs_entry);
164 type->typ_debugfs_entry = NULL;
167 lu_device_type_fini(type->typ_lu);
169 #ifdef CONFIG_PROC_FS
170 if (type->typ_name && type->typ_procroot)
171 remove_proc_subtree(type->typ_name, proc_lustre_root);
173 OBD_FREE(type, sizeof(*type));
176 static struct kobj_type class_ktype = {
177 .sysfs_ops = &lustre_sysfs_ops,
178 .release = class_sysfs_release,
181 #ifdef HAVE_SERVER_SUPPORT
182 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
184 struct dentry *symlink;
185 struct obd_type *type;
188 type = class_search_type(name);
190 kobject_put(&type->typ_kobj);
191 return ERR_PTR(-EEXIST);
194 OBD_ALLOC(type, sizeof(*type));
196 return ERR_PTR(-ENOMEM);
198 type->typ_kobj.kset = lustre_kset;
199 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
200 &lustre_kset->kobj, "%s", name);
204 symlink = debugfs_create_dir(name, debugfs_lustre_root);
205 type->typ_debugfs_entry = symlink;
206 type->typ_sym_filter = true;
209 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
211 if (IS_ERR(type->typ_procroot)) {
212 CERROR("%s: can't create compat proc entry: %d\n",
213 name, (int)PTR_ERR(type->typ_procroot));
214 type->typ_procroot = NULL;
220 EXPORT_SYMBOL(class_add_symlinks);
221 #endif /* HAVE_SERVER_SUPPORT */
223 #define CLASS_MAX_NAME 1024
225 int class_register_type(const struct obd_ops *dt_ops,
226 const struct md_ops *md_ops,
228 const char *name, struct lu_device_type *ldt)
230 struct obd_type *type;
235 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
237 type = class_search_type(name);
239 #ifdef HAVE_SERVER_SUPPORT
240 if (type->typ_sym_filter)
242 #endif /* HAVE_SERVER_SUPPORT */
243 kobject_put(&type->typ_kobj);
244 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
248 OBD_ALLOC(type, sizeof(*type));
252 type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
253 type->typ_kobj.kset = lustre_kset;
254 kobject_init(&type->typ_kobj, &class_ktype);
255 #ifdef HAVE_SERVER_SUPPORT
257 #endif /* HAVE_SERVER_SUPPORT */
259 type->typ_dt_ops = dt_ops;
260 type->typ_md_ops = md_ops;
262 #ifdef HAVE_SERVER_SUPPORT
263 if (type->typ_sym_filter) {
264 type->typ_sym_filter = false;
265 kobject_put(&type->typ_kobj);
269 #ifdef CONFIG_PROC_FS
270 if (enable_proc && !type->typ_procroot) {
271 type->typ_procroot = lprocfs_register(name,
274 if (IS_ERR(type->typ_procroot)) {
275 rc = PTR_ERR(type->typ_procroot);
276 type->typ_procroot = NULL;
281 type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
283 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
286 #ifdef HAVE_SERVER_SUPPORT
290 rc = lu_device_type_init(ldt);
291 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
292 wake_up_var(&type->typ_lu);
300 kobject_put(&type->typ_kobj);
304 EXPORT_SYMBOL(class_register_type);
306 int class_unregister_type(const char *name)
308 struct obd_type *type = class_search_type(name);
313 CERROR("unknown obd type\n");
317 if (atomic_read(&type->typ_refcnt)) {
318 CERROR("type %s has refcount (%d)\n", name,
319 atomic_read(&type->typ_refcnt));
320 /* This is a bad situation, let's make the best of it */
321 /* Remove ops, but leave the name for debugging */
322 type->typ_dt_ops = NULL;
323 type->typ_md_ops = NULL;
324 GOTO(out_put, rc = -EBUSY);
327 /* Put the final ref */
328 kobject_put(&type->typ_kobj);
330 /* Put the ref returned by class_search_type() */
331 kobject_put(&type->typ_kobj);
334 } /* class_unregister_type */
335 EXPORT_SYMBOL(class_unregister_type);
338 * Create a new obd device.
340 * Allocate the new obd_device and initialize it.
342 * \param[in] type_name obd device type string.
343 * \param[in] name obd device name.
344 * \param[in] uuid obd device UUID
346 * \retval newdev pointer to created obd_device
347 * \retval ERR_PTR(errno) on error
349 struct obd_device *class_newdev(const char *type_name, const char *name,
352 struct obd_device *newdev;
353 struct obd_type *type = NULL;
356 if (strlen(name) >= MAX_OBD_NAME) {
357 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
358 RETURN(ERR_PTR(-EINVAL));
361 type = class_get_type(type_name);
363 CERROR("OBD: unknown type: %s\n", type_name);
364 RETURN(ERR_PTR(-ENODEV));
367 newdev = obd_device_alloc();
368 if (newdev == NULL) {
369 class_put_type(type);
370 RETURN(ERR_PTR(-ENOMEM));
372 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
373 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
374 newdev->obd_type = type;
375 newdev->obd_minor = -1;
377 rwlock_init(&newdev->obd_pool_lock);
378 newdev->obd_pool_limit = 0;
379 newdev->obd_pool_slv = 0;
381 INIT_LIST_HEAD(&newdev->obd_exports);
382 newdev->obd_num_exports = 0;
383 newdev->obd_grant_check_threshold = 100;
384 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
385 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
386 INIT_LIST_HEAD(&newdev->obd_exports_timed);
387 INIT_LIST_HEAD(&newdev->obd_nid_stats);
388 spin_lock_init(&newdev->obd_nid_lock);
389 spin_lock_init(&newdev->obd_dev_lock);
390 mutex_init(&newdev->obd_dev_mutex);
391 spin_lock_init(&newdev->obd_osfs_lock);
392 /* newdev->obd_osfs_age must be set to a value in the distant
393 * past to guarantee a fresh statfs is fetched on mount. */
394 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
396 /* XXX belongs in setup not attach */
397 init_rwsem(&newdev->obd_observer_link_sem);
399 spin_lock_init(&newdev->obd_recovery_task_lock);
400 init_waitqueue_head(&newdev->obd_next_transno_waitq);
401 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
402 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
403 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
404 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
405 INIT_LIST_HEAD(&newdev->obd_evict_list);
406 INIT_LIST_HEAD(&newdev->obd_lwp_list);
408 llog_group_init(&newdev->obd_olg);
409 /* Detach drops this */
410 atomic_set(&newdev->obd_refcount, 1);
411 lu_ref_init(&newdev->obd_reference);
412 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
414 newdev->obd_conn_inprogress = 0;
416 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
418 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
419 newdev->obd_name, newdev);
427 * \param[in] obd obd_device to be freed
431 void class_free_dev(struct obd_device *obd)
433 struct obd_type *obd_type = obd->obd_type;
435 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
436 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
437 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
438 "obd %p != obd_devs[%d] %p\n",
439 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
440 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
441 "obd_refcount should be 0, not %d\n",
442 atomic_read(&obd->obd_refcount));
443 LASSERT(obd_type != NULL);
445 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
446 obd->obd_name, obd->obd_type->typ_name);
448 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
449 obd->obd_name, obd->obd_uuid.uuid);
450 if (obd->obd_stopping) {
453 /* If we're not stopping, we were never set up */
454 err = obd_cleanup(obd);
456 CERROR("Cleanup %s returned %d\n",
460 obd_device_free(obd);
462 class_put_type(obd_type);
466 * Unregister obd device.
468 * Free slot in obd_dev[] used by \a obd.
470 * \param[in] new_obd obd_device to be unregistered
474 void class_unregister_device(struct obd_device *obd)
476 write_lock(&obd_dev_lock);
477 if (obd->obd_minor >= 0) {
478 LASSERT(obd_devs[obd->obd_minor] == obd);
479 obd_devs[obd->obd_minor] = NULL;
482 write_unlock(&obd_dev_lock);
486 * Register obd device.
488 * Find free slot in obd_devs[], fills it with \a new_obd.
490 * \param[in] new_obd obd_device to be registered
493 * \retval -EEXIST device with this name is registered
494 * \retval -EOVERFLOW obd_devs[] is full
496 int class_register_device(struct obd_device *new_obd)
500 int new_obd_minor = 0;
501 bool minor_assign = false;
502 bool retried = false;
505 write_lock(&obd_dev_lock);
506 for (i = 0; i < class_devno_max(); i++) {
507 struct obd_device *obd = class_num2obd(i);
510 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
513 write_unlock(&obd_dev_lock);
515 /* the obd_device could be waited to be
516 * destroyed by the "obd_zombie_impexp_thread".
518 obd_zombie_barrier();
523 CERROR("%s: already exists, won't add\n",
525 /* in case we found a free slot before duplicate */
526 minor_assign = false;
530 if (!minor_assign && obd == NULL) {
537 new_obd->obd_minor = new_obd_minor;
538 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
539 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
540 obd_devs[new_obd_minor] = new_obd;
544 CERROR("%s: all %u/%u devices used, increase "
545 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
546 i, class_devno_max(), ret);
549 write_unlock(&obd_dev_lock);
554 static int class_name2dev_nolock(const char *name)
561 for (i = 0; i < class_devno_max(); i++) {
562 struct obd_device *obd = class_num2obd(i);
564 if (obd && strcmp(name, obd->obd_name) == 0) {
565 /* Make sure we finished attaching before we give
566 out any references */
567 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
568 if (obd->obd_attached) {
578 int class_name2dev(const char *name)
585 read_lock(&obd_dev_lock);
586 i = class_name2dev_nolock(name);
587 read_unlock(&obd_dev_lock);
591 EXPORT_SYMBOL(class_name2dev);
593 struct obd_device *class_name2obd(const char *name)
595 int dev = class_name2dev(name);
597 if (dev < 0 || dev > class_devno_max())
599 return class_num2obd(dev);
601 EXPORT_SYMBOL(class_name2obd);
603 int class_uuid2dev_nolock(struct obd_uuid *uuid)
607 for (i = 0; i < class_devno_max(); i++) {
608 struct obd_device *obd = class_num2obd(i);
610 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
611 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
619 int class_uuid2dev(struct obd_uuid *uuid)
623 read_lock(&obd_dev_lock);
624 i = class_uuid2dev_nolock(uuid);
625 read_unlock(&obd_dev_lock);
629 EXPORT_SYMBOL(class_uuid2dev);
631 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
633 int dev = class_uuid2dev(uuid);
636 return class_num2obd(dev);
638 EXPORT_SYMBOL(class_uuid2obd);
641 * Get obd device from ::obd_devs[]
643 * \param num [in] array index
645 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
646 * otherwise return the obd device there.
648 struct obd_device *class_num2obd(int num)
650 struct obd_device *obd = NULL;
652 if (num < class_devno_max()) {
657 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
658 "%p obd_magic %08x != %08x\n",
659 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
660 LASSERTF(obd->obd_minor == num,
661 "%p obd_minor %0d != %0d\n",
662 obd, obd->obd_minor, num);
667 EXPORT_SYMBOL(class_num2obd);
670 * Find obd in obd_dev[] by name or uuid.
672 * Increment obd's refcount if found.
674 * \param[in] str obd name or uuid
676 * \retval NULL if not found
677 * \retval target pointer to found obd_device
679 struct obd_device *class_dev_by_str(const char *str)
681 struct obd_device *target = NULL;
682 struct obd_uuid tgtuuid;
685 obd_str2uuid(&tgtuuid, str);
687 read_lock(&obd_dev_lock);
688 rc = class_uuid2dev_nolock(&tgtuuid);
690 rc = class_name2dev_nolock(str);
693 target = class_num2obd(rc);
696 class_incref(target, "find", current);
697 read_unlock(&obd_dev_lock);
701 EXPORT_SYMBOL(class_dev_by_str);
704 * Get obd devices count. Device in any
706 * \retval obd device count
708 int get_devices_count(void)
710 int index, max_index = class_devno_max(), dev_count = 0;
712 read_lock(&obd_dev_lock);
713 for (index = 0; index <= max_index; index++) {
714 struct obd_device *obd = class_num2obd(index);
718 read_unlock(&obd_dev_lock);
722 EXPORT_SYMBOL(get_devices_count);
724 void class_obd_list(void)
729 read_lock(&obd_dev_lock);
730 for (i = 0; i < class_devno_max(); i++) {
731 struct obd_device *obd = class_num2obd(i);
735 if (obd->obd_stopping)
737 else if (obd->obd_set_up)
739 else if (obd->obd_attached)
743 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
744 i, status, obd->obd_type->typ_name,
745 obd->obd_name, obd->obd_uuid.uuid,
746 atomic_read(&obd->obd_refcount));
748 read_unlock(&obd_dev_lock);
751 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
752 * specified, then only the client with that uuid is returned,
753 * otherwise any client connected to the tgt is returned.
755 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
756 const char *type_name,
757 struct obd_uuid *grp_uuid)
761 read_lock(&obd_dev_lock);
762 for (i = 0; i < class_devno_max(); i++) {
763 struct obd_device *obd = class_num2obd(i);
767 if ((strncmp(obd->obd_type->typ_name, type_name,
768 strlen(type_name)) == 0)) {
769 if (obd_uuid_equals(tgt_uuid,
770 &obd->u.cli.cl_target_uuid) &&
771 ((grp_uuid)? obd_uuid_equals(grp_uuid,
772 &obd->obd_uuid) : 1)) {
773 read_unlock(&obd_dev_lock);
778 read_unlock(&obd_dev_lock);
782 EXPORT_SYMBOL(class_find_client_obd);
784 /* Iterate the obd_device list looking devices have grp_uuid. Start
785 * searching at *next, and if a device is found, the next index to look
786 * at is saved in *next. If next is NULL, then the first matching device
787 * will always be returned.
789 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
795 else if (*next >= 0 && *next < class_devno_max())
800 read_lock(&obd_dev_lock);
801 for (; i < class_devno_max(); i++) {
802 struct obd_device *obd = class_num2obd(i);
806 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
809 read_unlock(&obd_dev_lock);
813 read_unlock(&obd_dev_lock);
817 EXPORT_SYMBOL(class_devices_in_group);
820 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
821 * adjust sptlrpc settings accordingly.
823 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
825 struct obd_device *obd;
829 LASSERT(namelen > 0);
831 read_lock(&obd_dev_lock);
832 for (i = 0; i < class_devno_max(); i++) {
833 obd = class_num2obd(i);
835 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
838 /* only notify mdc, osc, osp, lwp, mdt, ost
839 * because only these have a -sptlrpc llog */
840 type = obd->obd_type->typ_name;
841 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
842 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
843 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
844 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
845 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
846 strcmp(type, LUSTRE_OST_NAME) != 0)
849 if (strncmp(obd->obd_name, fsname, namelen))
852 class_incref(obd, __FUNCTION__, obd);
853 read_unlock(&obd_dev_lock);
854 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
855 sizeof(KEY_SPTLRPC_CONF),
856 KEY_SPTLRPC_CONF, 0, NULL, NULL);
858 class_decref(obd, __FUNCTION__, obd);
859 read_lock(&obd_dev_lock);
861 read_unlock(&obd_dev_lock);
864 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
866 void obd_cleanup_caches(void)
869 if (obd_device_cachep) {
870 kmem_cache_destroy(obd_device_cachep);
871 obd_device_cachep = NULL;
877 int obd_init_caches(void)
882 LASSERT(obd_device_cachep == NULL);
883 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
884 sizeof(struct obd_device),
885 0, 0, 0, sizeof(struct obd_device), NULL);
886 if (!obd_device_cachep)
887 GOTO(out, rc = -ENOMEM);
891 obd_cleanup_caches();
895 static const char export_handle_owner[] = "export";
897 /* map connection to client */
898 struct obd_export *class_conn2export(struct lustre_handle *conn)
900 struct obd_export *export;
904 CDEBUG(D_CACHE, "looking for null handle\n");
908 if (conn->cookie == -1) { /* this means assign a new connection */
909 CDEBUG(D_CACHE, "want a new connection\n");
913 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
914 export = class_handle2object(conn->cookie, export_handle_owner);
917 EXPORT_SYMBOL(class_conn2export);
919 struct obd_device *class_exp2obd(struct obd_export *exp)
925 EXPORT_SYMBOL(class_exp2obd);
927 struct obd_import *class_exp2cliimp(struct obd_export *exp)
929 struct obd_device *obd = exp->exp_obd;
932 return obd->u.cli.cl_import;
934 EXPORT_SYMBOL(class_exp2cliimp);
936 /* Export management functions */
937 static void class_export_destroy(struct obd_export *exp)
939 struct obd_device *obd = exp->exp_obd;
942 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
943 LASSERT(obd != NULL);
945 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
946 exp->exp_client_uuid.uuid, obd->obd_name);
948 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
949 ptlrpc_connection_put(exp->exp_connection);
951 LASSERT(list_empty(&exp->exp_outstanding_replies));
952 LASSERT(list_empty(&exp->exp_uncommitted_replies));
953 LASSERT(list_empty(&exp->exp_req_replay_queue));
954 LASSERT(list_empty(&exp->exp_hp_rpcs));
955 obd_destroy_export(exp);
956 /* self export doesn't hold a reference to an obd, although it
957 * exists until freeing of the obd */
958 if (exp != obd->obd_self_export)
959 class_decref(obd, "export", exp);
961 OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
962 kfree_rcu(exp, exp_handle.h_rcu);
966 struct obd_export *class_export_get(struct obd_export *exp)
968 refcount_inc(&exp->exp_handle.h_ref);
969 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
970 refcount_read(&exp->exp_handle.h_ref));
973 EXPORT_SYMBOL(class_export_get);
975 void class_export_put(struct obd_export *exp)
977 LASSERT(exp != NULL);
978 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
979 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
980 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
981 refcount_read(&exp->exp_handle.h_ref) - 1);
983 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
984 struct obd_device *obd = exp->exp_obd;
986 CDEBUG(D_IOCTL, "final put %p/%s\n",
987 exp, exp->exp_client_uuid.uuid);
989 /* release nid stat refererence */
990 lprocfs_exp_cleanup(exp);
992 if (exp == obd->obd_self_export) {
993 /* self export should be destroyed without
994 * zombie thread as it doesn't hold a
995 * reference to obd and doesn't hold any
997 class_export_destroy(exp);
998 /* self export is destroyed, no class
999 * references exist and it is safe to free
1001 class_free_dev(obd);
1003 LASSERT(!list_empty(&exp->exp_obd_chain));
1004 obd_zombie_export_add(exp);
1009 EXPORT_SYMBOL(class_export_put);
1011 static void obd_zombie_exp_cull(struct work_struct *ws)
1013 struct obd_export *export;
1015 export = container_of(ws, struct obd_export, exp_zombie_work);
1016 class_export_destroy(export);
1019 /* Creates a new export, adds it to the hash table, and returns a
1020 * pointer to it. The refcount is 2: one for the hash reference, and
1021 * one for the pointer returned by this function. */
1022 struct obd_export *__class_new_export(struct obd_device *obd,
1023 struct obd_uuid *cluuid, bool is_self)
1025 struct obd_export *export;
1029 OBD_ALLOC_PTR(export);
1031 return ERR_PTR(-ENOMEM);
1033 export->exp_conn_cnt = 0;
1034 export->exp_lock_hash = NULL;
1035 export->exp_flock_hash = NULL;
1036 /* 2 = class_handle_hash + last */
1037 refcount_set(&export->exp_handle.h_ref, 2);
1038 atomic_set(&export->exp_rpc_count, 0);
1039 atomic_set(&export->exp_cb_count, 0);
1040 atomic_set(&export->exp_locks_count, 0);
1041 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1042 INIT_LIST_HEAD(&export->exp_locks_list);
1043 spin_lock_init(&export->exp_locks_list_guard);
1045 atomic_set(&export->exp_replay_count, 0);
1046 export->exp_obd = obd;
1047 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1048 spin_lock_init(&export->exp_uncommitted_replies_lock);
1049 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1050 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1051 INIT_HLIST_NODE(&export->exp_handle.h_link);
1052 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1053 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1054 class_handle_hash(&export->exp_handle, export_handle_owner);
1055 export->exp_last_request_time = ktime_get_real_seconds();
1056 spin_lock_init(&export->exp_lock);
1057 spin_lock_init(&export->exp_rpc_lock);
1058 INIT_HLIST_NODE(&export->exp_gen_hash);
1059 spin_lock_init(&export->exp_bl_list_lock);
1060 INIT_LIST_HEAD(&export->exp_bl_list);
1061 INIT_LIST_HEAD(&export->exp_stale_list);
1062 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1064 export->exp_sp_peer = LUSTRE_SP_ANY;
1065 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1066 export->exp_client_uuid = *cluuid;
1067 obd_init_export(export);
1069 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1071 spin_lock(&obd->obd_dev_lock);
1072 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1073 /* shouldn't happen, but might race */
1074 if (obd->obd_stopping)
1075 GOTO(exit_unlock, rc = -ENODEV);
1077 rc = obd_uuid_add(obd, export);
1079 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1080 obd->obd_name, cluuid->uuid, rc);
1081 GOTO(exit_unlock, rc = -EALREADY);
1086 class_incref(obd, "export", export);
1087 list_add_tail(&export->exp_obd_chain_timed,
1088 &obd->obd_exports_timed);
1089 list_add(&export->exp_obd_chain, &obd->obd_exports);
1090 obd->obd_num_exports++;
1092 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1093 INIT_LIST_HEAD(&export->exp_obd_chain);
1095 spin_unlock(&obd->obd_dev_lock);
1099 spin_unlock(&obd->obd_dev_lock);
1100 class_handle_unhash(&export->exp_handle);
1101 obd_destroy_export(export);
1102 OBD_FREE_PTR(export);
1106 struct obd_export *class_new_export(struct obd_device *obd,
1107 struct obd_uuid *uuid)
1109 return __class_new_export(obd, uuid, false);
1111 EXPORT_SYMBOL(class_new_export);
1113 struct obd_export *class_new_export_self(struct obd_device *obd,
1114 struct obd_uuid *uuid)
1116 return __class_new_export(obd, uuid, true);
1119 void class_unlink_export(struct obd_export *exp)
1121 class_handle_unhash(&exp->exp_handle);
1123 if (exp->exp_obd->obd_self_export == exp) {
1124 class_export_put(exp);
1128 spin_lock(&exp->exp_obd->obd_dev_lock);
1129 /* delete an uuid-export hashitem from hashtables */
1130 if (exp != exp->exp_obd->obd_self_export)
1131 obd_uuid_del(exp->exp_obd, exp);
1133 #ifdef HAVE_SERVER_SUPPORT
1134 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1135 struct tg_export_data *ted = &exp->exp_target_data;
1136 struct cfs_hash *hash;
1138 /* Because obd_gen_hash will not be released until
1139 * class_cleanup(), so hash should never be NULL here */
1140 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1141 LASSERT(hash != NULL);
1142 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1143 &exp->exp_gen_hash);
1144 cfs_hash_putref(hash);
1146 #endif /* HAVE_SERVER_SUPPORT */
1148 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1149 list_del_init(&exp->exp_obd_chain_timed);
1150 exp->exp_obd->obd_num_exports--;
1151 spin_unlock(&exp->exp_obd->obd_dev_lock);
1152 atomic_inc(&obd_stale_export_num);
1154 /* A reference is kept by obd_stale_exports list */
1155 obd_stale_export_put(exp);
1157 EXPORT_SYMBOL(class_unlink_export);
1159 /* Import management functions */
1160 static void obd_zombie_import_free(struct obd_import *imp)
1162 struct obd_import_conn *imp_conn;
1165 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1166 imp->imp_obd->obd_name);
1168 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1170 ptlrpc_connection_put(imp->imp_connection);
1172 while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1173 struct obd_import_conn,
1174 oic_item)) != NULL) {
1175 list_del_init(&imp_conn->oic_item);
1176 ptlrpc_connection_put(imp_conn->oic_conn);
1177 OBD_FREE(imp_conn, sizeof(*imp_conn));
1180 LASSERT(imp->imp_sec == NULL);
1181 LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1182 imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1183 class_decref(imp->imp_obd, "import", imp);
1188 struct obd_import *class_import_get(struct obd_import *import)
1190 refcount_inc(&import->imp_refcount);
1191 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1192 refcount_read(&import->imp_refcount),
1193 import->imp_obd->obd_name);
1196 EXPORT_SYMBOL(class_import_get);
1198 void class_import_put(struct obd_import *imp)
1202 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1204 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1205 refcount_read(&imp->imp_refcount) - 1,
1206 imp->imp_obd->obd_name);
1208 if (refcount_dec_and_test(&imp->imp_refcount)) {
1209 CDEBUG(D_INFO, "final put import %p\n", imp);
1210 obd_zombie_import_add(imp);
1215 EXPORT_SYMBOL(class_import_put);
1217 static void init_imp_at(struct imp_at *at) {
1219 at_init(&at->iat_net_latency, 0, 0);
1220 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1221 /* max service estimates are tracked on the server side, so
1222 don't use the AT history here, just use the last reported
1223 val. (But keep hist for proc histogram, worst_ever) */
1224 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1229 static void obd_zombie_imp_cull(struct work_struct *ws)
1231 struct obd_import *import;
1233 import = container_of(ws, struct obd_import, imp_zombie_work);
1234 obd_zombie_import_free(import);
1237 struct obd_import *class_new_import(struct obd_device *obd)
1239 struct obd_import *imp;
1240 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1242 OBD_ALLOC(imp, sizeof(*imp));
1246 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1247 INIT_LIST_HEAD(&imp->imp_replay_list);
1248 INIT_LIST_HEAD(&imp->imp_sending_list);
1249 INIT_LIST_HEAD(&imp->imp_delayed_list);
1250 INIT_LIST_HEAD(&imp->imp_committed_list);
1251 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1252 imp->imp_known_replied_xid = 0;
1253 imp->imp_replay_cursor = &imp->imp_committed_list;
1254 spin_lock_init(&imp->imp_lock);
1255 imp->imp_last_success_conn = 0;
1256 imp->imp_state = LUSTRE_IMP_NEW;
1257 imp->imp_obd = class_incref(obd, "import", imp);
1258 rwlock_init(&imp->imp_sec_lock);
1259 init_waitqueue_head(&imp->imp_recovery_waitq);
1260 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1262 if (curr_pid_ns && curr_pid_ns->child_reaper)
1263 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1265 imp->imp_sec_refpid = 1;
1267 refcount_set(&imp->imp_refcount, 2);
1268 atomic_set(&imp->imp_unregistering, 0);
1269 atomic_set(&imp->imp_reqs, 0);
1270 atomic_set(&imp->imp_inflight, 0);
1271 atomic_set(&imp->imp_replay_inflight, 0);
1272 init_waitqueue_head(&imp->imp_replay_waitq);
1273 atomic_set(&imp->imp_inval_count, 0);
1274 INIT_LIST_HEAD(&imp->imp_conn_list);
1275 init_imp_at(&imp->imp_at);
1277 /* the default magic is V2, will be used in connect RPC, and
1278 * then adjusted according to the flags in request/reply. */
1279 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1283 EXPORT_SYMBOL(class_new_import);
1285 void class_destroy_import(struct obd_import *import)
1287 LASSERT(import != NULL);
1288 LASSERT(import != LP_POISON);
1290 spin_lock(&import->imp_lock);
1291 import->imp_generation++;
1292 spin_unlock(&import->imp_lock);
1293 class_import_put(import);
1295 EXPORT_SYMBOL(class_destroy_import);
1297 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1299 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1301 spin_lock(&exp->exp_locks_list_guard);
1303 LASSERT(lock->l_exp_refs_nr >= 0);
1305 if (lock->l_exp_refs_target != NULL &&
1306 lock->l_exp_refs_target != exp) {
1307 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1308 exp, lock, lock->l_exp_refs_target);
1310 if ((lock->l_exp_refs_nr ++) == 0) {
1311 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1312 lock->l_exp_refs_target = exp;
1314 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1315 lock, exp, lock->l_exp_refs_nr);
1316 spin_unlock(&exp->exp_locks_list_guard);
1318 EXPORT_SYMBOL(__class_export_add_lock_ref);
1320 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1322 spin_lock(&exp->exp_locks_list_guard);
1323 LASSERT(lock->l_exp_refs_nr > 0);
1324 if (lock->l_exp_refs_target != exp) {
1325 LCONSOLE_WARN("lock %p, "
1326 "mismatching export pointers: %p, %p\n",
1327 lock, lock->l_exp_refs_target, exp);
1329 if (-- lock->l_exp_refs_nr == 0) {
1330 list_del_init(&lock->l_exp_refs_link);
1331 lock->l_exp_refs_target = NULL;
1333 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1334 lock, exp, lock->l_exp_refs_nr);
1335 spin_unlock(&exp->exp_locks_list_guard);
1337 EXPORT_SYMBOL(__class_export_del_lock_ref);
1340 /* A connection defines an export context in which preallocation can
1341 be managed. This releases the export pointer reference, and returns
1342 the export handle, so the export refcount is 1 when this function
1344 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1345 struct obd_uuid *cluuid)
1347 struct obd_export *export;
1348 LASSERT(conn != NULL);
1349 LASSERT(obd != NULL);
1350 LASSERT(cluuid != NULL);
1353 export = class_new_export(obd, cluuid);
1355 RETURN(PTR_ERR(export));
1357 conn->cookie = export->exp_handle.h_cookie;
1358 class_export_put(export);
1360 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1361 cluuid->uuid, conn->cookie);
1364 EXPORT_SYMBOL(class_connect);
1366 /* if export is involved in recovery then clean up related things */
1367 static void class_export_recovery_cleanup(struct obd_export *exp)
1369 struct obd_device *obd = exp->exp_obd;
1371 spin_lock(&obd->obd_recovery_task_lock);
1372 if (obd->obd_recovering) {
1373 if (exp->exp_in_recovery) {
1374 spin_lock(&exp->exp_lock);
1375 exp->exp_in_recovery = 0;
1376 spin_unlock(&exp->exp_lock);
1377 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1378 atomic_dec(&obd->obd_connected_clients);
1381 /* if called during recovery then should update
1382 * obd_stale_clients counter,
1383 * lightweight exports are not counted */
1384 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1385 exp->exp_obd->obd_stale_clients++;
1387 spin_unlock(&obd->obd_recovery_task_lock);
1389 spin_lock(&exp->exp_lock);
1390 /** Cleanup req replay fields */
1391 if (exp->exp_req_replay_needed) {
1392 exp->exp_req_replay_needed = 0;
1394 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1395 atomic_dec(&obd->obd_req_replay_clients);
1398 /** Cleanup lock replay data */
1399 if (exp->exp_lock_replay_needed) {
1400 exp->exp_lock_replay_needed = 0;
1402 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1403 atomic_dec(&obd->obd_lock_replay_clients);
1405 spin_unlock(&exp->exp_lock);
1408 /* This function removes 1-3 references from the export:
1409 * 1 - for export pointer passed
1410 * and if disconnect really need
1411 * 2 - removing from hash
1412 * 3 - in client_unlink_export
1413 * The export pointer passed to this function can destroyed */
1414 int class_disconnect(struct obd_export *export)
1416 int already_disconnected;
1419 if (export == NULL) {
1420 CWARN("attempting to free NULL export %p\n", export);
1424 spin_lock(&export->exp_lock);
1425 already_disconnected = export->exp_disconnected;
1426 export->exp_disconnected = 1;
1427 #ifdef HAVE_SERVER_SUPPORT
1428 /* We hold references of export for uuid hash
1429 * and nid_hash and export link at least. So
1430 * it is safe to call rh*table_remove_fast in
1433 obd_nid_del(export->exp_obd, export);
1434 #endif /* HAVE_SERVER_SUPPORT */
1435 spin_unlock(&export->exp_lock);
1437 /* class_cleanup(), abort_recovery(), and class_fail_export()
1438 * all end up in here, and if any of them race we shouldn't
1439 * call extra class_export_puts(). */
1440 if (already_disconnected)
1441 GOTO(no_disconn, already_disconnected);
1443 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1444 export->exp_handle.h_cookie);
1446 class_export_recovery_cleanup(export);
1447 class_unlink_export(export);
1449 class_export_put(export);
1452 EXPORT_SYMBOL(class_disconnect);
1454 /* Return non-zero for a fully connected export */
1455 int class_connected_export(struct obd_export *exp)
1460 spin_lock(&exp->exp_lock);
1461 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1462 spin_unlock(&exp->exp_lock);
1466 EXPORT_SYMBOL(class_connected_export);
1468 static void class_disconnect_export_list(struct list_head *list,
1469 enum obd_option flags)
1472 struct obd_export *exp;
1475 /* It's possible that an export may disconnect itself, but
1476 * nothing else will be added to this list.
1478 while ((exp = list_first_entry_or_null(list, struct obd_export,
1479 exp_obd_chain)) != NULL) {
1480 /* need for safe call CDEBUG after obd_disconnect */
1481 class_export_get(exp);
1483 spin_lock(&exp->exp_lock);
1484 exp->exp_flags = flags;
1485 spin_unlock(&exp->exp_lock);
1487 if (obd_uuid_equals(&exp->exp_client_uuid,
1488 &exp->exp_obd->obd_uuid)) {
1490 "exp %p export uuid == obd uuid, don't discon\n",
1492 /* Need to delete this now so we don't end up pointing
1493 * to work_list later when this export is cleaned up. */
1494 list_del_init(&exp->exp_obd_chain);
1495 class_export_put(exp);
1499 class_export_get(exp);
1500 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1501 "last request at %lld\n",
1502 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1503 exp, exp->exp_last_request_time);
1504 /* release one export reference anyway */
1505 rc = obd_disconnect(exp);
1507 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1508 obd_export_nid2str(exp), exp, rc);
1509 class_export_put(exp);
1514 void class_disconnect_exports(struct obd_device *obd)
1516 LIST_HEAD(work_list);
1519 /* Move all of the exports from obd_exports to a work list, en masse. */
1520 spin_lock(&obd->obd_dev_lock);
1521 list_splice_init(&obd->obd_exports, &work_list);
1522 list_splice_init(&obd->obd_delayed_exports, &work_list);
1523 spin_unlock(&obd->obd_dev_lock);
1525 if (!list_empty(&work_list)) {
1526 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1527 "disconnecting them\n", obd->obd_minor, obd);
1528 class_disconnect_export_list(&work_list,
1529 exp_flags_from_obd(obd));
1531 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1532 obd->obd_minor, obd);
1535 EXPORT_SYMBOL(class_disconnect_exports);
1537 /* Remove exports that have not completed recovery.
1539 void class_disconnect_stale_exports(struct obd_device *obd,
1540 int (*test_export)(struct obd_export *))
1542 LIST_HEAD(work_list);
1543 struct obd_export *exp, *n;
1547 spin_lock(&obd->obd_dev_lock);
1548 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1550 /* don't count self-export as client */
1551 if (obd_uuid_equals(&exp->exp_client_uuid,
1552 &exp->exp_obd->obd_uuid))
1555 /* don't evict clients which have no slot in last_rcvd
1556 * (e.g. lightweight connection) */
1557 if (exp->exp_target_data.ted_lr_idx == -1)
1560 spin_lock(&exp->exp_lock);
1561 if (exp->exp_failed || test_export(exp)) {
1562 spin_unlock(&exp->exp_lock);
1565 exp->exp_failed = 1;
1566 spin_unlock(&exp->exp_lock);
1568 list_move(&exp->exp_obd_chain, &work_list);
1570 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1571 obd->obd_name, exp->exp_client_uuid.uuid,
1572 obd_export_nid2str(exp));
1573 print_export_data(exp, "EVICTING", 0, D_HA);
1575 spin_unlock(&obd->obd_dev_lock);
1578 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1579 obd->obd_name, evicted);
1581 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1582 OBD_OPT_ABORT_RECOV);
1585 EXPORT_SYMBOL(class_disconnect_stale_exports);
1587 void class_fail_export(struct obd_export *exp)
1589 int rc, already_failed;
1591 spin_lock(&exp->exp_lock);
1592 already_failed = exp->exp_failed;
1593 exp->exp_failed = 1;
1594 spin_unlock(&exp->exp_lock);
1596 if (already_failed) {
1597 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1598 exp, exp->exp_client_uuid.uuid);
1602 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1603 exp, exp->exp_client_uuid.uuid);
1605 if (obd_dump_on_timeout)
1606 libcfs_debug_dumplog();
1608 /* need for safe call CDEBUG after obd_disconnect */
1609 class_export_get(exp);
1611 /* Most callers into obd_disconnect are removing their own reference
1612 * (request, for example) in addition to the one from the hash table.
1613 * We don't have such a reference here, so make one. */
1614 class_export_get(exp);
1615 rc = obd_disconnect(exp);
1617 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1619 CDEBUG(D_HA, "disconnected export %p/%s\n",
1620 exp, exp->exp_client_uuid.uuid);
1621 class_export_put(exp);
1623 EXPORT_SYMBOL(class_fail_export);
1625 #ifdef HAVE_SERVER_SUPPORT
1627 static int take_first(struct obd_export *exp, void *data)
1629 struct obd_export **expp = data;
1632 /* already have one */
1634 if (exp->exp_failed)
1635 /* Don't want this one */
1637 if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1638 /* Cannot get a ref on this one */
1644 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1646 struct lnet_nid nid_key;
1647 struct obd_export *doomed_exp;
1648 int exports_evicted = 0;
1650 libcfs_strnid(&nid_key, nid);
1652 spin_lock(&obd->obd_dev_lock);
1653 /* umount has run already, so evict thread should leave
1654 * its task to umount thread now */
1655 if (obd->obd_stopping) {
1656 spin_unlock(&obd->obd_dev_lock);
1657 return exports_evicted;
1659 spin_unlock(&obd->obd_dev_lock);
1662 while (obd_nid_export_for_each(obd, &nid_key,
1663 take_first, &doomed_exp) > 0) {
1665 LASSERTF(doomed_exp != obd->obd_self_export,
1666 "self-export is hashed by NID?\n");
1668 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1670 obd_uuid2str(&doomed_exp->exp_client_uuid),
1671 obd_export_nid2str(doomed_exp));
1673 class_fail_export(doomed_exp);
1674 class_export_put(doomed_exp);
1679 if (!exports_evicted)
1681 "%s: can't disconnect NID '%s': no exports found\n",
1682 obd->obd_name, nid);
1683 return exports_evicted;
1685 EXPORT_SYMBOL(obd_export_evict_by_nid);
1687 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1689 struct obd_export *doomed_exp = NULL;
1690 struct obd_uuid doomed_uuid;
1691 int exports_evicted = 0;
1693 spin_lock(&obd->obd_dev_lock);
1694 if (obd->obd_stopping) {
1695 spin_unlock(&obd->obd_dev_lock);
1696 return exports_evicted;
1698 spin_unlock(&obd->obd_dev_lock);
1700 obd_str2uuid(&doomed_uuid, uuid);
1701 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1702 CERROR("%s: can't evict myself\n", obd->obd_name);
1703 return exports_evicted;
1706 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1707 if (doomed_exp == NULL) {
1708 CERROR("%s: can't disconnect %s: no exports found\n",
1709 obd->obd_name, uuid);
1711 CWARN("%s: evicting %s at adminstrative request\n",
1712 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1713 class_fail_export(doomed_exp);
1714 class_export_put(doomed_exp);
1715 obd_uuid_del(obd, doomed_exp);
1719 return exports_evicted;
1721 #endif /* HAVE_SERVER_SUPPORT */
1723 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1724 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1725 EXPORT_SYMBOL(class_export_dump_hook);
1728 static void print_export_data(struct obd_export *exp, const char *status,
1729 int locks, int debug_level)
1731 struct ptlrpc_reply_state *rs;
1732 struct ptlrpc_reply_state *first_reply = NULL;
1735 spin_lock(&exp->exp_lock);
1736 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1742 spin_unlock(&exp->exp_lock);
1744 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1745 "%p %s %llu stale:%d\n",
1746 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1747 obd_export_nid2str(exp),
1748 refcount_read(&exp->exp_handle.h_ref),
1749 atomic_read(&exp->exp_rpc_count),
1750 atomic_read(&exp->exp_cb_count),
1751 atomic_read(&exp->exp_locks_count),
1752 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1753 nreplies, first_reply, nreplies > 3 ? "..." : "",
1754 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1755 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1756 if (locks && class_export_dump_hook != NULL)
1757 class_export_dump_hook(exp);
1761 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1763 struct obd_export *exp;
1765 spin_lock(&obd->obd_dev_lock);
1766 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1767 print_export_data(exp, "ACTIVE", locks, debug_level);
1768 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1769 print_export_data(exp, "UNLINKED", locks, debug_level);
1770 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1771 print_export_data(exp, "DELAYED", locks, debug_level);
1772 spin_unlock(&obd->obd_dev_lock);
1775 void obd_exports_barrier(struct obd_device *obd)
1778 LASSERT(list_empty(&obd->obd_exports));
1779 spin_lock(&obd->obd_dev_lock);
1780 while (!list_empty(&obd->obd_unlinked_exports)) {
1781 spin_unlock(&obd->obd_dev_lock);
1782 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1783 if (waited > 5 && is_power_of_2(waited)) {
1784 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1785 "more than %d seconds. "
1786 "The obd refcount = %d. Is it stuck?\n",
1787 obd->obd_name, waited,
1788 atomic_read(&obd->obd_refcount));
1789 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1792 spin_lock(&obd->obd_dev_lock);
1794 spin_unlock(&obd->obd_dev_lock);
1796 EXPORT_SYMBOL(obd_exports_barrier);
1799 * Add export to the obd_zombe thread and notify it.
1801 static void obd_zombie_export_add(struct obd_export *exp) {
1802 atomic_dec(&obd_stale_export_num);
1803 spin_lock(&exp->exp_obd->obd_dev_lock);
1804 LASSERT(!list_empty(&exp->exp_obd_chain));
1805 list_del_init(&exp->exp_obd_chain);
1806 spin_unlock(&exp->exp_obd->obd_dev_lock);
1808 queue_work(zombie_wq, &exp->exp_zombie_work);
1812 * Add import to the obd_zombe thread and notify it.
1814 static void obd_zombie_import_add(struct obd_import *imp) {
1815 LASSERT(imp->imp_sec == NULL);
1817 queue_work(zombie_wq, &imp->imp_zombie_work);
1821 * wait when obd_zombie import/export queues become empty
1823 void obd_zombie_barrier(void)
1825 flush_workqueue(zombie_wq);
1827 EXPORT_SYMBOL(obd_zombie_barrier);
1830 struct obd_export *obd_stale_export_get(void)
1832 struct obd_export *exp = NULL;
1835 spin_lock(&obd_stale_export_lock);
1836 if (!list_empty(&obd_stale_exports)) {
1837 exp = list_first_entry(&obd_stale_exports,
1838 struct obd_export, exp_stale_list);
1839 list_del_init(&exp->exp_stale_list);
1841 spin_unlock(&obd_stale_export_lock);
1844 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1845 atomic_read(&obd_stale_export_num));
1849 EXPORT_SYMBOL(obd_stale_export_get);
1851 void obd_stale_export_put(struct obd_export *exp)
1855 LASSERT(list_empty(&exp->exp_stale_list));
1856 if (exp->exp_lock_hash &&
1857 atomic_read(&exp->exp_lock_hash->hs_count)) {
1858 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1859 atomic_read(&obd_stale_export_num));
1861 spin_lock_bh(&exp->exp_bl_list_lock);
1862 spin_lock(&obd_stale_export_lock);
1863 /* Add to the tail if there is no blocked locks,
1864 * to the head otherwise. */
1865 if (list_empty(&exp->exp_bl_list))
1866 list_add_tail(&exp->exp_stale_list,
1867 &obd_stale_exports);
1869 list_add(&exp->exp_stale_list,
1870 &obd_stale_exports);
1872 spin_unlock(&obd_stale_export_lock);
1873 spin_unlock_bh(&exp->exp_bl_list_lock);
1875 class_export_put(exp);
1879 EXPORT_SYMBOL(obd_stale_export_put);
1882 * Adjust the position of the export in the stale list,
1883 * i.e. move to the head of the list if is needed.
1885 void obd_stale_export_adjust(struct obd_export *exp)
1887 LASSERT(exp != NULL);
1888 spin_lock_bh(&exp->exp_bl_list_lock);
1889 spin_lock(&obd_stale_export_lock);
1891 if (!list_empty(&exp->exp_stale_list) &&
1892 !list_empty(&exp->exp_bl_list))
1893 list_move(&exp->exp_stale_list, &obd_stale_exports);
1895 spin_unlock(&obd_stale_export_lock);
1896 spin_unlock_bh(&exp->exp_bl_list_lock);
1898 EXPORT_SYMBOL(obd_stale_export_adjust);
1901 * start destroy zombie import/export thread
1903 int obd_zombie_impexp_init(void)
1905 zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1907 cfs_cpt_number(cfs_cpt_tab));
1909 return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1913 * stop destroy zombie import/export thread
1915 void obd_zombie_impexp_stop(void)
1917 destroy_workqueue(zombie_wq);
1918 LASSERT(list_empty(&obd_stale_exports));
1921 /***** Kernel-userspace comm helpers *******/
1923 /* Get length of entire message, including header */
1924 int kuc_len(int payload_len)
1926 return sizeof(struct kuc_hdr) + payload_len;
1928 EXPORT_SYMBOL(kuc_len);
1930 /* Get a pointer to kuc header, given a ptr to the payload
1931 * @param p Pointer to payload area
1932 * @returns Pointer to kuc header
1934 struct kuc_hdr * kuc_ptr(void *p)
1936 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1937 LASSERT(lh->kuc_magic == KUC_MAGIC);
1940 EXPORT_SYMBOL(kuc_ptr);
1942 /* Alloc space for a message, and fill in header
1943 * @return Pointer to payload area
1945 void *kuc_alloc(int payload_len, int transport, int type)
1948 int len = kuc_len(payload_len);
1952 return ERR_PTR(-ENOMEM);
1954 lh->kuc_magic = KUC_MAGIC;
1955 lh->kuc_transport = transport;
1956 lh->kuc_msgtype = type;
1957 lh->kuc_msglen = len;
1959 return (void *)(lh + 1);
1961 EXPORT_SYMBOL(kuc_alloc);
1963 /* Takes pointer to payload area */
1964 void kuc_free(void *p, int payload_len)
1966 struct kuc_hdr *lh = kuc_ptr(p);
1967 OBD_FREE(lh, kuc_len(payload_len));
1969 EXPORT_SYMBOL(kuc_free);
1971 struct obd_request_slot_waiter {
1972 struct list_head orsw_entry;
1973 wait_queue_head_t orsw_waitq;
1977 static bool obd_request_slot_avail(struct client_obd *cli,
1978 struct obd_request_slot_waiter *orsw)
1982 spin_lock(&cli->cl_loi_list_lock);
1983 avail = !!list_empty(&orsw->orsw_entry);
1984 spin_unlock(&cli->cl_loi_list_lock);
1990 * For network flow control, the RPC sponsor needs to acquire a credit
1991 * before sending the RPC. The credits count for a connection is defined
1992 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1993 * the subsequent RPC sponsors need to wait until others released their
1994 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1996 int obd_get_request_slot(struct client_obd *cli)
1998 struct obd_request_slot_waiter orsw;
2001 spin_lock(&cli->cl_loi_list_lock);
2002 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2003 cli->cl_rpcs_in_flight++;
2004 spin_unlock(&cli->cl_loi_list_lock);
2008 init_waitqueue_head(&orsw.orsw_waitq);
2009 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2010 orsw.orsw_signaled = false;
2011 spin_unlock(&cli->cl_loi_list_lock);
2013 rc = l_wait_event_abortable(orsw.orsw_waitq,
2014 obd_request_slot_avail(cli, &orsw) ||
2015 orsw.orsw_signaled);
2017 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2018 * freed but other (such as obd_put_request_slot) is using it. */
2019 spin_lock(&cli->cl_loi_list_lock);
2021 if (!orsw.orsw_signaled) {
2022 if (list_empty(&orsw.orsw_entry))
2023 cli->cl_rpcs_in_flight--;
2025 list_del(&orsw.orsw_entry);
2030 if (orsw.orsw_signaled) {
2031 LASSERT(list_empty(&orsw.orsw_entry));
2035 spin_unlock(&cli->cl_loi_list_lock);
2039 EXPORT_SYMBOL(obd_get_request_slot);
2041 void obd_put_request_slot(struct client_obd *cli)
2043 struct obd_request_slot_waiter *orsw;
2045 spin_lock(&cli->cl_loi_list_lock);
2046 cli->cl_rpcs_in_flight--;
2048 /* If there is free slot, wakeup the first waiter. */
2049 if (!list_empty(&cli->cl_flight_waiters) &&
2050 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2051 orsw = list_first_entry(&cli->cl_flight_waiters,
2052 struct obd_request_slot_waiter,
2054 list_del_init(&orsw->orsw_entry);
2055 cli->cl_rpcs_in_flight++;
2056 wake_up(&orsw->orsw_waitq);
2058 spin_unlock(&cli->cl_loi_list_lock);
2060 EXPORT_SYMBOL(obd_put_request_slot);
2062 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2064 return cli->cl_max_rpcs_in_flight;
2066 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2068 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2070 struct obd_request_slot_waiter *orsw;
2076 if (max > OBD_MAX_RIF_MAX || max < 1)
2079 CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2080 cli->cl_import->imp_obd->obd_name, max,
2081 cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2083 if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2084 LUSTRE_MDC_NAME) == 0) {
2085 /* adjust max_mod_rpcs_in_flight to ensure it is always
2086 * strictly lower that max_rpcs_in_flight */
2088 CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2089 cli->cl_import->imp_obd->obd_name);
2092 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2093 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2099 spin_lock(&cli->cl_loi_list_lock);
2100 old = cli->cl_max_rpcs_in_flight;
2101 cli->cl_max_rpcs_in_flight = max;
2102 client_adjust_max_dirty(cli);
2106 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2107 for (i = 0; i < diff; i++) {
2108 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2109 struct obd_request_slot_waiter,
2114 list_del_init(&orsw->orsw_entry);
2115 cli->cl_rpcs_in_flight++;
2116 wake_up(&orsw->orsw_waitq);
2118 spin_unlock(&cli->cl_loi_list_lock);
2122 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2124 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2126 return cli->cl_max_mod_rpcs_in_flight;
2128 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2130 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2132 struct obd_connect_data *ocd;
2136 if (max > OBD_MAX_RIF_MAX || max < 1)
2139 ocd = &cli->cl_import->imp_connect_data;
2140 CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2141 cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2142 ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2144 if (max == OBD_MAX_RIF_MAX)
2145 max = OBD_MAX_RIF_MAX - 1;
2147 /* Cannot exceed or equal max_rpcs_in_flight. If we are asked to
2148 * increase this value, also bump up max_rpcs_in_flight to match.
2150 if (max >= cli->cl_max_rpcs_in_flight) {
2152 "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2153 cli->cl_import->imp_obd->obd_name, max + 1, max);
2154 obd_set_max_rpcs_in_flight(cli, max + 1);
2157 /* cannot exceed max modify RPCs in flight supported by the server,
2158 * but verify ocd_connect_flags is at least initialized first. If
2159 * not, allow it and fix value later in ptlrpc_connect_set_flags().
2161 if (!ocd->ocd_connect_flags) {
2162 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2163 } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2164 maxmodrpcs = ocd->ocd_maxmodrpcs;
2165 if (maxmodrpcs == 0) { /* connection not finished yet */
2166 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2168 "%s: partial connect, assume maxmodrpcs=%hu\n",
2169 cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2174 if (max > maxmodrpcs) {
2175 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2176 cli->cl_import->imp_obd->obd_name,
2181 spin_lock(&cli->cl_mod_rpcs_lock);
2183 prev = cli->cl_max_mod_rpcs_in_flight;
2184 cli->cl_max_mod_rpcs_in_flight = max;
2186 /* wakeup waiters if limit has been increased */
2187 if (cli->cl_max_mod_rpcs_in_flight > prev)
2188 wake_up(&cli->cl_mod_rpcs_waitq);
2190 spin_unlock(&cli->cl_mod_rpcs_lock);
2194 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2196 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2197 struct seq_file *seq)
2199 unsigned long mod_tot = 0, mod_cum;
2202 spin_lock(&cli->cl_mod_rpcs_lock);
2203 lprocfs_stats_header(seq, ktime_get(), cli->cl_mod_rpcs_init, 25,
2205 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2206 cli->cl_mod_rpcs_in_flight);
2208 seq_printf(seq, "\n\t\t\tmodify\n");
2209 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2211 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2214 for (i = 0; i < OBD_HIST_MAX; i++) {
2215 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2218 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2219 i, mod, pct(mod, mod_tot),
2220 pct(mod_cum, mod_tot));
2221 if (mod_cum == mod_tot)
2225 spin_unlock(&cli->cl_mod_rpcs_lock);
2229 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2231 /* The number of modify RPCs sent in parallel is limited
2232 * because the server has a finite number of slots per client to
2233 * store request result and ensure reply reconstruction when needed.
2234 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2235 * that takes into account server limit and cl_max_rpcs_in_flight
2237 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2238 * one close request is allowed above the maximum.
2240 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2245 /* A slot is available if
2246 * - number of modify RPCs in flight is less than the max
2247 * - it's a close RPC and no other close request is in flight
2249 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2250 (close_req && cli->cl_close_rpcs_in_flight == 0);
2255 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2260 spin_lock(&cli->cl_mod_rpcs_lock);
2261 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2262 spin_unlock(&cli->cl_mod_rpcs_lock);
2267 /* Get a modify RPC slot from the obd client @cli according
2268 * to the kind of operation @opc that is going to be sent
2269 * and the intent @it of the operation if it applies.
2270 * If the maximum number of modify RPCs in flight is reached
2271 * the thread is put to sleep.
2272 * Returns the tag to be set in the request message. Tag 0
2273 * is reserved for non-modifying requests.
2275 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2277 bool close_req = false;
2280 if (opc == MDS_CLOSE)
2284 spin_lock(&cli->cl_mod_rpcs_lock);
2285 max = cli->cl_max_mod_rpcs_in_flight;
2286 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2287 /* there is a slot available */
2288 cli->cl_mod_rpcs_in_flight++;
2290 cli->cl_close_rpcs_in_flight++;
2291 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2292 cli->cl_mod_rpcs_in_flight);
2293 /* find a free tag */
2294 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2296 LASSERT(i < OBD_MAX_RIF_MAX);
2297 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2298 spin_unlock(&cli->cl_mod_rpcs_lock);
2299 /* tag 0 is reserved for non-modify RPCs */
2302 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2303 cli->cl_import->imp_obd->obd_name,
2308 spin_unlock(&cli->cl_mod_rpcs_lock);
2310 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2311 "opc %u, max %hu\n",
2312 cli->cl_import->imp_obd->obd_name, opc, max);
2314 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2315 obd_mod_rpc_slot_avail(cli,
2319 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2321 /* Put a modify RPC slot from the obd client @cli according
2322 * to the kind of operation @opc that has been sent.
2324 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2326 bool close_req = false;
2331 if (opc == MDS_CLOSE)
2334 spin_lock(&cli->cl_mod_rpcs_lock);
2335 cli->cl_mod_rpcs_in_flight--;
2337 cli->cl_close_rpcs_in_flight--;
2338 /* release the tag in the bitmap */
2339 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2340 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2341 spin_unlock(&cli->cl_mod_rpcs_lock);
2342 /* LU-14741 - to prevent close RPCs stuck behind normal ones */
2344 wake_up_all(&cli->cl_mod_rpcs_waitq);
2346 wake_up(&cli->cl_mod_rpcs_waitq);
2348 EXPORT_SYMBOL(obd_put_mod_rpc_slot);