4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* Read/write lock guarding the obd_devs[] table below. */
49 DEFINE_RWLOCK(obd_dev_lock);
/* Registered devices; a device's slot index is its obd_minor. */
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache that obd_device_alloc()/obd_device_free() draw from. */
52 static struct kmem_cache *obd_device_cachep;
/* kobject type shared by all obd_type objects; initialised further down. */
53 static struct kobj_type class_ktype;
/* NOTE(review): not referenced in the visible code; presumably the queue
 * that runs the export/import zombie work items - confirm. */
54 static struct workqueue_struct *zombie_wq;
/* Deferred-destruction hooks called on the final export/import put. */
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping: the list, its spinlock and an export counter. */
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
66 * support functions: we could use inter-module communication, but this
67 * is more portable to other OS's
/*
 * Allocate one obd_device from obd_device_cachep with GFP_NOFS and stamp it
 * with OBD_DEVICE_MAGIC.
 * NOTE(review): the NULL check and the return are not visible here; the
 * caller class_newdev() maps a NULL result to -ENOMEM.
 */
69 static struct obd_device *obd_device_alloc(void)
71 struct obd_device *obd;
73 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts the magic is intact, reports (but does not fail on) a namespace
 * that is still attached, and finalises the lu_ref tracker before freeing.
 */
80 static void obd_device_free(struct obd_device *obd)
83 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85 if (obd->obd_namespace != NULL) {
86 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87 obd, obd->obd_namespace, obd->obd_force);
90 lu_ref_fini(&obd->obd_reference);
91 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up an obd_type by name among the kobjects of lustre_kset.
 * A hit whose ktype is class_ktype is converted to its containing obd_type.
 * Callers in this file release the kobject reference with kobject_put()
 * once they are done with the result.
 * NOTE(review): the handling of a miss / foreign ktype is not visible here.
 */
94 struct obd_type *class_search_type(const char *name)
96 struct kobject *kobj = kset_find_obj(lustre_kset, name);
98 if (kobj && kobj->ktype == &class_ktype)
99 return container_of(kobj, struct obd_type, typ_kobj);
104 EXPORT_SYMBOL(class_search_type);
/*
 * Find the obd_type called \a name and take a usage reference on it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the lookup is followed by a
 * request_module() attempt and a second lookup; server builds first map
 * some type names onto the module that provides them (LWP -> OSP module,
 * any name starting with the MDS name -> MDT module).
 *
 * When try_module_get() on the type's owner succeeds, typ_refcnt is
 * incremented and the kobject reference from class_search_type() is
 * dropped; class_put_type() undoes both the module and typ_refcnt
 * references.  The kobject reference is dropped on the other path as well.
 */
106 struct obd_type *class_get_type(const char *name)
108 struct obd_type *type;
110 type = class_search_type(name);
111 #ifdef HAVE_MODULE_LOADING_SUPPORT
113 const char *modname = name;
115 #ifdef HAVE_SERVER_SUPPORT
116 if (strcmp(modname, "obdfilter") == 0)
119 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
120 modname = LUSTRE_OSP_NAME;
122 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
123 modname = LUSTRE_MDT_NAME;
124 #endif /* HAVE_SERVER_SUPPORT */
126 if (!request_module("%s", modname)) {
127 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
128 type = class_search_type(name);
130 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
136 if (try_module_get(type->typ_dt_ops->o_owner)) {
137 atomic_inc(&type->typ_refcnt);
138 /* class_search_type() returned a counted reference,
139 * but we don't need that count any more as
140 * we have one through typ_refcnt.
142 kobject_put(&type->typ_kobj);
144 kobject_put(&type->typ_kobj);
/*
 * Release a reference obtained with class_get_type(): drop the module
 * reference on the type's owner and decrement typ_refcnt.
 */
151 void class_put_type(struct obd_type *type)
154 module_put(type->typ_dt_ops->o_owner);
155 atomic_dec(&type->typ_refcnt);
/*
 * kobject release callback for obd_type (installed through class_ktype).
 * Tears down the type's debugfs directory, finalises its lu_device_type,
 * removes the procfs subtree when one was registered, and frees the type.
 */
158 static void class_sysfs_release(struct kobject *kobj)
160 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
162 debugfs_remove_recursive(type->typ_debugfs_entry);
163 type->typ_debugfs_entry = NULL;
166 lu_device_type_fini(type->typ_lu);
168 #ifdef CONFIG_PROC_FS
169 if (type->typ_name && type->typ_procroot)
170 remove_proc_subtree(type->typ_name, proc_lustre_root);
172 OBD_FREE(type, sizeof(*type));
/*
 * kobject type for obd_type objects: attributes go through
 * lustre_sysfs_ops and the last kobject_put() ends in class_sysfs_release().
 */
175 static struct kobj_type class_ktype = {
176 .sysfs_ops = &lustre_sysfs_ops,
177 .release = class_sysfs_release,
/*
 * Server-only: create a placeholder obd_type called \a name.
 *
 * Returns ERR_PTR(-EEXIST) when a type of that name is already present
 * (the lookup reference is dropped first) and ERR_PTR(-ENOMEM) when the
 * allocation fails.  Otherwise the type's kobject is added under
 * lustre_kset, a debugfs directory is created, and typ_sym_filter is set
 * so that class_register_type() can later recognise and take over the
 * placeholder.  A procfs registration failure is only logged and
 * typ_procroot is reset to NULL.
 * NOTE(review): how \a enable_proc gates the procfs step is not visible.
 */
180 #ifdef HAVE_SERVER_SUPPORT
181 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
183 struct dentry *symlink;
184 struct obd_type *type;
187 type = class_search_type(name);
189 kobject_put(&type->typ_kobj);
190 return ERR_PTR(-EEXIST);
193 OBD_ALLOC(type, sizeof(*type));
195 return ERR_PTR(-ENOMEM);
197 type->typ_kobj.kset = lustre_kset;
198 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
199 &lustre_kset->kobj, "%s", name);
203 symlink = debugfs_create_dir(name, debugfs_lustre_root);
204 type->typ_debugfs_entry = symlink;
205 type->typ_sym_filter = true;
208 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
210 if (IS_ERR(type->typ_procroot)) {
211 CERROR("%s: can't create compat proc entry: %d\n",
212 name, (int)PTR_ERR(type->typ_procroot));
213 type->typ_procroot = NULL;
219 EXPORT_SYMBOL(class_add_symlinks);
220 #endif /* HAVE_SERVER_SUPPORT */
/* Upper bound (exclusive) on the length of a type name. */
222 #define CLASS_MAX_NAME 1024
/*
 * Register an obd type called \a name with its device and metadata
 * operation tables and an optional lu_device_type.
 *
 * An existing type of the same name is normally reported as "already
 * registered"; on server builds a placeholder carrying typ_sym_filter is
 * instead taken over (the flag is cleared and one kobject reference is
 * dropped).
 *
 * For a fresh type, typ_lu is first set to the OBD_LU_TYPE_SETUP
 * placeholder when \a ldt is supplied.  After procfs/debugfs setup and
 * kobject_add(), lu_device_type_init() runs and the final typ_lu value
 * (ldt on success, NULL on failure) is published with smp_store_release()
 * followed by wake_up_var().  The failure path ends in kobject_put().
 */
224 int class_register_type(const struct obd_ops *dt_ops,
225 const struct md_ops *md_ops,
227 const char *name, struct lu_device_type *ldt)
229 struct obd_type *type;
234 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
236 type = class_search_type(name);
238 #ifdef HAVE_SERVER_SUPPORT
239 if (type->typ_sym_filter)
241 #endif /* HAVE_SERVER_SUPPORT */
242 kobject_put(&type->typ_kobj);
243 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
247 OBD_ALLOC(type, sizeof(*type));
251 type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
252 type->typ_kobj.kset = lustre_kset;
253 kobject_init(&type->typ_kobj, &class_ktype);
254 #ifdef HAVE_SERVER_SUPPORT
256 #endif /* HAVE_SERVER_SUPPORT */
258 type->typ_dt_ops = dt_ops;
259 type->typ_md_ops = md_ops;
261 #ifdef HAVE_SERVER_SUPPORT
262 if (type->typ_sym_filter) {
263 type->typ_sym_filter = false;
264 kobject_put(&type->typ_kobj);
268 #ifdef CONFIG_PROC_FS
269 if (enable_proc && !type->typ_procroot) {
270 type->typ_procroot = lprocfs_register(name,
273 if (IS_ERR(type->typ_procroot)) {
274 rc = PTR_ERR(type->typ_procroot);
275 type->typ_procroot = NULL;
280 type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
282 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
285 #ifdef HAVE_SERVER_SUPPORT
289 rc = lu_device_type_init(ldt);
290 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
291 wake_up_var(&type->typ_lu);
299 kobject_put(&type->typ_kobj);
303 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * An unknown name is logged as an error.  If typ_refcnt is still non-zero
 * the type is left in place with both operation tables cleared and the
 * call fails with -EBUSY.  Otherwise two kobject references are dropped:
 * the type's own ("final") reference and the one taken by the lookup at
 * the top of the function.
 */
305 int class_unregister_type(const char *name)
307 struct obd_type *type = class_search_type(name);
312 CERROR("unknown obd type\n");
316 if (atomic_read(&type->typ_refcnt)) {
317 CERROR("type %s has refcount (%d)\n", name,
318 atomic_read(&type->typ_refcnt));
319 /* This is a bad situation, let's make the best of it */
320 /* Remove ops, but leave the name for debugging */
321 type->typ_dt_ops = NULL;
322 type->typ_md_ops = NULL;
323 GOTO(out_put, rc = -EBUSY);
326 /* Put the final ref */
327 kobject_put(&type->typ_kobj);
329 /* Put the ref returned by class_search_type() */
330 kobject_put(&type->typ_kobj);
333 } /* class_unregister_type */
334 EXPORT_SYMBOL(class_unregister_type);
337 * Create a new obd device.
339 * Allocate the new obd_device and initialize it.
341 * \param[in] type_name obd device type string.
342 * \param[in] name obd device name.
343 * \param[in] uuid obd device UUID
345 * \retval newdev pointer to created obd_device
346 * \retval ERR_PTR(errno) on error
/*
 * Allocate an obd_device of type \a type_name and initialise its locks,
 * lists, wait queues and counters.  The device is not entered into
 * obd_devs[] here: obd_minor is left at -1.
 *
 * Errors visible in this function: -EINVAL when \a name is MAX_OBD_NAME
 * bytes or longer, -ENODEV for an unknown type, -ENOMEM when the
 * allocation fails (the type reference is released again in that case).
 * The new device starts with obd_refcount == 1.
 */
348 struct obd_device *class_newdev(const char *type_name, const char *name,
351 struct obd_device *newdev;
352 struct obd_type *type = NULL;
355 if (strlen(name) >= MAX_OBD_NAME) {
356 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
357 RETURN(ERR_PTR(-EINVAL));
360 type = class_get_type(type_name);
362 CERROR("OBD: unknown type: %s\n", type_name);
363 RETURN(ERR_PTR(-ENODEV));
366 newdev = obd_device_alloc();
367 if (newdev == NULL) {
368 class_put_type(type);
369 RETURN(ERR_PTR(-ENOMEM));
371 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* Copies at most sizeof(obd_name) - 1 bytes, leaving the last byte alone. */
372 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
373 newdev->obd_type = type;
/* -1 == no slot in obd_devs[] yet; class_register_device() assigns one. */
374 newdev->obd_minor = -1;
376 rwlock_init(&newdev->obd_pool_lock);
377 newdev->obd_pool_limit = 0;
378 newdev->obd_pool_slv = 0;
380 INIT_LIST_HEAD(&newdev->obd_exports);
381 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
382 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
383 INIT_LIST_HEAD(&newdev->obd_exports_timed);
384 INIT_LIST_HEAD(&newdev->obd_nid_stats);
385 spin_lock_init(&newdev->obd_nid_lock);
386 spin_lock_init(&newdev->obd_dev_lock);
387 mutex_init(&newdev->obd_dev_mutex);
388 spin_lock_init(&newdev->obd_osfs_lock);
389 /* newdev->obd_osfs_age must be set to a value in the distant
390 * past to guarantee a fresh statfs is fetched on mount. */
391 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
393 /* XXX belongs in setup not attach */
394 init_rwsem(&newdev->obd_observer_link_sem);
396 spin_lock_init(&newdev->obd_recovery_task_lock);
397 init_waitqueue_head(&newdev->obd_next_transno_waitq);
398 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
399 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
400 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
401 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
402 INIT_LIST_HEAD(&newdev->obd_evict_list);
403 INIT_LIST_HEAD(&newdev->obd_lwp_list);
405 llog_group_init(&newdev->obd_olg);
406 /* Detach drops this */
407 atomic_set(&newdev->obd_refcount, 1);
408 lu_ref_init(&newdev->obd_reference);
409 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
411 newdev->obd_conn_inprogress = 0;
/* NOTE(review): strncpy() leaves the destination unterminated when uuid is
 * UUID_MAX bytes or longer, and only name is length-checked above although
 * the error text says "name/uuid" - confirm the size of obd_uuid.uuid. */
413 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
415 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
416 newdev->obd_name, newdev);
424 * \param[in] obd obd_device to be freed
/*
 * Final teardown of an obd_device.
 * Asserts that the magic is intact, that the device is either unregistered
 * (obd_minor == -1) or still the occupant of its obd_devs[] slot, and that
 * obd_refcount has reached zero.  obd_cleanup() is run for a device that
 * was stopping, the device memory is freed, and finally the type
 * reference taken in class_newdev() is released.
 */
428 void class_free_dev(struct obd_device *obd)
/* Saved up front: obd itself is freed before the type reference is put. */
430 struct obd_type *obd_type = obd->obd_type;
432 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
433 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
434 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
435 "obd %p != obd_devs[%d] %p\n",
436 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
437 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
438 "obd_refcount should be 0, not %d\n",
439 atomic_read(&obd->obd_refcount));
440 LASSERT(obd_type != NULL);
442 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
443 obd->obd_name, obd->obd_type->typ_name);
445 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
446 obd->obd_name, obd->obd_uuid.uuid);
447 if (obd->obd_stopping) {
450 /* If we're not stopping, we were never set up */
451 err = obd_cleanup(obd);
453 CERROR("Cleanup %s returned %d\n",
457 obd_device_free(obd);
459 class_put_type(obd_type);
463 * Unregister obd device.
465 * Free slot in obd_devs[] used by \a obd.
467 * \param[in] obd obd_device to be unregistered
/*
 * Remove \a obd from obd_devs[] under the obd_dev_lock write lock.
 * A device without a slot (negative obd_minor) is left untouched; for a
 * registered one the slot is asserted to hold \a obd and then cleared.
 */
471 void class_unregister_device(struct obd_device *obd)
473 write_lock(&obd_dev_lock);
474 if (obd->obd_minor >= 0) {
475 LASSERT(obd_devs[obd->obd_minor] == obd);
476 obd_devs[obd->obd_minor] = NULL;
479 write_unlock(&obd_dev_lock);
483 * Register obd device.
485 * Find free slot in obd_devs[], fills it with \a new_obd.
487 * \param[in] new_obd obd_device to be registered
490 * \retval -EEXIST device with this name is registered
491 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Enter \a new_obd into obd_devs[].
 *
 * The whole table is scanned under the obd_dev_lock write lock.  A slot
 * holding a device with the same name takes the duplicate path: the lock
 * is dropped and obd_zombie_barrier() is called, since the old device may
 * still be awaiting destruction; the "already exists" error is logged and
 * any free slot noted earlier is forgotten.  Otherwise the first empty
 * slot is remembered, and after the scan the device is stored there and
 * its obd_minor set to the slot index.  If the table has no free slot, an
 * error naming MAX_OBD_DEVICES is logged.
 * NOTE(review): "retried" presumably limits the barrier-and-rescan to one
 * attempt; the lines using it are not visible - confirm.
 */
493 int class_register_device(struct obd_device *new_obd)
497 int new_obd_minor = 0;
498 bool minor_assign = false;
499 bool retried = false;
502 write_lock(&obd_dev_lock);
503 for (i = 0; i < class_devno_max(); i++) {
504 struct obd_device *obd = class_num2obd(i);
507 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
510 write_unlock(&obd_dev_lock);
512 /* the obd_device may still be waiting to be
513 * destroyed by the "obd_zombie_impexp_thread".
515 obd_zombie_barrier();
520 CERROR("%s: already exists, won't add\n",
522 /* in case we found a free slot before duplicate */
523 minor_assign = false;
527 if (!minor_assign && obd == NULL) {
534 new_obd->obd_minor = new_obd_minor;
535 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
536 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
537 obd_devs[new_obd_minor] = new_obd;
541 CERROR("%s: all %u/%u devices used, increase "
542 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
543 i, class_devno_max(), ret);
546 write_unlock(&obd_dev_lock);
/*
 * Scan obd_devs[] for a device whose obd_name equals \a name.
 * Takes no lock itself; class_name2dev() and class_dev_by_str() call it
 * with obd_dev_lock read-held.  A name match is only accepted once the
 * device has obd_attached set.
 */
551 static int class_name2dev_nolock(const char *name)
558 for (i = 0; i < class_devno_max(); i++) {
559 struct obd_device *obd = class_num2obd(i);
561 if (obd && strcmp(name, obd->obd_name) == 0) {
562 /* Make sure we finished attaching before we give
563 out any references */
564 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
565 if (obd->obd_attached) {
/*
 * Locked wrapper: runs class_name2dev_nolock() with obd_dev_lock held for
 * reading and hands back its result.
 */
575 int class_name2dev(const char *name)
582 read_lock(&obd_dev_lock);
583 i = class_name2dev_nolock(name);
584 read_unlock(&obd_dev_lock);
588 EXPORT_SYMBOL(class_name2dev);
/*
 * Resolve a device name to its obd_device via class_name2dev() and
 * class_num2obd().
 * NOTE(review): the range test uses "> class_devno_max()", so an index
 * equal to class_devno_max() passes it; class_num2obd() then rejects that
 * index with its own "< class_devno_max()" test.  ">=" would match the
 * other bounds checks in this file.
 */
590 struct obd_device *class_name2obd(const char *name)
592 int dev = class_name2dev(name);
594 if (dev < 0 || dev > class_devno_max())
596 return class_num2obd(dev);
598 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] for a device whose obd_uuid equals \a uuid.
 * Takes no lock itself; class_uuid2dev() and class_dev_by_str() call it
 * with obd_dev_lock read-held.
 */
600 int class_uuid2dev_nolock(struct obd_uuid *uuid)
604 for (i = 0; i < class_devno_max(); i++) {
605 struct obd_device *obd = class_num2obd(i);
607 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
608 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/*
 * Locked wrapper: runs class_uuid2dev_nolock() with obd_dev_lock held for
 * reading and hands back its result.
 */
616 int class_uuid2dev(struct obd_uuid *uuid)
620 read_lock(&obd_dev_lock);
621 i = class_uuid2dev_nolock(uuid);
622 read_unlock(&obd_dev_lock);
626 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Resolve a device UUID to its obd_device via class_uuid2dev() and
 * class_num2obd().
 * NOTE(review): no check of a failed lookup is visible before the index
 * is passed on - confirm one exists, since class_num2obd() shows no
 * lower-bound test of its own.
 */
628 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
630 int dev = class_uuid2dev(uuid);
633 return class_num2obd(dev);
635 EXPORT_SYMBOL(class_uuid2obd);
638 * Get obd device from ::obd_devs[]
640 * \param num [in] array index
642 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
643 * otherwise return the obd device there.
/*
 * Fetch the device stored at index \a num.
 * For an occupied slot the device's magic and its obd_minor (which must
 * equal \a num) are asserted.  An index at or beyond class_devno_max()
 * skips the lookup, leaving the NULL initial value.
 * Takes no lock itself.
 * NOTE(review): only the upper bound is tested in the visible code; a
 * negative \a num would not be rejected here - confirm callers never pass
 * one.
 */
645 struct obd_device *class_num2obd(int num)
647 struct obd_device *obd = NULL;
649 if (num < class_devno_max()) {
654 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
655 "%p obd_magic %08x != %08x\n",
656 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
657 LASSERTF(obd->obd_minor == num,
658 "%p obd_minor %0d != %0d\n",
659 obd, obd->obd_minor, num);
664 EXPORT_SYMBOL(class_num2obd);
667 * Find obd in obd_devs[] by name or uuid.
669 * Increment obd's refcount if found.
671 * \param[in] str obd name or uuid
673 * \retval NULL if not found
674 * \retval target pointer to found obd_device
/*
 * Look a device up by a string that may be either its UUID or its name.
 * \a str is first converted to an obd_uuid and tried as a UUID; the name
 * lookup is the alternative.  Both lookups and the class_incref() on the
 * result (tagged "find") happen under one obd_dev_lock read section.
 * The caller owns the reference taken here.
 */
676 struct obd_device *class_dev_by_str(const char *str)
678 struct obd_device *target = NULL;
679 struct obd_uuid tgtuuid;
682 obd_str2uuid(&tgtuuid, str);
684 read_lock(&obd_dev_lock);
685 rc = class_uuid2dev_nolock(&tgtuuid);
687 rc = class_name2dev_nolock(str);
690 target = class_num2obd(rc);
693 class_incref(target, "find", current);
694 read_unlock(&obd_dev_lock);
698 EXPORT_SYMBOL(class_dev_by_str);
701 * Get obd devices count. Devices in any state are counted.
703 * \retval obd device count
/*
 * Walk obd_devs[] under the obd_dev_lock read lock and count devices.
 * NOTE(review): the loop bound is "<= max_index" whereas every other scan
 * in this file uses "< class_devno_max()"; the extra index is absorbed by
 * the bounds test inside class_num2obd().
 */
705 int get_devices_count(void)
707 int index, max_index = class_devno_max(), dev_count = 0;
709 read_lock(&obd_dev_lock);
710 for (index = 0; index <= max_index; index++) {
711 struct obd_device *obd = class_num2obd(index);
715 read_unlock(&obd_dev_lock);
719 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device while holding the obd_dev_lock read
 * lock: slot index, a status string, type name, device name, UUID and the
 * current reference count.  The status is chosen by testing obd_stopping,
 * then obd_set_up, then obd_attached, in that order.
 */
721 void class_obd_list(void)
726 read_lock(&obd_dev_lock);
727 for (i = 0; i < class_devno_max(); i++) {
728 struct obd_device *obd = class_num2obd(i);
732 if (obd->obd_stopping)
734 else if (obd->obd_set_up)
736 else if (obd->obd_attached)
740 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
741 i, status, obd->obd_type->typ_name,
742 obd->obd_name, obd->obd_uuid.uuid,
743 atomic_read(&obd->obd_refcount));
745 read_unlock(&obd_dev_lock);
748 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
749 * specified, then only the client with that uuid is returned,
750 * otherwise any client connected to the tgt is returned. */
/*
 * Scan obd_devs[] under the obd_dev_lock read lock for a client device
 * whose cl_target_uuid equals \a tgt_uuid.
 * The type test is a prefix match: only the first strlen(type_name)
 * characters of the device's type name are compared.  When \a grp_uuid is
 * non-NULL the device's own obd_uuid must equal it as well.
 * The lock is released before a match is handed back; no reference on the
 * device is taken in the visible code.
 */
752 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
753 const char *type_name,
754 struct obd_uuid *grp_uuid)
758 read_lock(&obd_dev_lock);
759 for (i = 0; i < class_devno_max(); i++) {
760 struct obd_device *obd = class_num2obd(i);
764 if ((strncmp(obd->obd_type->typ_name, type_name,
765 strlen(type_name)) == 0)) {
766 if (obd_uuid_equals(tgt_uuid,
767 &obd->u.cli.cl_target_uuid) &&
768 ((grp_uuid)? obd_uuid_equals(grp_uuid,
769 &obd->obd_uuid) : 1)) {
770 read_unlock(&obd_dev_lock);
775 read_unlock(&obd_dev_lock);
779 EXPORT_SYMBOL(class_find_client_obd);
781 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
782 * searching at *next, and if a device is found, the next index to look
783 * at is saved in *next. If next is NULL, then the first matching device
784 * will always be returned. */
/*
 * Resumable search for devices whose obd_uuid equals \a grp_uuid.
 * A start index supplied through \a next is used only when it lies in
 * [0, class_devno_max()).  The scan runs under the obd_dev_lock read
 * lock, which is released before a match is handed back.
 */
786 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
792 else if (*next >= 0 && *next < class_devno_max())
797 read_lock(&obd_dev_lock);
798 for (; i < class_devno_max(); i++) {
799 struct obd_device *obd = class_num2obd(i);
803 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
806 read_unlock(&obd_dev_lock);
810 read_unlock(&obd_dev_lock);
814 EXPORT_SYMBOL(class_devices_in_group);
817 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
818 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF to every relevant device of filesystem \a fsname.
 *
 * A device qualifies when it is set up and not stopping, its type is one
 * of mdc/osc/osp/lwp/mdt/ost, and the first \a namelen bytes of its name
 * equal \a fsname.  \a namelen must be positive.
 */
820 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
822 struct obd_device *obd;
826 LASSERT(namelen > 0);
828 read_lock(&obd_dev_lock);
829 for (i = 0; i < class_devno_max(); i++) {
830 obd = class_num2obd(i);
832 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
835 /* only notify mdc, osc, osp, lwp, mdt, ost
836 * because only these have a -sptlrpc llog */
837 type = obd->obd_type->typ_name;
838 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
839 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
840 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
841 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
842 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
843 strcmp(type, LUSTRE_OST_NAME) != 0)
846 if (strncmp(obd->obd_name, fsname, namelen))
/* Pin the device and drop obd_dev_lock around the set_info call, then
 * retake the lock to continue the scan. */
849 class_incref(obd, __FUNCTION__, obd);
850 read_unlock(&obd_dev_lock);
851 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
852 sizeof(KEY_SPTLRPC_CONF),
853 KEY_SPTLRPC_CONF, 0, NULL, NULL);
855 class_decref(obd, __FUNCTION__, obd);
856 read_lock(&obd_dev_lock);
858 read_unlock(&obd_dev_lock);
861 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the obd_device slab cache if it exists and reset the pointer,
 * which makes a repeated call harmless.
 */
863 void obd_cleanup_caches(void)
866 if (obd_device_cachep) {
867 kmem_cache_destroy(obd_device_cachep);
868 obd_device_cachep = NULL;
/*
 * Create the "ll_obd_dev_cache" slab cache for struct obd_device.
 * The usercopy window passed to kmem_cache_create_usercopy() starts at
 * offset 0 and spans the whole structure.  The cache must not exist yet.
 * A failed creation yields -ENOMEM, and the error path calls
 * obd_cleanup_caches().
 */
874 int obd_init_caches(void)
879 LASSERT(obd_device_cachep == NULL);
880 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
881 sizeof(struct obd_device),
882 0, 0, 0, sizeof(struct obd_device), NULL);
883 if (!obd_device_cachep)
884 GOTO(out, rc = -ENOMEM);
888 obd_cleanup_caches();
/* Owner tag used when hashing export handles and when looking them up. */
892 static const char export_handle_owner[] = "export";
894 /* map connection to client */
/*
 * Translate a connection handle into its export through the handle hash.
 * Two inputs are special-cased and only logged: a null handle, and a
 * cookie of -1, which stands for "assign a new connection".
 */
895 struct obd_export *class_conn2export(struct lustre_handle *conn)
897 struct obd_export *export;
901 CDEBUG(D_CACHE, "looking for null handle\n");
905 if (conn->cookie == -1) { /* this means assign a new connection */
906 CDEBUG(D_CACHE, "want a new connection\n");
910 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
911 export = class_handle2object(conn->cookie, export_handle_owner);
914 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd_device.
 * NOTE(review): the body is not visible in this extract; presumably it
 * yields exp->exp_obd - confirm.
 */
916 struct obd_device *class_exp2obd(struct obd_export *exp)
922 EXPORT_SYMBOL(class_exp2obd);
/*
 * Return the client import (u.cli.cl_import) of the device that owns
 * \a exp.
 */
924 struct obd_import *class_exp2cliimp(struct obd_export *exp)
926 struct obd_device *obd = exp->exp_obd;
929 return obd->u.cli.cl_import;
931 EXPORT_SYMBOL(class_exp2cliimp);
933 /* Export management functions */
/*
 * Free an export whose handle refcount has reached zero.
 * Drops the export's connection, asserts that its reply, replay and
 * high-priority RPC lists are empty, runs the type-specific
 * obd_destroy_export(), releases the device reference for every export
 * except the device's self export, and frees the memory with kfree_rcu().
 */
934 static void class_export_destroy(struct obd_export *exp)
936 struct obd_device *obd = exp->exp_obd;
939 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
940 LASSERT(obd != NULL);
942 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
943 exp->exp_client_uuid.uuid, obd->obd_name);
945 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
946 ptlrpc_connection_put(exp->exp_connection);
948 LASSERT(list_empty(&exp->exp_outstanding_replies));
949 LASSERT(list_empty(&exp->exp_uncommitted_replies));
950 LASSERT(list_empty(&exp->exp_req_replay_queue));
951 LASSERT(list_empty(&exp->exp_hp_rpcs));
952 obd_destroy_export(exp);
953 /* self export doesn't hold a reference to an obd, although it
954 * exists until freeing of the obd */
955 if (exp != obd->obd_self_export)
956 class_decref(obd, "export", exp);
958 OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
959 kfree_rcu(exp, exp_handle.h_rcu);
/*
 * Take an additional reference on \a exp (handle refcount) and log the
 * new count.
 */
963 struct obd_export *class_export_get(struct obd_export *exp)
965 refcount_inc(&exp->exp_handle.h_ref);
966 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
967 refcount_read(&exp->exp_handle.h_ref));
970 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.
 * The count must be positive and below LI_POISON on entry.  When the last
 * reference goes away the per-NID statistics reference is released; the
 * device's self export is then destroyed synchronously, while any other
 * export (which must still be on an obd export list) is handed to
 * obd_zombie_export_add() for deferred destruction.
 */
972 void class_export_put(struct obd_export *exp)
974 LASSERT(exp != NULL);
975 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
976 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
977 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
978 refcount_read(&exp->exp_handle.h_ref) - 1);
980 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
981 struct obd_device *obd = exp->exp_obd;
983 CDEBUG(D_IOCTL, "final put %p/%s\n",
984 exp, exp->exp_client_uuid.uuid);
986 /* release nid stat reference */
987 lprocfs_exp_cleanup(exp);
989 if (exp == obd->obd_self_export) {
990 /* self export should be destroyed without
991 * zombie thread as it doesn't hold a
992 * reference to obd and doesn't hold any
994 class_export_destroy(exp);
995 /* self export is destroyed, no class
996 * references exist and it is safe to free
1000 LASSERT(!list_empty(&exp->exp_obd_chain));
1001 obd_zombie_export_add(exp);
1006 EXPORT_SYMBOL(class_export_put);
/*
 * Work handler for exp_zombie_work: recover the export that embeds the
 * work item and destroy it.
 */
1008 static void obd_zombie_exp_cull(struct work_struct *ws)
1010 struct obd_export *export;
1012 export = container_of(ws, struct obd_export, exp_zombie_work);
1013 class_export_destroy(export);
1016 /* Creates a new export, adds it to the hash table, and returns a
1017 * pointer to it. The refcount is 2: one for the hash reference, and
1018 * one for the pointer returned by this function. */
/*
 * Common constructor behind class_new_export() and
 * class_new_export_self().
 *
 * Allocates the export, sets its handle refcount to 2, initialises its
 * lists, locks, counters and zombie work item, and publishes the handle
 * in the handle hash.  Then, under obd->obd_dev_lock:
 *  - when \a cluuid differs from the device's own UUID, a stopping device
 *    is refused with -ENODEV and the export is added to the device's UUID
 *    table, a failure there being reported as -EALREADY;
 *  - the export is either linked onto the device's export lists (taking a
 *    device reference and bumping obd_num_exports) or merely has its list
 *    heads initialised.
 *    NOTE(review): that choice is presumably made on \a is_self; the
 *    branch itself is not visible - confirm.
 *
 * The error path unlocks, unhashes the handle, runs obd_destroy_export()
 * and frees the export.  An allocation failure yields ERR_PTR(-ENOMEM).
 */
1019 struct obd_export *__class_new_export(struct obd_device *obd,
1020 struct obd_uuid *cluuid, bool is_self)
1022 struct obd_export *export;
1026 OBD_ALLOC_PTR(export);
1028 return ERR_PTR(-ENOMEM);
1030 export->exp_conn_cnt = 0;
1031 export->exp_lock_hash = NULL;
1032 export->exp_flock_hash = NULL;
1033 /* 2 = class_handle_hash + last */
1034 refcount_set(&export->exp_handle.h_ref, 2);
1035 atomic_set(&export->exp_rpc_count, 0);
1036 atomic_set(&export->exp_cb_count, 0);
1037 atomic_set(&export->exp_locks_count, 0);
1038 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1039 INIT_LIST_HEAD(&export->exp_locks_list);
1040 spin_lock_init(&export->exp_locks_list_guard);
1042 atomic_set(&export->exp_replay_count, 0);
1043 export->exp_obd = obd;
1044 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1045 spin_lock_init(&export->exp_uncommitted_replies_lock);
1046 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1047 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1048 INIT_HLIST_NODE(&export->exp_handle.h_link);
1049 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1050 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1051 class_handle_hash(&export->exp_handle, export_handle_owner);
1052 export->exp_last_request_time = ktime_get_real_seconds();
1053 spin_lock_init(&export->exp_lock);
1054 spin_lock_init(&export->exp_rpc_lock);
1055 INIT_HLIST_NODE(&export->exp_gen_hash);
1056 spin_lock_init(&export->exp_bl_list_lock);
1057 INIT_LIST_HEAD(&export->exp_bl_list);
1058 INIT_LIST_HEAD(&export->exp_stale_list);
1059 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1061 export->exp_sp_peer = LUSTRE_SP_ANY;
1062 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1063 export->exp_client_uuid = *cluuid;
1064 obd_init_export(export);
1066 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1068 spin_lock(&obd->obd_dev_lock);
1069 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1070 /* shouldn't happen, but might race */
1071 if (obd->obd_stopping)
1072 GOTO(exit_unlock, rc = -ENODEV);
1074 rc = obd_uuid_add(obd, export);
1076 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1077 obd->obd_name, cluuid->uuid, rc);
1078 GOTO(exit_unlock, rc = -EALREADY);
1083 class_incref(obd, "export", export);
1084 list_add_tail(&export->exp_obd_chain_timed,
1085 &obd->obd_exports_timed);
1086 list_add(&export->exp_obd_chain, &obd->obd_exports);
1087 obd->obd_num_exports++;
1089 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1090 INIT_LIST_HEAD(&export->exp_obd_chain);
1092 spin_unlock(&obd->obd_dev_lock);
1096 spin_unlock(&obd->obd_dev_lock);
1097 class_handle_unhash(&export->exp_handle);
1098 obd_destroy_export(export);
1099 OBD_FREE_PTR(export);
/* Create a regular (non-self) export: __class_new_export() with is_self false. */
1103 struct obd_export *class_new_export(struct obd_device *obd,
1104 struct obd_uuid *uuid)
1106 return __class_new_export(obd, uuid, false);
1108 EXPORT_SYMBOL(class_new_export);
/* Create a device's self export: __class_new_export() with is_self true. */
1110 struct obd_export *class_new_export_self(struct obd_device *obd,
1111 struct obd_uuid *uuid)
1113 return __class_new_export(obd, uuid, true);
/*
 * Detach an export from its device.
 * The handle is unhashed first.  For the device's self export a reference
 * is dropped with class_export_put().  For any other export, under
 * obd_dev_lock, the UUID-table entry is removed, the export is taken out
 * of the generation hash (server builds, if hashed), it is moved to
 * obd_unlinked_exports and off the timed list, and obd_num_exports is
 * decremented.  After the lock is released the global stale-export
 * counter is incremented and obd_stale_export_put() is called.
 */
1116 void class_unlink_export(struct obd_export *exp)
1118 class_handle_unhash(&exp->exp_handle);
1120 if (exp->exp_obd->obd_self_export == exp) {
1121 class_export_put(exp);
1125 spin_lock(&exp->exp_obd->obd_dev_lock);
1126 /* delete an uuid-export hashitem from hashtables */
1127 if (exp != exp->exp_obd->obd_self_export)
1128 obd_uuid_del(exp->exp_obd, exp);
1130 #ifdef HAVE_SERVER_SUPPORT
1131 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1132 struct tg_export_data *ted = &exp->exp_target_data;
1133 struct cfs_hash *hash;
1135 /* Because obd_gen_hash will not be released until
1136 * class_cleanup(), so hash should never be NULL here */
1137 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1138 LASSERT(hash != NULL);
1139 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1140 &exp->exp_gen_hash);
1141 cfs_hash_putref(hash);
1143 #endif /* HAVE_SERVER_SUPPORT */
1145 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1146 list_del_init(&exp->exp_obd_chain_timed);
1147 exp->exp_obd->obd_num_exports--;
1148 spin_unlock(&exp->exp_obd->obd_dev_lock);
1149 atomic_inc(&obd_stale_export_num);
1151 /* A reference is kept by obd_stale_exports list */
1152 obd_stale_export_put(exp);
1154 EXPORT_SYMBOL(class_unlink_export);
1156 /* Import management functions */
/*
 * Release an import whose refcount has reached zero.
 * Drops the import's current connection, then empties imp_conn_list,
 * dropping and freeing each connection entry.  The import must have no
 * security context (imp_sec) and no outstanding requests (imp_reqs).
 * Lastly the device reference taken in class_new_import() is released.
 */
1157 static void obd_zombie_import_free(struct obd_import *imp)
1161 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1162 imp->imp_obd->obd_name);
1164 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1166 ptlrpc_connection_put(imp->imp_connection);
1168 while (!list_empty(&imp->imp_conn_list)) {
1169 struct obd_import_conn *imp_conn;
1171 imp_conn = list_first_entry(&imp->imp_conn_list,
1172 struct obd_import_conn, oic_item);
1173 list_del_init(&imp_conn->oic_item);
1174 ptlrpc_connection_put(imp_conn->oic_conn);
1175 OBD_FREE(imp_conn, sizeof(*imp_conn));
1178 LASSERT(imp->imp_sec == NULL);
1179 LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1180 imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1181 class_decref(imp->imp_obd, "import", imp);
/* Take an additional reference on \a import and log the new count. */
1186 struct obd_import *class_import_get(struct obd_import *import)
1188 refcount_inc(&import->imp_refcount);
1189 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1190 refcount_read(&import->imp_refcount),
1191 import->imp_obd->obd_name);
1194 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp (the count must be positive on entry).
 * The final put hands the import to obd_zombie_import_add() for deferred
 * destruction.
 */
1196 void class_import_put(struct obd_import *imp)
1200 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1202 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1203 refcount_read(&imp->imp_refcount) - 1,
1204 imp->imp_obd->obd_name);
1206 if (refcount_dec_and_test(&imp->imp_refcount)) {
1207 CDEBUG(D_INFO, "final put import %p\n", imp);
1208 obd_zombie_import_add(imp);
1213 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise an import's adaptive-timeout state: the network-latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.
 */
1215 static void init_imp_at(struct imp_at *at) {
1217 at_init(&at->iat_net_latency, 0, 0);
1218 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1219 /* max service estimates are tracked on the server side, so
1220 don't use the AT history here, just use the last reported
1221 val. (But keep hist for proc histogram, worst_ever) */
1222 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Work handler for imp_zombie_work: recover the import that embeds the
 * work item and free it.
 */
1227 static void obd_zombie_imp_cull(struct work_struct *ws)
1229 struct obd_import *import;
1231 import = container_of(ws, struct obd_import, imp_zombie_work);
1232 obd_zombie_import_free(import);
/*
 * Allocate and initialise an import for \a obd.
 * The import takes a device reference (tagged "import"), starts in state
 * LUSTRE_IMP_NEW with a refcount of 2, and uses LUSTRE_MSG_MAGIC_V2 as
 * its initial message magic.  imp_sec_refpid is the pid of the current
 * pid namespace's child reaper when one is available, otherwise 1.
 */
1235 struct obd_import *class_new_import(struct obd_device *obd)
1237 struct obd_import *imp;
1238 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1240 OBD_ALLOC(imp, sizeof(*imp));
1244 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1245 INIT_LIST_HEAD(&imp->imp_replay_list);
1246 INIT_LIST_HEAD(&imp->imp_sending_list);
1247 INIT_LIST_HEAD(&imp->imp_delayed_list);
1248 INIT_LIST_HEAD(&imp->imp_committed_list);
1249 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1250 imp->imp_known_replied_xid = 0;
/* The replay cursor starts at the head of the (empty) committed list. */
1251 imp->imp_replay_cursor = &imp->imp_committed_list;
1252 spin_lock_init(&imp->imp_lock);
1253 imp->imp_last_success_conn = 0;
1254 imp->imp_state = LUSTRE_IMP_NEW;
1255 imp->imp_obd = class_incref(obd, "import", imp);
1256 rwlock_init(&imp->imp_sec_lock);
1257 init_waitqueue_head(&imp->imp_recovery_waitq);
1258 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1260 if (curr_pid_ns && curr_pid_ns->child_reaper)
1261 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1263 imp->imp_sec_refpid = 1;
1265 refcount_set(&imp->imp_refcount, 2);
1266 atomic_set(&imp->imp_unregistering, 0);
1267 atomic_set(&imp->imp_reqs, 0);
1268 atomic_set(&imp->imp_inflight, 0);
1269 atomic_set(&imp->imp_replay_inflight, 0);
1270 init_waitqueue_head(&imp->imp_replay_waitq);
1271 atomic_set(&imp->imp_inval_count, 0);
1272 INIT_LIST_HEAD(&imp->imp_conn_list);
1273 init_imp_at(&imp->imp_at);
1275 /* the default magic is V2, will be used in connect RPC, and
1276 * then adjusted according to the flags in request/reply. */
1277 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1281 EXPORT_SYMBOL(class_new_import);
/*
 * Begin destruction of an import: bump imp_generation under imp_lock,
 * then drop one reference with class_import_put().
 * \a import must be neither NULL nor LP_POISON.
 */
1283 void class_destroy_import(struct obd_import *import)
1285 LASSERT(import != NULL);
1286 LASSERT(import != LP_POISON);
1288 spin_lock(&import->imp_lock);
1289 import->imp_generation++;
1290 spin_unlock(&import->imp_lock);
1291 class_import_put(import);
1293 EXPORT_SYMBOL(class_destroy_import);
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS only): record that \a lock
 * holds one more reference on \a exp.
 * Runs under exp_locks_list_guard.  A lock already tracked against a
 * different export triggers a console warning.  On the 0 -> 1 transition
 * of l_exp_refs_nr the lock is linked onto the export's exp_locks_list
 * and its target export is recorded.
 */
1295 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1297 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1299 spin_lock(&exp->exp_locks_list_guard);
1301 LASSERT(lock->l_exp_refs_nr >= 0);
1303 if (lock->l_exp_refs_target != NULL &&
1304 lock->l_exp_refs_target != exp) {
1305 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1306 exp, lock, lock->l_exp_refs_target);
1308 if ((lock->l_exp_refs_nr ++) == 0) {
1309 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1310 lock->l_exp_refs_target = exp;
1312 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1313 lock, exp, lock->l_exp_refs_nr);
1314 spin_unlock(&exp->exp_locks_list_guard);
1316 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Debug tracking counterpart of __class_export_add_lock_ref(): record
 * that \a lock dropped one reference on \a exp.
 * Runs under exp_locks_list_guard; the count must be positive.  A target
 * export other than \a exp triggers a console warning.  On the 1 -> 0
 * transition the lock is unlinked from the export's list and its target
 * export is cleared.
 */
1318 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1320 spin_lock(&exp->exp_locks_list_guard);
1321 LASSERT(lock->l_exp_refs_nr > 0);
1322 if (lock->l_exp_refs_target != exp) {
1323 LCONSOLE_WARN("lock %p, "
1324 "mismatching export pointers: %p, %p\n",
1325 lock, lock->l_exp_refs_target, exp);
1327 if (-- lock->l_exp_refs_nr == 0) {
1328 list_del_init(&lock->l_exp_refs_link);
1329 lock->l_exp_refs_target = NULL;
1331 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1332 lock, exp, lock->l_exp_refs_nr);
1333 spin_unlock(&exp->exp_locks_list_guard);
1335 EXPORT_SYMBOL(__class_export_del_lock_ref);
1338 /* A connection defines an export context in which preallocation can
1339 be managed. This releases the export pointer reference, and returns
1340 the export handle, so the export refcount is 1 when this function returns. */
/*
 * Create an export for client @cluuid on @obd and hand its handle back.
 *
 * @conn:   out; conn->cookie receives export->exp_handle.h_cookie
 * @obd:    device the export is created on (asserted non-NULL)
 * @cluuid: client UUID passed to class_new_export() (asserted non-NULL)
 *
 * The reference returned by class_new_export() is dropped again with
 * class_export_put() once the cookie has been copied out.  The error path
 * returns PTR_ERR(export).
 */
1342 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1343 struct obd_uuid *cluuid)
1345 struct obd_export *export;
1346 LASSERT(conn != NULL);
1347 LASSERT(obd != NULL);
1348 LASSERT(cluuid != NULL);
1351 export = class_new_export(obd, cluuid);
1353 RETURN(PTR_ERR(export));
1355 conn->cookie = export->exp_handle.h_cookie;
1356 class_export_put(export);
1358 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1359 cluuid->uuid, conn->cookie);
1362 EXPORT_SYMBOL(class_connect);
1364 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery bookkeeping that @exp contributes to its obd.
 *
 * Under obd->obd_recovery_task_lock, while obd_recovering is set:
 *  - an export with exp_in_recovery has that flag cleared (under exp_lock)
 *    and obd_connected_clients is decremented;
 *  - obd_stale_clients is incremented unless the export connected with
 *    OBD_CONNECT_LIGHTWEIGHT.
 * Then, under exp->exp_lock, exp_req_replay_needed and
 * exp_lock_replay_needed are cleared and the matching
 * obd_req_replay_clients / obd_lock_replay_clients counters decremented.
 */
1365 static void class_export_recovery_cleanup(struct obd_export *exp)
1367 struct obd_device *obd = exp->exp_obd;
1369 spin_lock(&obd->obd_recovery_task_lock);
1370 if (obd->obd_recovering) {
1371 if (exp->exp_in_recovery) {
1372 spin_lock(&exp->exp_lock);
1373 exp->exp_in_recovery = 0;
1374 spin_unlock(&exp->exp_lock);
1375 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1376 atomic_dec(&obd->obd_connected_clients);
1379 /* if called during recovery then should update
1380 * obd_stale_clients counter,
1381 * lightweight exports are not counted */
1382 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1383 exp->exp_obd->obd_stale_clients++;
1385 spin_unlock(&obd->obd_recovery_task_lock);
1387 spin_lock(&exp->exp_lock);
1388 /** Cleanup req replay fields */
1389 if (exp->exp_req_replay_needed) {
1390 exp->exp_req_replay_needed = 0;
/* the counter must still be non-zero before it is decremented */
1392 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1393 atomic_dec(&obd->obd_req_replay_clients);
1396 /** Cleanup lock replay data */
1397 if (exp->exp_lock_replay_needed) {
1398 exp->exp_lock_replay_needed = 0;
1400 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1401 atomic_dec(&obd->obd_lock_replay_clients);
1403 spin_unlock(&exp->exp_lock);
1406 /* This function removes 1-3 references from the export:
1407 * 1 - for the export pointer passed in
1408 * and, if a disconnect is really needed:
1409 * 2 - for removal from the hash
1410 * 3 - in class_unlink_export
1411 * The export pointer passed to this function can be destroyed. */
/*
 * Disconnect @export.
 *
 * exp_disconnected is set under exp_lock and its previous value selects the
 * path: an export that was already disconnected jumps to the no_disconn
 * label; otherwise the recovery state is cleaned up with
 * class_export_recovery_cleanup() and the export is unlinked with
 * class_unlink_export().  The function ends with class_export_put(export).
 * A NULL @export only triggers a CWARN.
 */
1412 int class_disconnect(struct obd_export *export)
1414 int already_disconnected;
1417 if (export == NULL) {
1418 CWARN("attempting to free NULL export %p\n", export);
1422 spin_lock(&export->exp_lock);
/* sample-and-set under the lock so that racing callers see exactly one 0 */
1423 already_disconnected = export->exp_disconnected;
1424 export->exp_disconnected = 1;
1425 #ifdef HAVE_SERVER_SUPPORT
1426 /* We hold references of export for uuid hash
1427 * and nid_hash and export link at least. So
1428 * it is safe to call rh*table_remove_fast in
1431 obd_nid_del(export->exp_obd, export);
1432 #endif /* HAVE_SERVER_SUPPORT */
1433 spin_unlock(&export->exp_lock);
1435 /* class_cleanup(), abort_recovery(), and class_fail_export()
1436 * all end up in here, and if any of them race we shouldn't
1437 * call extra class_export_puts(). */
1438 if (already_disconnected)
1439 GOTO(no_disconn, already_disconnected);
1441 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1442 export->exp_handle.h_cookie);
1444 class_export_recovery_cleanup(export);
1445 class_unlink_export(export);
1447 class_export_put(export);
1450 EXPORT_SYMBOL(class_disconnect);
1452 /* Return non-zero for a fully connected export */
/*
 * "Connected" means exp_conn_cnt > 0 and exp_failed not set; both fields
 * are sampled together under exp->exp_lock.
 */
1453 int class_connected_export(struct obd_export *exp)
1458 spin_lock(&exp->exp_lock);
1459 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1460 spin_unlock(&exp->exp_lock);
1464 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on @list.
 *
 * @list:  work list of exports, linked through exp_obd_chain
 * @flags: stored into exp->exp_flags, under exp_lock, before disconnecting
 *
 * The loop runs until the list is empty, always taking the first entry.
 * Each export gets an extra reference so it can still be logged after
 * obd_disconnect().  An export whose client UUID equals the obd UUID is not
 * disconnected: it is only removed from the list and that reference dropped.
 */
1466 static void class_disconnect_export_list(struct list_head *list,
1467 enum obd_option flags)
1470 struct obd_export *exp;
1473 /* It's possible that an export may disconnect itself, but
1474 * nothing else will be added to this list. */
1475 while (!list_empty(list)) {
1476 exp = list_first_entry(list, struct obd_export,
1478 /* need for safe call CDEBUG after obd_disconnect */
1479 class_export_get(exp);
1481 spin_lock(&exp->exp_lock);
1482 exp->exp_flags = flags;
1483 spin_unlock(&exp->exp_lock);
1485 if (obd_uuid_equals(&exp->exp_client_uuid,
1486 &exp->exp_obd->obd_uuid)) {
1488 "exp %p export uuid == obd uuid, don't discon\n",
1490 /* Need to delete this now so we don't end up pointing
1491 * to work_list later when this export is cleaned up. */
1492 list_del_init(&exp->exp_obd_chain);
1493 class_export_put(exp);
/* second reference: the one obd_disconnect() releases */
1497 class_export_get(exp);
1498 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1499 "last request at %lld\n",
1500 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1501 exp, exp->exp_last_request_time);
1502 /* release one export reference anyway */
1503 rc = obd_disconnect(exp);
1505 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1506 obd_export_nid2str(exp), exp, rc);
1507 class_export_put(exp);
/*
 * Disconnect all exports of @obd.
 *
 * obd_exports and obd_delayed_exports are spliced onto a local work list
 * under obd_dev_lock; the disconnects themselves then run after the lock is
 * released, via class_disconnect_export_list() with the flags returned by
 * exp_flags_from_obd().
 */
1512 void class_disconnect_exports(struct obd_device *obd)
1514 LIST_HEAD(work_list);
1517 /* Move all of the exports from obd_exports to a work list, en masse. */
1518 spin_lock(&obd->obd_dev_lock);
1519 list_splice_init(&obd->obd_exports, &work_list);
1520 list_splice_init(&obd->obd_delayed_exports, &work_list);
1521 spin_unlock(&obd->obd_dev_lock);
1523 if (!list_empty(&work_list)) {
1524 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1525 "disconnecting them\n", obd->obd_minor, obd);
1526 class_disconnect_export_list(&work_list,
1527 exp_flags_from_obd(obd));
1529 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1530 obd->obd_minor, obd);
1533 EXPORT_SYMBOL(class_disconnect_exports);
1535 /* Remove exports that have not completed recovery. */
/*
 * Evict the exports of @obd that @test_export does not vouch for.
 *
 * @obd:         device whose obd_exports list is scanned
 * @test_export: predicate called with exp->exp_lock held
 *
 * The scan runs under obd_dev_lock.  The self-export (client UUID equal to
 * the obd UUID) and exports with ted_lr_idx == -1 are tested for explicitly
 * ahead of the predicate.  An export that is neither already failed nor
 * accepted by @test_export is marked exp_failed and moved to a local work
 * list; that list is then disconnected with OBD_OPT_ABORT_RECOV added to
 * the obd's flags.
 */
1537 void class_disconnect_stale_exports(struct obd_device *obd,
1538 int (*test_export)(struct obd_export *))
1540 LIST_HEAD(work_list);
1541 struct obd_export *exp, *n;
1545 spin_lock(&obd->obd_dev_lock);
/* _safe variant: entries are moved off the list while iterating */
1546 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1548 /* don't count self-export as client */
1549 if (obd_uuid_equals(&exp->exp_client_uuid,
1550 &exp->exp_obd->obd_uuid))
1553 /* don't evict clients which have no slot in last_rcvd
1554 * (e.g. lightweight connection) */
1555 if (exp->exp_target_data.ted_lr_idx == -1)
1558 spin_lock(&exp->exp_lock);
1559 if (exp->exp_failed || test_export(exp)) {
1560 spin_unlock(&exp->exp_lock);
1563 exp->exp_failed = 1;
1564 spin_unlock(&exp->exp_lock);
1566 list_move(&exp->exp_obd_chain, &work_list);
1568 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1569 obd->obd_name, exp->exp_client_uuid.uuid,
1570 obd_export_nid2str(exp));
1571 print_export_data(exp, "EVICTING", 0, D_HA);
1573 spin_unlock(&obd->obd_dev_lock);
1576 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1577 obd->obd_name, evicted);
1579 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1580 OBD_OPT_ABORT_RECOV);
1583 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp failed and disconnect it.
 *
 * exp_failed is set under exp_lock; if it was already set the export is
 * treated as dead and only a debug message is emitted.  Otherwise the debug
 * log is dumped when obd_dump_on_timeout is set, two references are taken
 * (one keeps @exp usable for the trailing messages, one is for
 * obd_disconnect() to release) and obd_disconnect() is called.
 */
1585 void class_fail_export(struct obd_export *exp)
1587 int rc, already_failed;
1589 spin_lock(&exp->exp_lock);
1590 already_failed = exp->exp_failed;
1591 exp->exp_failed = 1;
1592 spin_unlock(&exp->exp_lock);
1594 if (already_failed) {
1595 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1596 exp, exp->exp_client_uuid.uuid);
1600 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1601 exp, exp->exp_client_uuid.uuid);
1603 if (obd_dump_on_timeout)
1604 libcfs_debug_dumplog();
1606 /* need for safe call CDEBUG after obd_disconnect */
1607 class_export_get(exp);
1609 /* Most callers into obd_disconnect are removing their own reference
1610 * (request, for example) in addition to the one from the hash table.
1611 * We don't have such a reference here, so make one. */
1612 class_export_get(exp);
1613 rc = obd_disconnect(exp);
1615 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1617 CDEBUG(D_HA, "disconnected export %p/%s\n",
1618 exp, exp->exp_client_uuid.uuid);
/* pairs with the first class_export_get() above */
1619 class_export_put(exp);
1621 EXPORT_SYMBOL(class_fail_export);
1623 #ifdef HAVE_SERVER_SUPPORT
/*
 * Callback used with obd_nid_export_for_each() to pick a single export.
 *
 * @data is a struct obd_export ** result slot.  Per the inline comments an
 * export is passed over when the slot is already filled, when it is marked
 * exp_failed, or when refcount_inc_not_zero() cannot take a reference on
 * its handle.  Presumably the export is stored into *expp once the
 * reference is taken -- TODO confirm, together with the return values.
 */
1625 static int take_first(struct obd_export *exp, void *data)
1627 struct obd_export **expp = data;
1630 /* already have one */
1632 if (exp->exp_failed)
1633 /* Don't want this one */
1635 if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1636 /* Cannot get a ref on this one */
/*
 * Evict the exports of @obd that are connected from @nid.
 *
 * @nid is parsed with libcfs_str2nid().  Nothing is done when the device is
 * already stopping (obd_stopping, checked under obd_dev_lock).  Otherwise
 * exports are fetched through obd_nid_export_for_each() with take_first(),
 * failed with class_fail_export(), and the reference taken by take_first()
 * is dropped with class_export_put().
 *
 * Returns exports_evicted, which is 0 on the early-exit path.
 */
1642 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1644 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1645 struct obd_export *doomed_exp;
1646 int exports_evicted = 0;
1648 spin_lock(&obd->obd_dev_lock);
1649 /* umount has run already, so evict thread should leave
1650 * its task to umount thread now */
1651 if (obd->obd_stopping) {
1652 spin_unlock(&obd->obd_dev_lock);
1653 return exports_evicted;
1655 spin_unlock(&obd->obd_dev_lock);
/* loop while the iterator reports a positive result */
1658 while (obd_nid_export_for_each(obd, nid_key,
1659 take_first, &doomed_exp) > 0) {
1661 LASSERTF(doomed_exp != obd->obd_self_export,
1662 "self-export is hashed by NID?\n");
1664 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1666 obd_uuid2str(&doomed_exp->exp_client_uuid),
1667 obd_export_nid2str(doomed_exp));
1669 class_fail_export(doomed_exp);
1670 class_export_put(doomed_exp);
1675 if (!exports_evicted)
1677 "%s: can't disconnect NID '%s': no exports found\n",
1678 obd->obd_name, nid);
1679 return exports_evicted;
1681 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd whose client UUID is @uuid.
 *
 * Nothing is done when the device is stopping (obd_stopping, checked under
 * obd_dev_lock) or when @uuid is the obd's own UUID (a CERROR is logged for
 * the latter).  The export is looked up with obd_uuid_lookup(), failed with
 * class_fail_export(), removed from the UUID hash with obd_uuid_del() and
 * only then released with class_export_put().
 *
 * Returns exports_evicted.
 */
1683 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1685 struct obd_export *doomed_exp = NULL;
1686 struct obd_uuid doomed_uuid;
1687 int exports_evicted = 0;
1689 spin_lock(&obd->obd_dev_lock);
1690 if (obd->obd_stopping) {
1691 spin_unlock(&obd->obd_dev_lock);
1692 return exports_evicted;
1694 spin_unlock(&obd->obd_dev_lock);
1696 obd_str2uuid(&doomed_uuid, uuid);
1697 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1698 CERROR("%s: can't evict myself\n", obd->obd_name);
1699 return exports_evicted;
1702 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1703 if (doomed_exp == NULL) {
1704 CERROR("%s: can't disconnect %s: no exports found\n",
1705 obd->obd_name, uuid);
1707 CWARN("%s: evicting %s at adminstrative request\n",
1708 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1709 class_fail_export(doomed_exp);
/*
 * Unhash while our reference is still held and drop it last: doomed_exp
 * must not be touched once class_export_put() has been called.
 */
1710 obd_uuid_del(obd, doomed_exp);
1711 class_export_put(doomed_exp);
1715 return exports_evicted;
1717 #endif /* HAVE_SERVER_SUPPORT */
1719 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional hook taking an export; print_export_data() calls it when its
 * `locks` argument is non-zero.  NULL (the default) means no hook is set.
 */
1720 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1721 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Emit one CDEBUG line describing @exp.
 *
 * @status:      label printed after the obd name (callers pass e.g. "ACTIVE")
 * @locks:       non-zero to also call class_export_dump_hook, when one is set
 * @debug_level: CDEBUG mask used for the message
 *
 * exp_outstanding_replies is walked under exp_lock.  The message carries the
 * handle refcount, the rpc/callback/lock counters, the
 * disconnected/delayed/failed flags, the reply count and first reply,
 * exp_last_committed and whether exp_stale_list is non-empty.
 */
1724 static void print_export_data(struct obd_export *exp, const char *status,
1725 int locks, int debug_level)
1727 struct ptlrpc_reply_state *rs;
1728 struct ptlrpc_reply_state *first_reply = NULL;
1731 spin_lock(&exp->exp_lock);
1732 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1738 spin_unlock(&exp->exp_lock);
1740 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1741 "%p %s %llu stale:%d\n",
1742 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1743 obd_export_nid2str(exp),
1744 refcount_read(&exp->exp_handle.h_ref),
1745 atomic_read(&exp->exp_rpc_count),
1746 atomic_read(&exp->exp_cb_count),
1747 atomic_read(&exp->exp_locks_count),
1748 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
/* "..." flags that more than three replies are outstanding */
1749 nreplies, first_reply, nreplies > 3 ? "..." : "",
1750 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1751 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1752 if (locks && class_export_dump_hook != NULL)
1753 class_export_dump_hook(exp);
/*
 * Print every export of @obd while holding obd_dev_lock.  Entries from
 * obd_exports are labelled "ACTIVE", from obd_unlinked_exports "UNLINKED"
 * and from obd_delayed_exports "DELAYED".  @locks and @debug_level are
 * passed through to print_export_data().
 */
1757 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1759 struct obd_export *exp;
1761 spin_lock(&obd->obd_dev_lock);
1762 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1763 print_export_data(exp, "ACTIVE", locks, debug_level);
1764 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1765 print_export_data(exp, "UNLINKED", locks, debug_level);
1766 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1767 print_export_data(exp, "DELAYED", locks, debug_level);
1768 spin_unlock(&obd->obd_dev_lock);
/*
 * Wait until obd_unlinked_exports of @obd is empty.
 *
 * obd_exports must already be empty (asserted).  The list is checked under
 * obd_dev_lock; between checks the lock is dropped and the caller sleeps
 * uninterruptibly for `waited` seconds.  Once `waited` exceeds 5 and is a
 * power of two, a console warning and a full export dump are emitted.
 */
1771 void obd_exports_barrier(struct obd_device *obd)
1774 LASSERT(list_empty(&obd->obd_exports));
1775 spin_lock(&obd->obd_dev_lock);
1776 while (!list_empty(&obd->obd_unlinked_exports)) {
/* never sleep with the spinlock held */
1777 spin_unlock(&obd->obd_dev_lock);
1778 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1779 if (waited > 5 && is_power_of_2(waited)) {
1780 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1781 "more than %d seconds. "
1782 "The obd refcount = %d. Is it stuck?\n",
1783 obd->obd_name, waited,
1784 atomic_read(&obd->obd_refcount));
1785 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1788 spin_lock(&obd->obd_dev_lock);
1790 spin_unlock(&obd->obd_dev_lock);
1792 EXPORT_SYMBOL(obd_exports_barrier);
1795 /* Queue the export on the obd_zombie workqueue for destruction. */
/*
 * Hand @exp over to the zombie workqueue.
 *
 * obd_stale_export_num is decremented, the export is unlinked from the
 * exp_obd_chain list it is on (asserted non-empty) under obd_dev_lock, and
 * exp_zombie_work is queued on zombie_wq.
 */
1797 static void obd_zombie_export_add(struct obd_export *exp) {
1798 atomic_dec(&obd_stale_export_num);
1799 spin_lock(&exp->exp_obd->obd_dev_lock);
1800 LASSERT(!list_empty(&exp->exp_obd_chain));
1801 list_del_init(&exp->exp_obd_chain);
1802 spin_unlock(&exp->exp_obd->obd_dev_lock);
1804 queue_work(zombie_wq, &exp->exp_zombie_work);
1808 /* Queue the import on the obd_zombie workqueue for destruction. */
/*
 * Hand @imp over to the zombie workqueue by queueing imp_zombie_work on
 * zombie_wq.  imp_sec must already be NULL (asserted).
 */
1810 static void obd_zombie_import_add(struct obd_import *imp) {
1811 LASSERT(imp->imp_sec == NULL);
1813 queue_work(zombie_wq, &imp->imp_zombie_work);
1817 /* Wait until the obd_zombie import/export queues become empty. */
/* Implemented as a flush of zombie_wq. */
1819 void obd_zombie_barrier(void)
1821 flush_workqueue(zombie_wq);
1823 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Take the first export off the global obd_stale_exports list.
 *
 * The list is manipulated under obd_stale_export_lock; `exp` stays NULL
 * when the list is empty.  The debug message reports obd_stale_export_num.
 */
1826 struct obd_export *obd_stale_export_get(void)
1828 struct obd_export *exp = NULL;
1831 spin_lock(&obd_stale_export_lock);
1832 if (!list_empty(&obd_stale_exports)) {
1833 exp = list_first_entry(&obd_stale_exports,
1834 struct obd_export, exp_stale_list);
/* _init so that list_empty(&exp->exp_stale_list) holds afterwards */
1835 list_del_init(&exp->exp_stale_list);
1837 spin_unlock(&obd_stale_export_lock);
1840 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1841 atomic_read(&obd_stale_export_num));
1845 EXPORT_SYMBOL(obd_stale_export_get);
/*
 * Put @exp back on the global stale list, or release it.
 *
 * @exp must not currently be on the stale list (asserted).  If it has an
 * exp_lock_hash with a non-zero hs_count it is linked onto
 * obd_stale_exports: at the tail when exp_bl_list is empty, at the head
 * otherwise.  Locks are taken in the order exp_bl_list_lock (with BH
 * disabled) then obd_stale_export_lock.  class_export_put() is presumably
 * the alternative for an export with no locks left -- TODO confirm.
 */
1847 void obd_stale_export_put(struct obd_export *exp)
1851 LASSERT(list_empty(&exp->exp_stale_list));
1852 if (exp->exp_lock_hash &&
1853 atomic_read(&exp->exp_lock_hash->hs_count)) {
1854 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1855 atomic_read(&obd_stale_export_num));
1857 spin_lock_bh(&exp->exp_bl_list_lock);
1858 spin_lock(&obd_stale_export_lock);
1859 /* Add to the tail if there is no blocked locks,
1860 * to the head otherwise. */
1861 if (list_empty(&exp->exp_bl_list))
1862 list_add_tail(&exp->exp_stale_list,
1863 &obd_stale_exports);
1865 list_add(&exp->exp_stale_list,
1866 &obd_stale_exports);
1868 spin_unlock(&obd_stale_export_lock);
1869 spin_unlock_bh(&exp->exp_bl_list_lock);
1871 class_export_put(exp);
1875 EXPORT_SYMBOL(obd_stale_export_put);
1878 /* Adjust the position of the export in the stale list,
1879 * i.e. move it to the head of the list if needed. */
/*
 * Move @exp to the head of obd_stale_exports when it is on the stale list
 * and its exp_bl_list is non-empty; otherwise nothing changes.
 *
 * Locks are taken in the same order as in obd_stale_export_put():
 * exp_bl_list_lock (with BH disabled) then obd_stale_export_lock.
 */
1881 void obd_stale_export_adjust(struct obd_export *exp)
1883 LASSERT(exp != NULL);
1884 spin_lock_bh(&exp->exp_bl_list_lock);
1885 spin_lock(&obd_stale_export_lock);
1887 if (!list_empty(&exp->exp_stale_list) &&
1888 !list_empty(&exp->exp_bl_list))
1889 list_move(&exp->exp_stale_list, &obd_stale_exports);
1891 spin_unlock(&obd_stale_export_lock);
1892 spin_unlock_bh(&exp->exp_bl_list_lock);
1894 EXPORT_SYMBOL(obd_stale_export_adjust);
1897 /* Start the workqueue that destroys zombie imports/exports. */
/*
 * Create zombie_wq with cfs_cpt_bind_workqueue(), named "obd_zombid".
 *
 * Returns 0 on success, or PTR_ERR(zombie_wq) when the returned pointer is
 * an error value.
 */
1899 int obd_zombie_impexp_init(void)
1901 zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1903 cfs_cpt_number(cfs_cpt_tab));
1905 return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1909 /* Stop the workqueue that destroys zombie imports/exports. */
/*
 * Destroy zombie_wq, then assert that no export is left on
 * obd_stale_exports.
 */
1911 void obd_zombie_impexp_stop(void)
1913 destroy_workqueue(zombie_wq);
1914 LASSERT(list_empty(&obd_stale_exports));
1917 /***** Kernel-userspace comm helpers *******/
1919 /* Get length of entire message, including header */
/*
 * @payload_len: size of the payload in bytes
 *
 * Returns sizeof(struct kuc_hdr) + @payload_len.
 */
1920 int kuc_len(int payload_len)
1922 return sizeof(struct kuc_hdr) + payload_len;
1924 EXPORT_SYMBOL(kuc_len);
1926 /* Get a pointer to kuc header, given a ptr to the payload
1927 * @param p Pointer to payload area
1928 * @returns Pointer to kuc header */
/*
 * Step back one struct kuc_hdr from payload pointer @p and assert that the
 * header found there carries KUC_MAGIC in kuc_magic.
 */
1930 struct kuc_hdr * kuc_ptr(void *p)
/* the header sits immediately in front of the payload */
1932 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1933 LASSERT(lh->kuc_magic == KUC_MAGIC);
1936 EXPORT_SYMBOL(kuc_ptr);
1938 /* Alloc space for a message, and fill in header
1939 * @return Pointer to payload area */
/*
 * @payload_len: payload size in bytes
 * @transport:   stored in kuc_transport
 * @type:        stored in kuc_msgtype
 *
 * The header is filled with KUC_MAGIC and kuc_msglen = kuc_len(payload_len),
 * i.e. the length of the whole message including the header.  Returns the
 * address just past the header, or ERR_PTR(-ENOMEM) on the failure path.
 */
1941 void *kuc_alloc(int payload_len, int transport, int type)
1944 int len = kuc_len(payload_len);
1948 return ERR_PTR(-ENOMEM);
1950 lh->kuc_magic = KUC_MAGIC;
1951 lh->kuc_transport = transport;
1952 lh->kuc_msgtype = type;
1953 lh->kuc_msglen = len;
/* callers get the payload area, not the header */
1955 return (void *)(lh + 1);
1957 EXPORT_SYMBOL(kuc_alloc);
1959 /* Takes pointer to payload area */
/*
 * @p:           payload pointer; the header is recovered with kuc_ptr()
 * @payload_len: used to compute the size handed to OBD_FREE(); presumably
 *               must equal the value given to kuc_alloc() -- TODO confirm
 */
1960 void kuc_free(void *p, int payload_len)
1962 struct kuc_hdr *lh = kuc_ptr(p);
1963 OBD_FREE(lh, kuc_len(payload_len));
1965 EXPORT_SYMBOL(kuc_free);
/*
 * One sleeper waiting for a request slot; obd_get_request_slot() keeps it
 * on its stack while it waits.
 */
1967 struct obd_request_slot_waiter {
/* link on client_obd::cl_flight_waiters; empty once a slot was granted */
1968 struct list_head orsw_entry;
/* wait queue the owner sleeps on */
1969 wait_queue_head_t orsw_waitq;
/*
 * Wait condition for obd_get_request_slot(): true once @orsw has been taken
 * off the waiter list, checked under cli->cl_loi_list_lock.
 */
1973 static bool obd_request_slot_avail(struct client_obd *cli,
1974 struct obd_request_slot_waiter *orsw)
1978 spin_lock(&cli->cl_loi_list_lock);
1979 avail = !!list_empty(&orsw->orsw_entry);
1980 spin_unlock(&cli->cl_loi_list_lock);
1986 /* For network flow control, the RPC sponsor needs to acquire a credit
1987 * before sending the RPC. The credits count for a connection is defined
1988 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
1989 * the subsequent RPC sponsors need to wait until others released their
1990 * credits, or the administrator increased the "cl_max_rpcs_in_flight". */
/*
 * Acquire one RPC slot on @cli, sleeping if none is free.
 *
 * Fast path: while cl_rpcs_in_flight is below cl_max_rpcs_in_flight the
 * counter is bumped under cl_loi_list_lock.  Otherwise an on-stack waiter is
 * appended to cl_flight_waiters and the caller sleeps (abortable) until the
 * waiter has been unlinked -- the waker has already accounted the slot -- or
 * orsw_signaled is set.
 *
 * After the wait the lock is retaken.  In the clean-up branch an unsignaled
 * waiter is undone: a slot that had already been granted (entry unlinked) is
 * given back by decrementing cl_rpcs_in_flight, and list_del() removes the
 * entry.  NOTE(review): the condition guarding that branch (presumably an
 * interrupted wait) should be confirmed.
 */
1992 int obd_get_request_slot(struct client_obd *cli)
1994 struct obd_request_slot_waiter orsw;
1997 spin_lock(&cli->cl_loi_list_lock);
1998 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1999 cli->cl_rpcs_in_flight++;
2000 spin_unlock(&cli->cl_loi_list_lock);
2004 init_waitqueue_head(&orsw.orsw_waitq);
/* FIFO: new waiters go to the tail, wakers take from the head */
2005 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2006 orsw.orsw_signaled = false;
2007 spin_unlock(&cli->cl_loi_list_lock);
2009 rc = l_wait_event_abortable(orsw.orsw_waitq,
2010 obd_request_slot_avail(cli, &orsw) ||
2011 orsw.orsw_signaled);
2013 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2014 * freed but other (such as obd_put_request_slot) is using it. */
2015 spin_lock(&cli->cl_loi_list_lock);
2017 if (!orsw.orsw_signaled) {
2018 if (list_empty(&orsw.orsw_entry))
2019 cli->cl_rpcs_in_flight--;
2021 list_del(&orsw.orsw_entry);
2026 if (orsw.orsw_signaled) {
2027 LASSERT(list_empty(&orsw.orsw_entry));
2031 spin_unlock(&cli->cl_loi_list_lock);
2035 EXPORT_SYMBOL(obd_get_request_slot);
/*
 * Release one RPC slot on @cli.
 *
 * Under cl_loi_list_lock, cl_rpcs_in_flight is decremented.  If a waiter is
 * queued and the count is below cl_max_rpcs_in_flight, the first waiter is
 * unlinked, the slot is accounted on its behalf (counter incremented again)
 * and it is woken up.
 */
2037 void obd_put_request_slot(struct client_obd *cli)
2039 struct obd_request_slot_waiter *orsw;
2041 spin_lock(&cli->cl_loi_list_lock);
2042 cli->cl_rpcs_in_flight--;
2044 /* If there is free slot, wakeup the first waiter. */
2045 if (!list_empty(&cli->cl_flight_waiters) &&
2046 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2047 orsw = list_first_entry(&cli->cl_flight_waiters,
2048 struct obd_request_slot_waiter,
/* an empty orsw_entry is what obd_request_slot_avail() tests for */
2050 list_del_init(&orsw->orsw_entry);
2051 cli->cl_rpcs_in_flight++;
2052 wake_up(&orsw->orsw_waitq);
2054 spin_unlock(&cli->cl_loi_list_lock);
2056 EXPORT_SYMBOL(obd_put_request_slot);
/* Return cli->cl_max_rpcs_in_flight; the field is read without a lock. */
2058 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2060 return cli->cl_max_rpcs_in_flight;
2062 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * Set the RPC slot limit of @cli to @max.
 *
 * @max outside [1, OBD_MAX_RIF_MAX] is rejected.  For an MDC device
 * (obd type name LUSTRE_MDC_NAME) the modify-RPC limit is kept strictly
 * below the new value: a value of 1 is refused with a CERROR, and when
 * @max <= cl_max_mod_rpcs_in_flight that limit is lowered to @max - 1 first.
 *
 * The new limit is stored under cl_loi_list_lock, client_adjust_max_dirty()
 * is called, and up to `diff` queued waiters are each granted a slot and
 * woken.
 */
2064 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2066 struct obd_request_slot_waiter *orsw;
2072 if (max > OBD_MAX_RIF_MAX || max < 1)
/* NOTE(review): max is a __u32 but is printed with %hu -- consider %u */
2075 CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2076 cli->cl_import->imp_obd->obd_name, max,
2077 cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2079 if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2080 LUSTRE_MDC_NAME) == 0) {
2081 /* adjust max_mod_rpcs_in_flight to ensure it is always
2082 * strictly lower that max_rpcs_in_flight */
2084 CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2085 cli->cl_import->imp_obd->obd_name);
2088 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2089 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2095 spin_lock(&cli->cl_loi_list_lock);
2096 old = cli->cl_max_rpcs_in_flight;
2097 cli->cl_max_rpcs_in_flight = max;
2098 client_adjust_max_dirty(cli);
2102 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2103 for (i = 0; i < diff; i++) {
2104 if (list_empty(&cli->cl_flight_waiters))
2107 orsw = list_first_entry(&cli->cl_flight_waiters,
2108 struct obd_request_slot_waiter,
/* same hand-over as in obd_put_request_slot() */
2110 list_del_init(&orsw->orsw_entry);
2111 cli->cl_rpcs_in_flight++;
2112 wake_up(&orsw->orsw_waitq);
2114 spin_unlock(&cli->cl_loi_list_lock);
2118 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Return cli->cl_max_mod_rpcs_in_flight; the field is read without a lock. */
2120 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2122 return cli->cl_max_mod_rpcs_in_flight;
2124 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/*
 * Set the modify-RPC slot limit of @cli to @max.
 *
 * @max outside [1, OBD_MAX_RIF_MAX] is rejected; OBD_MAX_RIF_MAX itself is
 * clamped to OBD_MAX_RIF_MAX - 1.  If @max is not below
 * cl_max_rpcs_in_flight, that limit is raised to @max + 1 first.
 *
 * The server-side ceiling `maxmodrpcs` comes from the import's connect data:
 * cl_max_rpcs_in_flight - 1 while ocd_connect_flags is still zero or
 * ocd_maxmodrpcs is 0, ocd_maxmodrpcs when OBD_CONNECT_MULTIMODRPCS is set.
 * A @max above that ceiling is reported with CERROR.
 *
 * The new limit is stored under cl_mod_rpcs_lock and cl_mod_rpcs_waitq is
 * woken when the limit grew.
 */
2126 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2128 struct obd_connect_data *ocd;
2132 if (max > OBD_MAX_RIF_MAX || max < 1)
2135 ocd = &cli->cl_import->imp_connect_data;
2136 CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2137 cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2138 ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2140 if (max == OBD_MAX_RIF_MAX)
2141 max = OBD_MAX_RIF_MAX - 1;
2143 /* Cannot exceed or equal max_rpcs_in_flight. If we are asked to
2144 * increase this value, also bump up max_rpcs_in_flight to match.
2146 if (max >= cli->cl_max_rpcs_in_flight) {
2148 "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2149 cli->cl_import->imp_obd->obd_name, max + 1, max);
2150 obd_set_max_rpcs_in_flight(cli, max + 1);
2153 /* cannot exceed max modify RPCs in flight supported by the server,
2154 * but verify ocd_connect_flags is at least initialized first. If
2155 * not, allow it and fix value later in ptlrpc_connect_set_flags().
2157 if (!ocd->ocd_connect_flags) {
2158 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2159 } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2160 maxmodrpcs = ocd->ocd_maxmodrpcs;
2161 if (maxmodrpcs == 0) { /* connection not finished yet */
2162 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2164 "%s: partial connect, assume maxmodrpcs=%hu\n",
2165 cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2170 if (max > maxmodrpcs) {
2171 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2172 cli->cl_import->imp_obd->obd_name,
2177 spin_lock(&cli->cl_mod_rpcs_lock);
2179 prev = cli->cl_max_mod_rpcs_in_flight;
2180 cli->cl_max_mod_rpcs_in_flight = max;
2182 /* wakeup waiters if limit has been increased */
2183 if (cli->cl_max_mod_rpcs_in_flight > prev)
2184 wake_up(&cli->cl_mod_rpcs_waitq);
2186 spin_unlock(&cli->cl_mod_rpcs_lock);
2190 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/*
 * Print the modify-RPC statistics of @cli into @seq.
 *
 * Output: a wall-clock snapshot time, the current cl_mod_rpcs_in_flight and
 * one row per cl_mod_rpcs_hist bucket (count, percentage, cumulative
 * percentage).  Rows stop once the cumulative count reaches the histogram
 * total.  The counters are read under cl_mod_rpcs_lock.
 */
2192 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2193 struct seq_file *seq)
2195 unsigned long mod_tot = 0, mod_cum;
2196 struct timespec64 now;
2199 ktime_get_real_ts64(&now);
2201 spin_lock(&cli->cl_mod_rpcs_lock);
/*
 * tv_sec is cast to a signed s64, hence %lld.  The nanoseconds are the
 * fractional part of the timestamp and must be zero-padded to nine digits,
 * otherwise e.g. 5s + 1234567ns would print as "5.  1234567".
 */
2203 seq_printf(seq, "snapshot_time: %lld.%09lu (secs.nsecs)\n",
2204 (s64)now.tv_sec, now.tv_nsec);
2205 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2206 cli->cl_mod_rpcs_in_flight);
2208 seq_printf(seq, "\n\t\t\tmodify\n");
2209 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2211 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2214 for (i = 0; i < OBD_HIST_MAX; i++) {
2215 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2217 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2218 i, mod, pct(mod, mod_tot),
2219 pct(mod_cum, mod_tot));
/* nothing but empty buckets can follow */
2220 if (mod_cum == mod_tot)
2224 spin_unlock(&cli->cl_mod_rpcs_lock);
2228 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2230 /* The number of modify RPCs sent in parallel is limited
2231 * because the server has a finite number of slots per client to
2232 * store request result and ensure reply reconstruction when needed.
2233 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2234 * that takes into account server limit and cl_max_rpcs_in_flight
2236 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2237 * one close request is allowed above the maximum. */
/*
 * Tell whether a modify RPC may be sent on @cli right now.  The _locked
 * suffix and the callers indicate cl_mod_rpcs_lock is held on entry.
 *
 * A slot is available when cl_mod_rpcs_in_flight is below
 * cl_max_mod_rpcs_in_flight, or when close_req is set and no close RPC is
 * in flight (cl_close_rpcs_in_flight == 0).
 */
2239 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2244 /* A slot is available if
2245 * - number of modify RPCs in flight is less than the max
2246 * - it's a close RPC and no other close request is in flight
2248 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2249 (close_req && cli->cl_close_rpcs_in_flight == 0);
/*
 * Locking wrapper: evaluates obd_mod_rpc_slot_avail_locked() with
 * cl_mod_rpcs_lock taken around the call.  Used as the wait condition in
 * obd_get_mod_rpc_slot().
 */
2254 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2259 spin_lock(&cli->cl_mod_rpcs_lock);
2260 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2261 spin_unlock(&cli->cl_mod_rpcs_lock);
2266 /* Get a modify RPC slot from the obd client @cli according
2267 * to the kind of operation @opc that is going to be sent.
2269 * If the maximum number of modify RPCs in flight is reached
2270 * the thread is put to sleep.
2271 * Returns the tag to be set in the request message. Tag 0
2272 * is reserved for non-modifying requests. */
/*
 * Acquire a modify-RPC slot and a tag on @cli.
 *
 * @opc: operation code; MDS_CLOSE is treated as a close request.
 *
 * With a slot available (checked under cl_mod_rpcs_lock) the in-flight
 * counters are bumped, the histogram is tallied, and the first clear bit of
 * cl_mod_tag_bitmap is claimed as the tag.  Otherwise the lock is dropped
 * and the caller sleeps exclusively on cl_mod_rpcs_waitq until
 * obd_mod_rpc_slot_avail() holds; presumably the attempt is then repeated
 * -- TODO confirm.
 */
2274 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2276 bool close_req = false;
2279 if (opc == MDS_CLOSE)
2283 spin_lock(&cli->cl_mod_rpcs_lock);
/* snapshot of the limit, used only in the debug messages below */
2284 max = cli->cl_max_mod_rpcs_in_flight;
2285 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2286 /* there is a slot available */
2287 cli->cl_mod_rpcs_in_flight++;
2289 cli->cl_close_rpcs_in_flight++;
2290 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2291 cli->cl_mod_rpcs_in_flight);
2292 /* find a free tag */
2293 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2295 LASSERT(i < OBD_MAX_RIF_MAX);
/* the assertion also performs the claim: the bit is set as a side effect */
2296 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2297 spin_unlock(&cli->cl_mod_rpcs_lock);
2298 /* tag 0 is reserved for non-modify RPCs */
2301 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2302 cli->cl_import->imp_obd->obd_name,
2307 spin_unlock(&cli->cl_mod_rpcs_lock);
2309 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2310 "opc %u, max %hu\n",
2311 cli->cl_import->imp_obd->obd_name, opc, max);
2313 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2314 obd_mod_rpc_slot_avail(cli,
2318 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2320 /* Put a modify RPC slot from the obd client @cli according
2321 * to the kind of operation @opc that has been sent. */
/*
 * Release a modify-RPC slot and its tag.
 *
 * @opc: operation code of the finished request; MDS_CLOSE is treated as a
 *       close request
 * @tag: tag of the finished request; bit (@tag - 1) of cl_mod_tag_bitmap
 *       must be set and is cleared
 *
 * The counters and the bitmap are updated under cl_mod_rpcs_lock;
 * cl_mod_rpcs_waitq is woken after the lock is dropped.
 */
2323 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2325 bool close_req = false;
2330 if (opc == MDS_CLOSE)
2333 spin_lock(&cli->cl_mod_rpcs_lock);
2334 cli->cl_mod_rpcs_in_flight--;
2336 cli->cl_close_rpcs_in_flight--;
2337 /* release the tag in the bitmap */
2338 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
/* the assertion also performs the release: the bit is cleared as a side effect */
2339 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2340 spin_unlock(&cli->cl_mod_rpcs_lock);
2341 wake_up(&cli->cl_mod_rpcs_waitq);
2343 EXPORT_SYMBOL(obd_put_mod_rpc_slot);