4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/obdclass/genops.c
33 * These are the only exported functions, they provide some generic
34 * infrastructure for managing object devices
37 #define DEBUG_SUBSYSTEM S_CLASS
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
/*
 * File-scope state used throughout this file: the rwlock taken around
 * accesses to the obd_devs[] device table, the table itself, the slab cache
 * obd devices are allocated from, the kobject type used for obd types, a
 * workqueue pointer, forward declarations of helpers defined further down,
 * and the stale-export list with its spinlock and counter.
 */
48 DEFINE_RWLOCK(obd_dev_lock);
49 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
51 static struct kmem_cache *obd_device_cachep;
52 static struct kobj_type class_ktype;
53 static struct workqueue_struct *zombie_wq;
55 static void obd_zombie_export_add(struct obd_export *exp);
56 static void obd_zombie_import_add(struct obd_import *imp);
57 static void print_export_data(struct obd_export *exp,
58 const char *status, int locks, int debug_level);
60 static LIST_HEAD(obd_stale_exports);
61 static DEFINE_SPINLOCK(obd_stale_export_lock);
62 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65 * support functions: we could use inter-module communication, but this
66 * is more portable to other OS's
68 static struct obd_device *obd_device_alloc(void)
70 struct obd_device *obd;
72 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts that the magic is intact, reports a namespace that is still
 * attached to the device, finalizes the obd_reference lu_ref and then frees
 * the slab object.
 */
79 static void obd_device_free(struct obd_device *obd)
82 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A namespace still present at free time is logged as an error. */
84 if (obd->obd_namespace != NULL) {
85 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86 obd, obd->obd_namespace, obd->obd_force);
89 lu_ref_fini(&obd->obd_reference);
90 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up an obd type by name in lustre_kset.
 *
 * The kobject returned by kset_find_obj() is only accepted when its ktype
 * is class_ktype; in that case the obd_type embedding it is returned.
 * class_get_type() below treats the returned pointer as carrying a counted
 * kobject reference.
 */
93 struct obd_type *class_search_type(const char *name)
95 struct kobject *kobj = kset_find_obj(lustre_kset, name);
97 if (kobj && kobj->ktype == &class_ktype)
98 return container_of(kobj, struct obd_type, typ_kobj);
103 EXPORT_SYMBOL(class_search_type);
/*
 * Find the obd type called \a name and take a reference on it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the module is requested through
 * request_module() and the type is searched for again after a successful
 * load.  On servers some names are mapped to another module name first
 * (the LWP name loads the OSP module, names starting with the MDS name load
 * the MDT module).  A usable type gets a module reference on its dt_ops
 * owner plus a typ_refcnt count.
 */
105 struct obd_type *class_get_type(const char *name)
107 struct obd_type *type;
109 type = class_search_type(name);
110 #ifdef HAVE_MODULE_LOADING_SUPPORT
112 const char *modname = name;
114 #ifdef HAVE_SERVER_SUPPORT
115 if (strcmp(modname, "obdfilter") == 0)
118 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119 modname = LUSTRE_OSP_NAME;
121 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122 modname = LUSTRE_MDT_NAME;
123 #endif /* HAVE_SERVER_SUPPORT */
125 if (!request_module("%s", modname)) {
126 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127 type = class_search_type(name);
129 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135 if (try_module_get(type->typ_dt_ops->o_owner)) {
136 atomic_inc(&type->typ_refcnt);
137 /* class_search_type() returned a counted reference,
138 * but we don't need that count any more as
139 * we have one through typ_refcnt.
141 kobject_put(&type->typ_kobj);
143 kobject_put(&type->typ_kobj);
/*
 * Drop the references class_get_type() takes on \a type: the module
 * reference on the owner of its dt_ops and one typ_refcnt count.
 */
150 void class_put_type(struct obd_type *type)
153 module_put(type->typ_dt_ops->o_owner);
154 atomic_dec(&type->typ_refcnt);
/*
 * kobject release callback installed through class_ktype.
 *
 * Tears down what is attached to the obd_type embedding \a kobj: the
 * debugfs directory (recursively), the lu device type, the proc subtree
 * when CONFIG_PROC_FS is set and both typ_name and typ_procroot are
 * present, and finally the obd_type memory itself.
 */
157 static void class_sysfs_release(struct kobject *kobj)
159 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
161 debugfs_remove_recursive(type->typ_debugfs_entry);
162 type->typ_debugfs_entry = NULL;
165 lu_device_type_fini(type->typ_lu);
167 #ifdef CONFIG_PROC_FS
168 if (type->typ_name && type->typ_procroot)
169 remove_proc_subtree(type->typ_name, proc_lustre_root);
171 OBD_FREE(type, sizeof(*type));
/*
 * kobject type of every obd_type: sysfs accesses go through
 * lustre_sysfs_ops and the final kobject_put() ends in
 * class_sysfs_release().  class_search_type() also uses its address to
 * recognise obd_type kobjects.
 */
174 static struct kobj_type class_ktype = {
175 .sysfs_ops = &lustre_sysfs_ops,
176 .release = class_sysfs_release,
179 #ifdef HAVE_SERVER_SUPPORT
/*
 * Create an obd_type entry for \a name carrying a kobject in lustre_kset, a
 * debugfs directory and a compat proc entry, and flag it with
 * typ_sym_filter.
 *
 * \retval ERR_PTR(-EEXIST)	a type with this name is already present
 * \retval ERR_PTR(-ENOMEM)	the obd_type could not be allocated
 *
 * A failure to create the proc entry is only logged; typ_procroot is reset
 * to NULL in that case.
 */
180 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
182 struct dentry *symlink;
183 struct obd_type *type;
186 type = class_search_type(name);
188 kobject_put(&type->typ_kobj);
189 return ERR_PTR(-EEXIST);
192 OBD_ALLOC(type, sizeof(*type));
194 return ERR_PTR(-ENOMEM);
196 type->typ_kobj.kset = lustre_kset;
197 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
198 &lustre_kset->kobj, "%s", name);
202 symlink = debugfs_create_dir(name, debugfs_lustre_root);
203 type->typ_debugfs_entry = symlink;
204 type->typ_sym_filter = true;
207 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
209 if (IS_ERR(type->typ_procroot)) {
210 CERROR("%s: can't create compat proc entry: %d\n",
211 name, (int)PTR_ERR(type->typ_procroot));
212 type->typ_procroot = NULL;
218 EXPORT_SYMBOL(class_add_symlinks);
219 #endif /* HAVE_SERVER_SUPPORT */
/* Upper bound (exclusive) asserted on the length of an obd type name. */
221 #define CLASS_MAX_NAME 1024
/*
 * Register an obd type called \a name with the given dt/md operation
 * tables and optional lu_device_type \a ldt.
 *
 * An already registered name is reported through the "already registered"
 * debug message; on servers an existing entry flagged typ_sym_filter is
 * handled separately and has that flag cleared.  A new obd_type is
 * allocated, gets its kobject initialised in lustre_kset, an optional proc
 * directory (enable_proc), a debugfs directory, and is added to sysfs.
 * typ_lu holds OBD_LU_TYPE_SETUP while \a ldt is being initialised and is
 * then published with smp_store_release() as \a ldt, or NULL when
 * lu_device_type_init() failed, followed by a wake_up_var() on it.
 */
223 int class_register_type(const struct obd_ops *dt_ops,
224 const struct md_ops *md_ops,
226 const char *name, struct lu_device_type *ldt)
228 struct obd_type *type;
233 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
235 type = class_search_type(name);
237 #ifdef HAVE_SERVER_SUPPORT
238 if (type->typ_sym_filter)
240 #endif /* HAVE_SERVER_SUPPORT */
241 kobject_put(&type->typ_kobj);
242 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
246 OBD_ALLOC(type, sizeof(*type));
250 type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
251 type->typ_kobj.kset = lustre_kset;
252 kobject_init(&type->typ_kobj, &class_ktype);
253 #ifdef HAVE_SERVER_SUPPORT
255 #endif /* HAVE_SERVER_SUPPORT */
257 type->typ_dt_ops = dt_ops;
258 type->typ_md_ops = md_ops;
260 #ifdef HAVE_SERVER_SUPPORT
261 if (type->typ_sym_filter) {
262 type->typ_sym_filter = false;
263 kobject_put(&type->typ_kobj);
267 #ifdef CONFIG_PROC_FS
268 if (enable_proc && !type->typ_procroot) {
269 type->typ_procroot = lprocfs_register(name,
272 if (IS_ERR(type->typ_procroot)) {
273 rc = PTR_ERR(type->typ_procroot);
274 type->typ_procroot = NULL;
279 type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
281 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
284 #ifdef HAVE_SERVER_SUPPORT
288 rc = lu_device_type_init(ldt);
289 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
290 wake_up_var(&type->typ_lu);
298 kobject_put(&type->typ_kobj);
302 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * A type that still has a non-zero typ_refcnt is not released: its
 * operation tables are cleared, -EBUSY is returned and only the reference
 * obtained from class_search_type() is dropped.  Otherwise two kobject
 * references are put, the final one and the one from the lookup.
 */
304 int class_unregister_type(const char *name)
306 struct obd_type *type = class_search_type(name);
311 CERROR("unknown obd type\n");
315 if (atomic_read(&type->typ_refcnt)) {
316 CERROR("type %s has refcount (%d)\n", name,
317 atomic_read(&type->typ_refcnt));
318 /* This is a bad situation, let's make the best of it */
319 /* Remove ops, but leave the name for debugging */
320 type->typ_dt_ops = NULL;
321 type->typ_md_ops = NULL;
322 GOTO(out_put, rc = -EBUSY);
325 /* Put the final ref */
326 kobject_put(&type->typ_kobj);
328 /* Put the ref returned by class_search_type() */
329 kobject_put(&type->typ_kobj);
332 } /* class_unregister_type */
333 EXPORT_SYMBOL(class_unregister_type);
336 * Create a new obd device.
338 * Allocate the new obd_device and initialize it.
340 * \param[in] type_name obd device type string.
341 * \param[in] name obd device name.
342 * \param[in] uuid obd device UUID
344 * \retval newdev pointer to created obd_device
345 * \retval ERR_PTR(errno) on error
/*
 * Allocate an obd_device of type \a type_name and initialise its name,
 * locks, lists, wait queues, llog group and reference count.
 *
 * The name must be shorter than MAX_OBD_NAME (-EINVAL otherwise), the type
 * must be known to class_get_type() (-ENODEV otherwise) and the type
 * reference is dropped again when the device allocation fails (-ENOMEM).
 * The new device starts with obd_minor == -1, i.e. without a slot in
 * obd_devs[], and with a reference count of 1.
 */
347 struct obd_device *class_newdev(const char *type_name, const char *name,
350 struct obd_device *newdev;
351 struct obd_type *type = NULL;
354 if (strlen(name) >= MAX_OBD_NAME) {
355 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
356 RETURN(ERR_PTR(-EINVAL));
359 type = class_get_type(type_name);
361 CERROR("OBD: unknown type: %s\n", type_name);
362 RETURN(ERR_PTR(-ENODEV));
365 newdev = obd_device_alloc();
366 if (newdev == NULL) {
367 class_put_type(type);
368 RETURN(ERR_PTR(-ENOMEM));
370 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* The copy stops one byte short of the buffer size. */
371 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
372 newdev->obd_type = type;
373 newdev->obd_minor = -1;
375 rwlock_init(&newdev->obd_pool_lock);
376 newdev->obd_pool_limit = 0;
377 newdev->obd_pool_slv = 0;
379 INIT_LIST_HEAD(&newdev->obd_exports);
380 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
381 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
382 INIT_LIST_HEAD(&newdev->obd_exports_timed);
383 INIT_LIST_HEAD(&newdev->obd_nid_stats);
384 spin_lock_init(&newdev->obd_nid_lock);
385 spin_lock_init(&newdev->obd_dev_lock);
386 mutex_init(&newdev->obd_dev_mutex);
387 spin_lock_init(&newdev->obd_osfs_lock);
388 /* newdev->obd_osfs_age must be set to a value in the distant
389 * past to guarantee a fresh statfs is fetched on mount. */
390 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
392 /* XXX belongs in setup not attach */
393 init_rwsem(&newdev->obd_observer_link_sem);
395 spin_lock_init(&newdev->obd_recovery_task_lock);
396 init_waitqueue_head(&newdev->obd_next_transno_waitq);
397 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
398 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
399 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
400 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
401 INIT_LIST_HEAD(&newdev->obd_evict_list);
402 INIT_LIST_HEAD(&newdev->obd_lwp_list);
404 llog_group_init(&newdev->obd_olg);
405 /* Detach drops this */
406 atomic_set(&newdev->obd_refcount, 1);
407 lu_ref_init(&newdev->obd_reference);
408 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
410 newdev->obd_conn_inprogress = 0;
/*
 * NOTE(review): strncpy() with the full UUID_MAX does not NUL-terminate
 * when \a uuid is UUID_MAX bytes or longer, and only the name length is
 * validated above - confirm callers bound the uuid length.
 */
412 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
414 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
415 newdev->obd_name, newdev);
423 * \param[in] obd obd_device to be freed
/*
 * Final release of an obd device.
 *
 * Asserts that the magic is intact, that the device is either unregistered
 * (obd_minor == -1) or still sits in its own obd_devs[] slot, that its
 * reference count has reached zero and that it has a type.  obd_cleanup()
 * is called in the obd_stopping case and a failure is logged.  The type
 * pointer is saved up front because the device is freed before the type
 * reference is dropped.
 */
427 void class_free_dev(struct obd_device *obd)
429 struct obd_type *obd_type = obd->obd_type;
431 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
432 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
433 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
434 "obd %p != obd_devs[%d] %p\n",
435 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
436 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
437 "obd_refcount should be 0, not %d\n",
438 atomic_read(&obd->obd_refcount));
439 LASSERT(obd_type != NULL);
441 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
442 obd->obd_name, obd->obd_type->typ_name);
444 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
445 obd->obd_name, obd->obd_uuid.uuid);
446 if (obd->obd_stopping) {
449 /* If we're not stopping, we were never set up */
450 err = obd_cleanup(obd);
452 CERROR("Cleanup %s returned %d\n",
456 obd_device_free(obd);
458 class_put_type(obd_type);
462 * Unregister obd device.
464 * Free the slot in obd_devs[] used by \a obd.
466 * \param[in] obd obd_device to be unregistered
/*
 * Remove \a obd from the obd_devs[] table.
 *
 * Runs under the obd_dev_lock write lock.  A device that holds a slot
 * (obd_minor >= 0) must be the one stored there; the slot is then cleared.
 */
470 void class_unregister_device(struct obd_device *obd)
472 write_lock(&obd_dev_lock);
473 if (obd->obd_minor >= 0) {
474 LASSERT(obd_devs[obd->obd_minor] == obd);
475 obd_devs[obd->obd_minor] = NULL;
478 write_unlock(&obd_dev_lock);
482 * Register obd device.
484 * Find a free slot in obd_devs[] and fill it with \a new_obd.
486 * \param[in] new_obd obd_device to be registered
489 * \retval -EEXIST device with this name is registered
490 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Insert \a new_obd into the obd_devs[] table.
 *
 * The whole table is scanned under the obd_dev_lock write lock: device
 * names are compared with strcmp() to detect a duplicate, and a free slot
 * is looked for along the way.  On success the chosen index is stored in
 * new_obd->obd_minor and the slot, asserted to be empty, is filled.  A
 * duplicate name is logged as "already exists" and a full table as "all
 * devices used".
 */
492 int class_register_device(struct obd_device *new_obd)
496 int new_obd_minor = 0;
497 bool minor_assign = false;
498 bool retried = false;
501 write_lock(&obd_dev_lock);
502 for (i = 0; i < class_devno_max(); i++) {
503 struct obd_device *obd = class_num2obd(i);
506 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
509 write_unlock(&obd_dev_lock);
511 /* the obd_device could be waited to be
512 * destroyed by the "obd_zombie_impexp_thread".
514 obd_zombie_barrier();
519 CERROR("%s: already exists, won't add\n",
521 /* in case we found a free slot before duplicate */
522 minor_assign = false;
526 if (!minor_assign && obd == NULL) {
533 new_obd->obd_minor = new_obd_minor;
534 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
535 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
536 obd_devs[new_obd_minor] = new_obd;
540 CERROR("%s: all %u/%u devices used, increase "
541 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
542 i, class_devno_max(), ret);
545 write_unlock(&obd_dev_lock);
/*
 * Scan obd_devs[] for a device whose obd_name equals \a name.
 *
 * No locking is done here; class_name2dev() and class_dev_by_str() call
 * this with obd_dev_lock read-held.  A match is additionally checked for
 * obd_attached before it is used.
 */
550 static int class_name2dev_nolock(const char *name)
557 for (i = 0; i < class_devno_max(); i++) {
558 struct obd_device *obd = class_num2obd(i);
560 if (obd && strcmp(name, obd->obd_name) == 0) {
561 /* Make sure we finished attaching before we give
562 out any references */
563 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
564 if (obd->obd_attached) {
/*
 * Locked wrapper around class_name2dev_nolock(): the lookup of \a name runs
 * with obd_dev_lock read-held.
 */
574 int class_name2dev(const char *name)
581 read_lock(&obd_dev_lock);
582 i = class_name2dev_nolock(name);
583 read_unlock(&obd_dev_lock);
587 EXPORT_SYMBOL(class_name2dev);
589 struct obd_device *class_name2obd(const char *name)
591 int dev = class_name2dev(name);
593 if (dev < 0 || dev > class_devno_max())
595 return class_num2obd(dev);
597 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] for a device whose obd_uuid equals \a uuid, asserting the
 * magic of the match.  No locking is done here; class_uuid2dev() and
 * class_dev_by_str() call this with obd_dev_lock read-held.
 */
599 int class_uuid2dev_nolock(struct obd_uuid *uuid)
603 for (i = 0; i < class_devno_max(); i++) {
604 struct obd_device *obd = class_num2obd(i);
606 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
607 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/*
 * Locked wrapper around class_uuid2dev_nolock(): the lookup of \a uuid runs
 * with obd_dev_lock read-held.
 */
615 int class_uuid2dev(struct obd_uuid *uuid)
619 read_lock(&obd_dev_lock);
620 i = class_uuid2dev_nolock(uuid);
621 read_unlock(&obd_dev_lock);
625 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up an obd device by UUID: the index from class_uuid2dev() is turned
 * into a device pointer with class_num2obd().
 */
627 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
629 int dev = class_uuid2dev(uuid);
632 return class_num2obd(dev);
634 EXPORT_SYMBOL(class_uuid2obd);
637 * Get obd device from ::obd_devs[]
639 * \param num [in] array index
641 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
642 * otherwise return the obd device there.
644 struct obd_device *class_num2obd(int num)
646 struct obd_device *obd = NULL;
648 if (num < class_devno_max()) {
653 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
654 "%p obd_magic %08x != %08x\n",
655 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
656 LASSERTF(obd->obd_minor == num,
657 "%p obd_minor %0d != %0d\n",
658 obd, obd->obd_minor, num);
663 EXPORT_SYMBOL(class_num2obd);
666 * Find obd in obd_devs[] by name or uuid.
668 * Increment obd's refcount if found.
670 * \param[in] str obd name or uuid
672 * \retval NULL if not found
673 * \retval target pointer to found obd_device
/*
 * Resolve \a str to an obd device, trying it as a UUID and as a name.
 *
 * \a str is converted to an obd_uuid and both lookups run under the
 * obd_dev_lock read lock.  A device that is found gets a "find" reference
 * via class_incref() before the lock is released.
 */
675 struct obd_device *class_dev_by_str(const char *str)
677 struct obd_device *target = NULL;
678 struct obd_uuid tgtuuid;
681 obd_str2uuid(&tgtuuid, str);
683 read_lock(&obd_dev_lock);
684 rc = class_uuid2dev_nolock(&tgtuuid);
686 rc = class_name2dev_nolock(str);
689 target = class_num2obd(rc);
692 class_incref(target, "find", current);
693 read_unlock(&obd_dev_lock);
697 EXPORT_SYMBOL(class_dev_by_str);
700 * Get obd devices count. Devices in any state are counted.
702 * \retval obd device count
704 int get_devices_count(void)
706 int index, max_index = class_devno_max(), dev_count = 0;
708 read_lock(&obd_dev_lock);
709 for (index = 0; index <= max_index; index++) {
710 struct obd_device *obd = class_num2obd(index);
714 read_unlock(&obd_dev_lock);
718 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in obd_devs[]: index, a status derived
 * from the obd_stopping / obd_set_up / obd_attached flags (checked in that
 * order), type name, device name, UUID and reference count.  The scan runs
 * under the obd_dev_lock read lock.
 */
720 void class_obd_list(void)
725 read_lock(&obd_dev_lock);
726 for (i = 0; i < class_devno_max(); i++) {
727 struct obd_device *obd = class_num2obd(i);
731 if (obd->obd_stopping)
733 else if (obd->obd_set_up)
735 else if (obd->obd_attached)
739 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
740 i, status, obd->obd_type->typ_name,
741 obd->obd_name, obd->obd_uuid.uuid,
742 atomic_read(&obd->obd_refcount));
744 read_unlock(&obd_dev_lock);
747 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
748 * specified, then only the client with that uuid is returned,
749 * otherwise any client connected to the tgt is returned.
/*
 * Scan obd_devs[] under the obd_dev_lock read lock for a client device of
 * type \a type_name whose cl_target_uuid equals \a tgt_uuid and, when
 * \a grp_uuid is given, whose own UUID equals \a grp_uuid.  The lock is
 * released before a match is handed back.
 */
751 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
752 const char *type_name,
753 struct obd_uuid *grp_uuid)
757 read_lock(&obd_dev_lock);
758 for (i = 0; i < class_devno_max(); i++) {
759 struct obd_device *obd = class_num2obd(i);
/* The type check is a prefix match limited to strlen(type_name). */
763 if ((strncmp(obd->obd_type->typ_name, type_name,
764 strlen(type_name)) == 0)) {
765 if (obd_uuid_equals(tgt_uuid,
766 &obd->u.cli.cl_target_uuid) &&
767 ((grp_uuid)? obd_uuid_equals(grp_uuid,
768 &obd->obd_uuid) : 1)) {
769 read_unlock(&obd_dev_lock);
774 read_unlock(&obd_dev_lock);
778 EXPORT_SYMBOL(class_find_client_obd);
780 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
781 * searching at *next, and if a device is found, the next index to look
782 * at is saved in *next. If next is NULL, then the first matching device
783 * will always be returned.
/*
 * Scan obd_devs[] under the obd_dev_lock read lock for a device whose UUID
 * equals \a grp_uuid.  A start index passed through \a next is only
 * accepted when it lies within 0 .. class_devno_max() - 1.
 */
785 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
791 else if (*next >= 0 && *next < class_devno_max())
796 read_lock(&obd_dev_lock);
797 for (; i < class_devno_max(); i++) {
798 struct obd_device *obd = class_num2obd(i);
802 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
805 read_unlock(&obd_dev_lock);
809 read_unlock(&obd_dev_lock);
813 EXPORT_SYMBOL(class_devices_in_group);
816 * Notify that the sptlrpc log for \a fsname has changed, so that every
817 * relevant OBD adjusts its sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to every set-up,
 * non-stopping device of type mdc, osc, osp, lwp, mdt or ost whose name
 * starts with the first \a namelen bytes of \a fsname.
 *
 * \a namelen must be positive.
 */
819 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
821 struct obd_device *obd;
825 LASSERT(namelen > 0);
827 read_lock(&obd_dev_lock);
828 for (i = 0; i < class_devno_max(); i++) {
829 obd = class_num2obd(i);
831 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
834 /* only notify mdc, osc, osp, lwp, mdt, ost
835 * because only these have a -sptlrpc llog */
836 type = obd->obd_type->typ_name;
837 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
838 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
839 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
840 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
841 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
842 strcmp(type, LUSTRE_OST_NAME) != 0)
845 if (strncmp(obd->obd_name, fsname, namelen))
/*
 * The device is pinned with a reference and obd_dev_lock is dropped around
 * the obd_set_info_async() call; the lock is taken again afterwards.
 */
848 class_incref(obd, __FUNCTION__, obd);
849 read_unlock(&obd_dev_lock);
850 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
851 sizeof(KEY_SPTLRPC_CONF),
852 KEY_SPTLRPC_CONF, 0, NULL, NULL);
854 class_decref(obd, __FUNCTION__, obd);
855 read_lock(&obd_dev_lock);
857 read_unlock(&obd_dev_lock);
860 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the obd device slab cache if it exists and reset the pointer, so
 * that obd_init_caches() can assert it starts from NULL.
 */
862 void obd_cleanup_caches(void)
865 if (obd_device_cachep) {
866 kmem_cache_destroy(obd_device_cachep);
867 obd_device_cachep = NULL;
/*
 * Create the "ll_obd_dev_cache" slab cache used for struct obd_device.
 *
 * The cache must not exist yet.  The usercopy region passed to
 * kmem_cache_create_usercopy() covers the whole structure (offset 0, size
 * sizeof(struct obd_device)).  A failed creation yields -ENOMEM, and the
 * function also calls obd_cleanup_caches().
 */
873 int obd_init_caches(void)
878 LASSERT(obd_device_cachep == NULL);
879 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
880 sizeof(struct obd_device),
881 0, 0, 0, sizeof(struct obd_device), NULL);
882 if (!obd_device_cachep)
883 GOTO(out, rc = -ENOMEM);
887 obd_cleanup_caches();
/*
 * Owner tag passed to class_handle_hash() when an export handle is hashed
 * and to class_handle2object() when a cookie is resolved to an export.
 */
891 static const char export_handle_owner[] = "export";
893 /* map connection to client */
/*
 * Resolve the cookie in \a conn to an export through class_handle2object(),
 * using export_handle_owner as the owner tag.  A null handle and the
 * cookie value -1 ("assign a new connection") are recognised and logged
 * before the lookup.
 */
894 struct obd_export *class_conn2export(struct lustre_handle *conn)
896 struct obd_export *export;
900 CDEBUG(D_CACHE, "looking for null handle\n");
904 if (conn->cookie == -1) { /* this means assign a new connection */
905 CDEBUG(D_CACHE, "want a new connection\n");
909 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
910 export = class_handle2object(conn->cookie, export_handle_owner);
913 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): presumably returns exp->exp_obd - confirm against the body.
 */
915 struct obd_device *class_exp2obd(struct obd_export *exp)
921 EXPORT_SYMBOL(class_exp2obd);
/*
 * Return the client import (u.cli.cl_import) of the obd device that \a exp
 * belongs to.
 */
923 struct obd_import *class_exp2cliimp(struct obd_export *exp)
925 struct obd_device *obd = exp->exp_obd;
928 return obd->u.cli.cl_import;
930 EXPORT_SYMBOL(class_exp2cliimp);
932 /* Export management functions */
/*
 * Destroy an export whose handle reference count has dropped to zero.
 *
 * Releases the connection, asserts that the reply, replay and high-priority
 * RPC lists are empty, calls obd_destroy_export(), drops the "export"
 * reference on the obd device (not for the self export) and frees the
 * structure through kfree_rcu().
 */
933 static void class_export_destroy(struct obd_export *exp)
935 struct obd_device *obd = exp->exp_obd;
938 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
939 LASSERT(obd != NULL);
941 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
942 exp->exp_client_uuid.uuid, obd->obd_name);
944 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
945 ptlrpc_connection_put(exp->exp_connection);
947 LASSERT(list_empty(&exp->exp_outstanding_replies));
948 LASSERT(list_empty(&exp->exp_uncommitted_replies));
949 LASSERT(list_empty(&exp->exp_req_replay_queue));
950 LASSERT(list_empty(&exp->exp_hp_rpcs));
951 obd_destroy_export(exp);
952 /* self export doesn't hold a reference to an obd, although it
953 * exists until freeing of the obd */
954 if (exp != obd->obd_self_export)
955 class_decref(obd, "export", exp);
/* The memory is released via RCU rather than freed immediately. */
957 OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
958 kfree_rcu(exp, exp_handle.h_rcu);
962 struct obd_export *class_export_get(struct obd_export *exp)
964 refcount_inc(&exp->exp_handle.h_ref);
965 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
966 refcount_read(&exp->exp_handle.h_ref));
969 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.
 *
 * When the last reference goes away the nid stat reference is released
 * with lprocfs_exp_cleanup().  The self export of the obd device is then
 * destroyed directly via class_export_destroy(); any other export must
 * still be linked on exp_obd_chain and is handed to
 * obd_zombie_export_add().
 */
971 void class_export_put(struct obd_export *exp)
973 LASSERT(exp != NULL);
974 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
975 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
976 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
977 refcount_read(&exp->exp_handle.h_ref) - 1);
979 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
980 struct obd_device *obd = exp->exp_obd;
982 CDEBUG(D_IOCTL, "final put %p/%s\n",
983 exp, exp->exp_client_uuid.uuid);
985 /* release nid stat reference */
986 lprocfs_exp_cleanup(exp);
988 if (exp == obd->obd_self_export) {
989 /* self export should be destroyed without
990 * zombie thread as it doesn't hold a
991 * reference to obd and doesn't hold any
993 class_export_destroy(exp);
994 /* self export is destroyed, no class
995 * references exist and it is safe to free
999 LASSERT(!list_empty(&exp->exp_obd_chain));
1000 obd_zombie_export_add(exp);
1005 EXPORT_SYMBOL(class_export_put);
1007 static void obd_zombie_exp_cull(struct work_struct *ws)
1009 struct obd_export *export;
1011 export = container_of(ws, struct obd_export, exp_zombie_work);
1012 class_export_destroy(export);
1015 /* Creates a new export, adds it to the hash table, and returns a
1016 * pointer to it. The refcount is 2: one for the hash reference, and
1017 * one for the pointer returned by this function. */
/*
 * Allocate and initialise an export of \a obd for client \a cluuid.
 *
 * The export starts with a handle reference count of 2, is hashed with
 * export_handle_owner and gets its lists, locks and zombie work item
 * initialised.  Under obd->obd_dev_lock, an export whose UUID differs from
 * the UUID of the device is refused with -ENODEV while the device is
 * stopping and with -EALREADY when obd_uuid_add() rejects it as a
 * duplicate.  The code further down either links the export into the
 * device lists (taking an "export" reference on the device and bumping
 * obd_num_exports) or only initialises its two chain list heads.  The
 * error path unhashes the handle, calls obd_destroy_export() and frees the
 * export.
 *
 * \retval ERR_PTR(-ENOMEM)	the export could not be allocated
 */
1018 struct obd_export *__class_new_export(struct obd_device *obd,
1019 struct obd_uuid *cluuid, bool is_self)
1021 struct obd_export *export;
1025 OBD_ALLOC_PTR(export);
1027 return ERR_PTR(-ENOMEM);
1029 export->exp_conn_cnt = 0;
1030 export->exp_lock_hash = NULL;
1031 export->exp_flock_hash = NULL;
1032 /* 2 = class_handle_hash + last */
1033 refcount_set(&export->exp_handle.h_ref, 2);
1034 atomic_set(&export->exp_rpc_count, 0);
1035 atomic_set(&export->exp_cb_count, 0);
1036 atomic_set(&export->exp_locks_count, 0);
1037 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1038 INIT_LIST_HEAD(&export->exp_locks_list);
1039 spin_lock_init(&export->exp_locks_list_guard);
1041 atomic_set(&export->exp_replay_count, 0);
1042 export->exp_obd = obd;
1043 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1044 spin_lock_init(&export->exp_uncommitted_replies_lock);
1045 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1046 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1047 INIT_HLIST_NODE(&export->exp_handle.h_link);
1048 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1049 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1050 class_handle_hash(&export->exp_handle, export_handle_owner);
1051 export->exp_last_request_time = ktime_get_real_seconds();
1052 spin_lock_init(&export->exp_lock);
1053 spin_lock_init(&export->exp_rpc_lock);
1054 INIT_HLIST_NODE(&export->exp_gen_hash);
1055 spin_lock_init(&export->exp_bl_list_lock);
1056 INIT_LIST_HEAD(&export->exp_bl_list);
1057 INIT_LIST_HEAD(&export->exp_stale_list);
1058 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1060 export->exp_sp_peer = LUSTRE_SP_ANY;
1061 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1062 export->exp_client_uuid = *cluuid;
1063 obd_init_export(export);
1065 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1067 spin_lock(&obd->obd_dev_lock);
1068 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1069 /* shouldn't happen, but might race */
1070 if (obd->obd_stopping)
1071 GOTO(exit_unlock, rc = -ENODEV);
1073 rc = obd_uuid_add(obd, export);
1075 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1076 obd->obd_name, cluuid->uuid, rc);
1077 GOTO(exit_unlock, rc = -EALREADY);
1082 class_incref(obd, "export", export);
1083 list_add_tail(&export->exp_obd_chain_timed,
1084 &obd->obd_exports_timed);
1085 list_add(&export->exp_obd_chain, &obd->obd_exports);
1086 obd->obd_num_exports++;
1088 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1089 INIT_LIST_HEAD(&export->exp_obd_chain);
1091 spin_unlock(&obd->obd_dev_lock);
1095 spin_unlock(&obd->obd_dev_lock);
1096 class_handle_unhash(&export->exp_handle);
1097 obd_destroy_export(export);
1098 OBD_FREE_PTR(export);
1102 struct obd_export *class_new_export(struct obd_device *obd,
1103 struct obd_uuid *uuid)
1105 return __class_new_export(obd, uuid, false);
1107 EXPORT_SYMBOL(class_new_export);
1109 struct obd_export *class_new_export_self(struct obd_device *obd,
1110 struct obd_uuid *uuid)
1112 return __class_new_export(obd, uuid, true);
/*
 * Detach \a exp from its obd device.
 *
 * The handle is unhashed first.  The self export is simply put.  Any other
 * export is, under obd_dev_lock, removed from the UUID hash (and, on
 * servers, from the generation hash when it is hashed there), moved to
 * obd_unlinked_exports, removed from the timed list and uncounted from
 * obd_num_exports.  The stale export counter is then incremented and the
 * export handed to obd_stale_export_put().
 */
1115 void class_unlink_export(struct obd_export *exp)
1117 class_handle_unhash(&exp->exp_handle);
1119 if (exp->exp_obd->obd_self_export == exp) {
1120 class_export_put(exp);
1124 spin_lock(&exp->exp_obd->obd_dev_lock);
1125 /* delete an uuid-export hashitem from hashtables */
1126 if (exp != exp->exp_obd->obd_self_export)
1127 obd_uuid_del(exp->exp_obd, exp);
1129 #ifdef HAVE_SERVER_SUPPORT
1130 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1131 struct tg_export_data *ted = &exp->exp_target_data;
1132 struct cfs_hash *hash;
1134 /* Because obd_gen_hash will not be released until
1135 * class_cleanup(), so hash should never be NULL here */
1136 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1137 LASSERT(hash != NULL);
1138 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1139 &exp->exp_gen_hash);
1140 cfs_hash_putref(hash);
1142 #endif /* HAVE_SERVER_SUPPORT */
1144 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1145 list_del_init(&exp->exp_obd_chain_timed);
1146 exp->exp_obd->obd_num_exports--;
1147 spin_unlock(&exp->exp_obd->obd_dev_lock);
1148 atomic_inc(&obd_stale_export_num);
1150 /* A reference is kept by obd_stale_exports list */
1151 obd_stale_export_put(exp);
1153 EXPORT_SYMBOL(class_unlink_export);
1155 /* Import management functions */
/*
 * Free the resources of an import whose reference count has reached zero.
 *
 * Releases the current connection and every entry on imp_conn_list
 * (connection reference plus the list element itself), asserts that no
 * security context and no requests remain, and drops the "import"
 * reference on the obd device.
 */
1156 static void obd_zombie_import_free(struct obd_import *imp)
1158 struct obd_import_conn *imp_conn;
1161 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1162 imp->imp_obd->obd_name);
1164 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1166 ptlrpc_connection_put(imp->imp_connection);
/* Pop entries off the head of the list until it is empty. */
1168 while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1169 struct obd_import_conn,
1170 oic_item)) != NULL) {
1171 list_del_init(&imp_conn->oic_item);
1172 ptlrpc_connection_put(imp_conn->oic_conn);
1173 OBD_FREE(imp_conn, sizeof(*imp_conn));
1176 LASSERT(imp->imp_sec == NULL);
1177 LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1178 imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1179 class_decref(imp->imp_obd, "import", imp);
1184 struct obd_import *class_import_get(struct obd_import *import)
1186 refcount_inc(&import->imp_refcount);
1187 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1188 refcount_read(&import->imp_refcount),
1189 import->imp_obd->obd_name);
1192 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp.  The import must still hold at least one
 * reference on entry; when the last one goes away the import is handed to
 * obd_zombie_import_add().
 */
1194 void class_import_put(struct obd_import *imp)
1198 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1200 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1201 refcount_read(&imp->imp_refcount) - 1,
1202 imp->imp_obd->obd_name);
1204 if (refcount_dec_and_test(&imp->imp_refcount)) {
1205 CDEBUG(D_INFO, "final put import %p\n", imp);
1206 obd_zombie_import_add(imp);
1211 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.
 */
1213 static void init_imp_at(struct imp_at *at) {
1215 at_init(&at->iat_net_latency, 0, 0);
1216 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1217 /* max service estimates are tracked on the server side, so
1218 don't use the AT history here, just use the last reported
1219 val. (But keep hist for proc histogram, worst_ever) */
1220 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1225 static void obd_zombie_imp_cull(struct work_struct *ws)
1227 struct obd_import *import;
1229 import = container_of(ws, struct obd_import, imp_zombie_work);
1230 obd_zombie_import_free(import);
/*
 * Allocate and initialise an import for \a obd.
 *
 * The import takes an "import" reference on the device, starts in state
 * LUSTRE_IMP_NEW with a reference count of 2 and message magic
 * LUSTRE_MSG_MAGIC_V2, and has its lists, locks, wait queues, counters,
 * adaptive timeouts and zombie work item initialised.  imp_sec_refpid is
 * the pid of the child reaper of the current pid namespace when that is
 * available, and 1 otherwise.
 */
1233 struct obd_import *class_new_import(struct obd_device *obd)
1235 struct obd_import *imp;
1236 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1238 OBD_ALLOC(imp, sizeof(*imp));
1242 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1243 INIT_LIST_HEAD(&imp->imp_replay_list);
1244 INIT_LIST_HEAD(&imp->imp_sending_list);
1245 INIT_LIST_HEAD(&imp->imp_delayed_list);
1246 INIT_LIST_HEAD(&imp->imp_committed_list);
1247 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1248 imp->imp_known_replied_xid = 0;
1249 imp->imp_replay_cursor = &imp->imp_committed_list;
1250 spin_lock_init(&imp->imp_lock);
1251 imp->imp_last_success_conn = 0;
1252 imp->imp_state = LUSTRE_IMP_NEW;
1253 imp->imp_obd = class_incref(obd, "import", imp);
1254 rwlock_init(&imp->imp_sec_lock);
1255 init_waitqueue_head(&imp->imp_recovery_waitq);
1256 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1258 if (curr_pid_ns && curr_pid_ns->child_reaper)
1259 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1261 imp->imp_sec_refpid = 1;
1263 refcount_set(&imp->imp_refcount, 2);
1264 atomic_set(&imp->imp_unregistering, 0);
1265 atomic_set(&imp->imp_reqs, 0);
1266 atomic_set(&imp->imp_inflight, 0);
1267 atomic_set(&imp->imp_replay_inflight, 0);
1268 init_waitqueue_head(&imp->imp_replay_waitq);
1269 atomic_set(&imp->imp_inval_count, 0);
1270 INIT_LIST_HEAD(&imp->imp_conn_list);
1271 init_imp_at(&imp->imp_at);
1273 /* the default magic is V2, will be used in connect RPC, and
1274 * then adjusted according to the flags in request/reply. */
1275 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1279 EXPORT_SYMBOL(class_new_import);
/*
 * Start the destruction of \a import: imp_generation is bumped under
 * imp_lock and one reference is dropped with class_import_put().
 */
1281 void class_destroy_import(struct obd_import *import)
1283 LASSERT(import != NULL);
1284 LASSERT(import != LP_POISON);
1286 spin_lock(&import->imp_lock);
1287 import->imp_generation++;
1288 spin_unlock(&import->imp_lock);
1289 class_import_put(import);
1291 EXPORT_SYMBOL(class_destroy_import);
1293 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Record that \a lock holds one more reference on \a exp (only built with
 * LUSTRE_TRACKS_LOCK_EXP_REFS).
 *
 * Runs under exp_locks_list_guard.  A lock already tracked against a
 * different export triggers a console warning.  On the first reference
 * the lock is linked onto exp_locks_list and \a exp becomes its
 * l_exp_refs_target.
 */
1295 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1297 spin_lock(&exp->exp_locks_list_guard);
1299 LASSERT(lock->l_exp_refs_nr >= 0);
1301 if (lock->l_exp_refs_target != NULL &&
1302 lock->l_exp_refs_target != exp) {
1303 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1304 exp, lock, lock->l_exp_refs_target);
/* Post-increment: the branch is taken when the count was 0 before. */
1306 if ((lock->l_exp_refs_nr ++) == 0) {
1307 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1308 lock->l_exp_refs_target = exp;
1310 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1311 lock, exp, lock->l_exp_refs_nr);
1312 spin_unlock(&exp->exp_locks_list_guard);
1314 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Drop one tracked reference of \a lock on \a exp, the counterpart of
 * __class_export_add_lock_ref().
 *
 * Runs under exp_locks_list_guard.  The lock must hold at least one
 * tracked reference; a target export other than \a exp triggers a console
 * warning.  When the count reaches zero the lock is unlinked from the
 * export's list and its target is cleared.
 */
1316 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1318 spin_lock(&exp->exp_locks_list_guard);
1319 LASSERT(lock->l_exp_refs_nr > 0);
1320 if (lock->l_exp_refs_target != exp) {
1321 LCONSOLE_WARN("lock %p, "
1322 "mismatching export pointers: %p, %p\n",
1323 lock, lock->l_exp_refs_target, exp);
/* Pre-decrement: the branch is taken when the count drops to 0. */
1325 if (-- lock->l_exp_refs_nr == 0) {
1326 list_del_init(&lock->l_exp_refs_link);
1327 lock->l_exp_refs_target = NULL;
1329 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1330 lock, exp, lock->l_exp_refs_nr);
1331 spin_unlock(&exp->exp_locks_list_guard);
1333 EXPORT_SYMBOL(__class_export_del_lock_ref);
1336 /* A connection defines an export context in which preallocation can
1337 be managed. This releases the export pointer reference, and returns
1338 the export handle, so the export refcount is 1 when this function
1340 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1341 struct obd_uuid *cluuid)
1343 struct obd_export *export;
1344 LASSERT(conn != NULL);
1345 LASSERT(obd != NULL);
1346 LASSERT(cluuid != NULL);
1349 export = class_new_export(obd, cluuid);
1351 RETURN(PTR_ERR(export));
1353 conn->cookie = export->exp_handle.h_cookie;
1354 class_export_put(export);
1356 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1357 cluuid->uuid, conn->cookie);
1360 EXPORT_SYMBOL(class_connect);
1362 /* if export is involved in recovery then clean up related things */
1363 static void class_export_recovery_cleanup(struct obd_export *exp)
1365 struct obd_device *obd = exp->exp_obd;
1367 spin_lock(&obd->obd_recovery_task_lock);
1368 if (obd->obd_recovering) {
1369 if (exp->exp_in_recovery) {
1370 spin_lock(&exp->exp_lock);
1371 exp->exp_in_recovery = 0;
1372 spin_unlock(&exp->exp_lock);
1373 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1374 atomic_dec(&obd->obd_connected_clients);
1377 /* if called during recovery then should update
1378 * obd_stale_clients counter,
1379 * lightweight exports are not counted */
1380 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1381 exp->exp_obd->obd_stale_clients++;
1383 spin_unlock(&obd->obd_recovery_task_lock);
1385 spin_lock(&exp->exp_lock);
1386 /** Cleanup req replay fields */
1387 if (exp->exp_req_replay_needed) {
1388 exp->exp_req_replay_needed = 0;
1390 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1391 atomic_dec(&obd->obd_req_replay_clients);
1394 /** Cleanup lock replay data */
1395 if (exp->exp_lock_replay_needed) {
1396 exp->exp_lock_replay_needed = 0;
1398 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1399 atomic_dec(&obd->obd_lock_replay_clients);
1401 spin_unlock(&exp->exp_lock);
1404 /* This function removes 1-3 references from the export:
1405 * 1 - for export pointer passed
1406 * and if disconnect really need
1407 * 2 - removing from hash
1408 * 3 - in client_unlink_export
1409 * The export pointer passed to this function can destroyed */
1410 int class_disconnect(struct obd_export *export)
1412 int already_disconnected;
1415 if (export == NULL) {
1416 CWARN("attempting to free NULL export %p\n", export);
1420 spin_lock(&export->exp_lock);
1421 already_disconnected = export->exp_disconnected;
1422 export->exp_disconnected = 1;
1423 #ifdef HAVE_SERVER_SUPPORT
1424 /* We hold references of export for uuid hash
1425 * and nid_hash and export link at least. So
1426 * it is safe to call rh*table_remove_fast in
1429 obd_nid_del(export->exp_obd, export);
1430 #endif /* HAVE_SERVER_SUPPORT */
1431 spin_unlock(&export->exp_lock);
1433 /* class_cleanup(), abort_recovery(), and class_fail_export()
1434 * all end up in here, and if any of them race we shouldn't
1435 * call extra class_export_puts(). */
1436 if (already_disconnected)
1437 GOTO(no_disconn, already_disconnected);
1439 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1440 export->exp_handle.h_cookie);
1442 class_export_recovery_cleanup(export);
1443 class_unlink_export(export);
1445 class_export_put(export);
1448 EXPORT_SYMBOL(class_disconnect);
1450 /* Return non-zero for a fully connected export */
1451 int class_connected_export(struct obd_export *exp)
1456 spin_lock(&exp->exp_lock);
1457 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1458 spin_unlock(&exp->exp_lock);
1462 EXPORT_SYMBOL(class_connected_export);
1464 static void class_disconnect_export_list(struct list_head *list,
1465 enum obd_option flags)
1468 struct obd_export *exp;
1471 /* It's possible that an export may disconnect itself, but
1472 * nothing else will be added to this list.
1474 while ((exp = list_first_entry_or_null(list, struct obd_export,
1475 exp_obd_chain)) != NULL) {
1476 /* need for safe call CDEBUG after obd_disconnect */
1477 class_export_get(exp);
1479 spin_lock(&exp->exp_lock);
1480 exp->exp_flags = flags;
1481 spin_unlock(&exp->exp_lock);
1483 if (obd_uuid_equals(&exp->exp_client_uuid,
1484 &exp->exp_obd->obd_uuid)) {
1486 "exp %p export uuid == obd uuid, don't discon\n",
1488 /* Need to delete this now so we don't end up pointing
1489 * to work_list later when this export is cleaned up. */
1490 list_del_init(&exp->exp_obd_chain);
1491 class_export_put(exp);
1495 class_export_get(exp);
1496 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1497 "last request at %lld\n",
1498 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1499 exp, exp->exp_last_request_time);
1500 /* release one export reference anyway */
1501 rc = obd_disconnect(exp);
1503 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1504 obd_export_nid2str(exp), exp, rc);
1505 class_export_put(exp);
1510 void class_disconnect_exports(struct obd_device *obd)
1512 LIST_HEAD(work_list);
1515 /* Move all of the exports from obd_exports to a work list, en masse. */
1516 spin_lock(&obd->obd_dev_lock);
1517 list_splice_init(&obd->obd_exports, &work_list);
1518 list_splice_init(&obd->obd_delayed_exports, &work_list);
1519 spin_unlock(&obd->obd_dev_lock);
1521 if (!list_empty(&work_list)) {
1522 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1523 "disconnecting them\n", obd->obd_minor, obd);
1524 class_disconnect_export_list(&work_list,
1525 exp_flags_from_obd(obd));
1527 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1528 obd->obd_minor, obd);
1531 EXPORT_SYMBOL(class_disconnect_exports);
1533 /* Remove exports that have not completed recovery.
1535 void class_disconnect_stale_exports(struct obd_device *obd,
1536 int (*test_export)(struct obd_export *))
1538 LIST_HEAD(work_list);
1539 struct obd_export *exp, *n;
1543 spin_lock(&obd->obd_dev_lock);
1544 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1546 /* don't count self-export as client */
1547 if (obd_uuid_equals(&exp->exp_client_uuid,
1548 &exp->exp_obd->obd_uuid))
1551 /* don't evict clients which have no slot in last_rcvd
1552 * (e.g. lightweight connection) */
1553 if (exp->exp_target_data.ted_lr_idx == -1)
1556 spin_lock(&exp->exp_lock);
1557 if (exp->exp_failed || test_export(exp)) {
1558 spin_unlock(&exp->exp_lock);
1561 exp->exp_failed = 1;
1562 spin_unlock(&exp->exp_lock);
1564 list_move(&exp->exp_obd_chain, &work_list);
1566 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1567 obd->obd_name, exp->exp_client_uuid.uuid,
1568 obd_export_nid2str(exp));
1569 print_export_data(exp, "EVICTING", 0, D_HA);
1571 spin_unlock(&obd->obd_dev_lock);
1574 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1575 obd->obd_name, evicted);
1577 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1578 OBD_OPT_ABORT_RECOV);
1581 EXPORT_SYMBOL(class_disconnect_stale_exports);
1583 void class_fail_export(struct obd_export *exp)
1585 int rc, already_failed;
1587 spin_lock(&exp->exp_lock);
1588 already_failed = exp->exp_failed;
1589 exp->exp_failed = 1;
1590 spin_unlock(&exp->exp_lock);
1592 if (already_failed) {
1593 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1594 exp, exp->exp_client_uuid.uuid);
1598 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1599 exp, exp->exp_client_uuid.uuid);
1601 if (obd_dump_on_timeout)
1602 libcfs_debug_dumplog();
1604 /* need for safe call CDEBUG after obd_disconnect */
1605 class_export_get(exp);
1607 /* Most callers into obd_disconnect are removing their own reference
1608 * (request, for example) in addition to the one from the hash table.
1609 * We don't have such a reference here, so make one. */
1610 class_export_get(exp);
1611 rc = obd_disconnect(exp);
1613 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1615 CDEBUG(D_HA, "disconnected export %p/%s\n",
1616 exp, exp->exp_client_uuid.uuid);
1617 class_export_put(exp);
1619 EXPORT_SYMBOL(class_fail_export);
1621 #ifdef HAVE_SERVER_SUPPORT
1623 static int take_first(struct obd_export *exp, void *data)
1625 struct obd_export **expp = data;
1628 /* already have one */
1630 if (exp->exp_failed)
1631 /* Don't want this one */
1633 if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1634 /* Cannot get a ref on this one */
1640 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1642 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1643 struct obd_export *doomed_exp;
1644 int exports_evicted = 0;
1646 spin_lock(&obd->obd_dev_lock);
1647 /* umount has run already, so evict thread should leave
1648 * its task to umount thread now */
1649 if (obd->obd_stopping) {
1650 spin_unlock(&obd->obd_dev_lock);
1651 return exports_evicted;
1653 spin_unlock(&obd->obd_dev_lock);
1656 while (obd_nid_export_for_each(obd, nid_key,
1657 take_first, &doomed_exp) > 0) {
1659 LASSERTF(doomed_exp != obd->obd_self_export,
1660 "self-export is hashed by NID?\n");
1662 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1664 obd_uuid2str(&doomed_exp->exp_client_uuid),
1665 obd_export_nid2str(doomed_exp));
1667 class_fail_export(doomed_exp);
1668 class_export_put(doomed_exp);
1673 if (!exports_evicted)
1675 "%s: can't disconnect NID '%s': no exports found\n",
1676 obd->obd_name, nid);
1677 return exports_evicted;
1679 EXPORT_SYMBOL(obd_export_evict_by_nid);
1681 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1683 struct obd_export *doomed_exp = NULL;
1684 struct obd_uuid doomed_uuid;
1685 int exports_evicted = 0;
1687 spin_lock(&obd->obd_dev_lock);
1688 if (obd->obd_stopping) {
1689 spin_unlock(&obd->obd_dev_lock);
1690 return exports_evicted;
1692 spin_unlock(&obd->obd_dev_lock);
1694 obd_str2uuid(&doomed_uuid, uuid);
1695 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1696 CERROR("%s: can't evict myself\n", obd->obd_name);
1697 return exports_evicted;
1700 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1701 if (doomed_exp == NULL) {
1702 CERROR("%s: can't disconnect %s: no exports found\n",
1703 obd->obd_name, uuid);
1705 CWARN("%s: evicting %s at adminstrative request\n",
1706 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1707 class_fail_export(doomed_exp);
1708 class_export_put(doomed_exp);
1709 obd_uuid_del(obd, doomed_exp);
1713 return exports_evicted;
1715 #endif /* HAVE_SERVER_SUPPORT */
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional callback used by print_export_data() when lock dumping is
 * requested; it is skipped while it is NULL.
 */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1722 static void print_export_data(struct obd_export *exp, const char *status,
1723 int locks, int debug_level)
1725 struct ptlrpc_reply_state *rs;
1726 struct ptlrpc_reply_state *first_reply = NULL;
1729 spin_lock(&exp->exp_lock);
1730 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1736 spin_unlock(&exp->exp_lock);
1738 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1739 "%p %s %llu stale:%d\n",
1740 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1741 obd_export_nid2str(exp),
1742 refcount_read(&exp->exp_handle.h_ref),
1743 atomic_read(&exp->exp_rpc_count),
1744 atomic_read(&exp->exp_cb_count),
1745 atomic_read(&exp->exp_locks_count),
1746 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1747 nreplies, first_reply, nreplies > 3 ? "..." : "",
1748 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1749 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1750 if (locks && class_export_dump_hook != NULL)
1751 class_export_dump_hook(exp);
1755 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1757 struct obd_export *exp;
1759 spin_lock(&obd->obd_dev_lock);
1760 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1761 print_export_data(exp, "ACTIVE", locks, debug_level);
1762 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1763 print_export_data(exp, "UNLINKED", locks, debug_level);
1764 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1765 print_export_data(exp, "DELAYED", locks, debug_level);
1766 spin_unlock(&obd->obd_dev_lock);
1769 void obd_exports_barrier(struct obd_device *obd)
1772 LASSERT(list_empty(&obd->obd_exports));
1773 spin_lock(&obd->obd_dev_lock);
1774 while (!list_empty(&obd->obd_unlinked_exports)) {
1775 spin_unlock(&obd->obd_dev_lock);
1776 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1777 if (waited > 5 && is_power_of_2(waited)) {
1778 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1779 "more than %d seconds. "
1780 "The obd refcount = %d. Is it stuck?\n",
1781 obd->obd_name, waited,
1782 atomic_read(&obd->obd_refcount));
1783 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1786 spin_lock(&obd->obd_dev_lock);
1788 spin_unlock(&obd->obd_dev_lock);
1790 EXPORT_SYMBOL(obd_exports_barrier);
1793 * Add export to the obd_zombe thread and notify it.
1795 static void obd_zombie_export_add(struct obd_export *exp) {
1796 atomic_dec(&obd_stale_export_num);
1797 spin_lock(&exp->exp_obd->obd_dev_lock);
1798 LASSERT(!list_empty(&exp->exp_obd_chain));
1799 list_del_init(&exp->exp_obd_chain);
1800 spin_unlock(&exp->exp_obd->obd_dev_lock);
1802 queue_work(zombie_wq, &exp->exp_zombie_work);
1806 * Add import to the obd_zombe thread and notify it.
1808 static void obd_zombie_import_add(struct obd_import *imp) {
1809 LASSERT(imp->imp_sec == NULL);
1811 queue_work(zombie_wq, &imp->imp_zombie_work);
1815 * wait when obd_zombie import/export queues become empty
1817 void obd_zombie_barrier(void)
1819 flush_workqueue(zombie_wq);
1821 EXPORT_SYMBOL(obd_zombie_barrier);
1824 struct obd_export *obd_stale_export_get(void)
1826 struct obd_export *exp = NULL;
1829 spin_lock(&obd_stale_export_lock);
1830 if (!list_empty(&obd_stale_exports)) {
1831 exp = list_first_entry(&obd_stale_exports,
1832 struct obd_export, exp_stale_list);
1833 list_del_init(&exp->exp_stale_list);
1835 spin_unlock(&obd_stale_export_lock);
1838 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1839 atomic_read(&obd_stale_export_num));
1843 EXPORT_SYMBOL(obd_stale_export_get);
1845 void obd_stale_export_put(struct obd_export *exp)
1849 LASSERT(list_empty(&exp->exp_stale_list));
1850 if (exp->exp_lock_hash &&
1851 atomic_read(&exp->exp_lock_hash->hs_count)) {
1852 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1853 atomic_read(&obd_stale_export_num));
1855 spin_lock_bh(&exp->exp_bl_list_lock);
1856 spin_lock(&obd_stale_export_lock);
1857 /* Add to the tail if there is no blocked locks,
1858 * to the head otherwise. */
1859 if (list_empty(&exp->exp_bl_list))
1860 list_add_tail(&exp->exp_stale_list,
1861 &obd_stale_exports);
1863 list_add(&exp->exp_stale_list,
1864 &obd_stale_exports);
1866 spin_unlock(&obd_stale_export_lock);
1867 spin_unlock_bh(&exp->exp_bl_list_lock);
1869 class_export_put(exp);
1873 EXPORT_SYMBOL(obd_stale_export_put);
1876 * Adjust the position of the export in the stale list,
1877 * i.e. move to the head of the list if is needed.
1879 void obd_stale_export_adjust(struct obd_export *exp)
1881 LASSERT(exp != NULL);
1882 spin_lock_bh(&exp->exp_bl_list_lock);
1883 spin_lock(&obd_stale_export_lock);
1885 if (!list_empty(&exp->exp_stale_list) &&
1886 !list_empty(&exp->exp_bl_list))
1887 list_move(&exp->exp_stale_list, &obd_stale_exports);
1889 spin_unlock(&obd_stale_export_lock);
1890 spin_unlock_bh(&exp->exp_bl_list_lock);
1892 EXPORT_SYMBOL(obd_stale_export_adjust);
1895 * start destroy zombie import/export thread
1897 int obd_zombie_impexp_init(void)
1899 zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1901 cfs_cpt_number(cfs_cpt_tab));
1903 return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1907 * stop destroy zombie import/export thread
1909 void obd_zombie_impexp_stop(void)
1911 destroy_workqueue(zombie_wq);
1912 LASSERT(list_empty(&obd_stale_exports));
1915 /***** Kernel-userspace comm helpers *******/
1917 /* Get length of entire message, including header */
1918 int kuc_len(int payload_len)
1920 return sizeof(struct kuc_hdr) + payload_len;
1922 EXPORT_SYMBOL(kuc_len);
1924 /* Get a pointer to kuc header, given a ptr to the payload
1925 * @param p Pointer to payload area
1926 * @returns Pointer to kuc header
1928 struct kuc_hdr * kuc_ptr(void *p)
1930 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1931 LASSERT(lh->kuc_magic == KUC_MAGIC);
1934 EXPORT_SYMBOL(kuc_ptr);
1936 /* Alloc space for a message, and fill in header
1937 * @return Pointer to payload area
1939 void *kuc_alloc(int payload_len, int transport, int type)
1942 int len = kuc_len(payload_len);
1946 return ERR_PTR(-ENOMEM);
1948 lh->kuc_magic = KUC_MAGIC;
1949 lh->kuc_transport = transport;
1950 lh->kuc_msgtype = type;
1951 lh->kuc_msglen = len;
1953 return (void *)(lh + 1);
1955 EXPORT_SYMBOL(kuc_alloc);
/*
 * Free a message allocated by kuc_alloc().
 * Takes a pointer to the payload area and the payload length that was used
 * for the allocation.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1965 struct obd_request_slot_waiter {
1966 struct list_head orsw_entry;
1967 wait_queue_head_t orsw_waitq;
1971 static bool obd_request_slot_avail(struct client_obd *cli,
1972 struct obd_request_slot_waiter *orsw)
1976 spin_lock(&cli->cl_loi_list_lock);
1977 avail = !!list_empty(&orsw->orsw_entry);
1978 spin_unlock(&cli->cl_loi_list_lock);
1984 * For network flow control, the RPC sponsor needs to acquire a credit
1985 * before sending the RPC. The credits count for a connection is defined
1986 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1987 * the subsequent RPC sponsors need to wait until others released their
1988 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1990 int obd_get_request_slot(struct client_obd *cli)
1992 struct obd_request_slot_waiter orsw;
1995 spin_lock(&cli->cl_loi_list_lock);
1996 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1997 cli->cl_rpcs_in_flight++;
1998 spin_unlock(&cli->cl_loi_list_lock);
2002 init_waitqueue_head(&orsw.orsw_waitq);
2003 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2004 orsw.orsw_signaled = false;
2005 spin_unlock(&cli->cl_loi_list_lock);
2007 rc = l_wait_event_abortable(orsw.orsw_waitq,
2008 obd_request_slot_avail(cli, &orsw) ||
2009 orsw.orsw_signaled);
2011 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2012 * freed but other (such as obd_put_request_slot) is using it. */
2013 spin_lock(&cli->cl_loi_list_lock);
2015 if (!orsw.orsw_signaled) {
2016 if (list_empty(&orsw.orsw_entry))
2017 cli->cl_rpcs_in_flight--;
2019 list_del(&orsw.orsw_entry);
2024 if (orsw.orsw_signaled) {
2025 LASSERT(list_empty(&orsw.orsw_entry));
2029 spin_unlock(&cli->cl_loi_list_lock);
2033 EXPORT_SYMBOL(obd_get_request_slot);
2035 void obd_put_request_slot(struct client_obd *cli)
2037 struct obd_request_slot_waiter *orsw;
2039 spin_lock(&cli->cl_loi_list_lock);
2040 cli->cl_rpcs_in_flight--;
2042 /* If there is free slot, wakeup the first waiter. */
2043 if (!list_empty(&cli->cl_flight_waiters) &&
2044 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2045 orsw = list_first_entry(&cli->cl_flight_waiters,
2046 struct obd_request_slot_waiter,
2048 list_del_init(&orsw->orsw_entry);
2049 cli->cl_rpcs_in_flight++;
2050 wake_up(&orsw->orsw_waitq);
2052 spin_unlock(&cli->cl_loi_list_lock);
2054 EXPORT_SYMBOL(obd_put_request_slot);
2056 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2058 return cli->cl_max_rpcs_in_flight;
2060 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2062 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2064 struct obd_request_slot_waiter *orsw;
2070 if (max > OBD_MAX_RIF_MAX || max < 1)
2073 CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2074 cli->cl_import->imp_obd->obd_name, max,
2075 cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2077 if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2078 LUSTRE_MDC_NAME) == 0) {
2079 /* adjust max_mod_rpcs_in_flight to ensure it is always
2080 * strictly lower that max_rpcs_in_flight */
2082 CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2083 cli->cl_import->imp_obd->obd_name);
2086 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2087 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2093 spin_lock(&cli->cl_loi_list_lock);
2094 old = cli->cl_max_rpcs_in_flight;
2095 cli->cl_max_rpcs_in_flight = max;
2096 client_adjust_max_dirty(cli);
2100 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2101 for (i = 0; i < diff; i++) {
2102 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2103 struct obd_request_slot_waiter,
2108 list_del_init(&orsw->orsw_entry);
2109 cli->cl_rpcs_in_flight++;
2110 wake_up(&orsw->orsw_waitq);
2112 spin_unlock(&cli->cl_loi_list_lock);
2116 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2118 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2120 return cli->cl_max_mod_rpcs_in_flight;
2122 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2124 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2126 struct obd_connect_data *ocd;
2130 if (max > OBD_MAX_RIF_MAX || max < 1)
2133 ocd = &cli->cl_import->imp_connect_data;
2134 CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2135 cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2136 ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2138 if (max == OBD_MAX_RIF_MAX)
2139 max = OBD_MAX_RIF_MAX - 1;
2141 /* Cannot exceed or equal max_rpcs_in_flight. If we are asked to
2142 * increase this value, also bump up max_rpcs_in_flight to match.
2144 if (max >= cli->cl_max_rpcs_in_flight) {
2146 "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2147 cli->cl_import->imp_obd->obd_name, max + 1, max);
2148 obd_set_max_rpcs_in_flight(cli, max + 1);
2151 /* cannot exceed max modify RPCs in flight supported by the server,
2152 * but verify ocd_connect_flags is at least initialized first. If
2153 * not, allow it and fix value later in ptlrpc_connect_set_flags().
2155 if (!ocd->ocd_connect_flags) {
2156 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2157 } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2158 maxmodrpcs = ocd->ocd_maxmodrpcs;
2159 if (maxmodrpcs == 0) { /* connection not finished yet */
2160 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2162 "%s: partial connect, assume maxmodrpcs=%hu\n",
2163 cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2168 if (max > maxmodrpcs) {
2169 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2170 cli->cl_import->imp_obd->obd_name,
2175 spin_lock(&cli->cl_mod_rpcs_lock);
2177 prev = cli->cl_max_mod_rpcs_in_flight;
2178 cli->cl_max_mod_rpcs_in_flight = max;
2180 /* wakeup waiters if limit has been increased */
2181 if (cli->cl_max_mod_rpcs_in_flight > prev)
2182 wake_up(&cli->cl_mod_rpcs_waitq);
2184 spin_unlock(&cli->cl_mod_rpcs_lock);
2188 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2190 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2191 struct seq_file *seq)
2193 unsigned long mod_tot = 0, mod_cum;
2194 struct timespec64 now;
2197 ktime_get_real_ts64(&now);
2199 spin_lock(&cli->cl_mod_rpcs_lock);
2201 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2202 (s64)now.tv_sec, now.tv_nsec);
2203 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2204 cli->cl_mod_rpcs_in_flight);
2206 seq_printf(seq, "\n\t\t\tmodify\n");
2207 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2209 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2212 for (i = 0; i < OBD_HIST_MAX; i++) {
2213 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2215 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2216 i, mod, pct(mod, mod_tot),
2217 pct(mod_cum, mod_tot));
2218 if (mod_cum == mod_tot)
2222 spin_unlock(&cli->cl_mod_rpcs_lock);
2226 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2228 /* The number of modify RPCs sent in parallel is limited
2229 * because the server has a finite number of slots per client to
2230 * store request result and ensure reply reconstruction when needed.
2231 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2232 * that takes into account server limit and cl_max_rpcs_in_flight
2234 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2235 * one close request is allowed above the maximum.
2237 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2242 /* A slot is available if
2243 * - number of modify RPCs in flight is less than the max
2244 * - it's a close RPC and no other close request is in flight
2246 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2247 (close_req && cli->cl_close_rpcs_in_flight == 0);
2252 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2257 spin_lock(&cli->cl_mod_rpcs_lock);
2258 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2259 spin_unlock(&cli->cl_mod_rpcs_lock);
2264 /* Get a modify RPC slot from the obd client @cli according
2265 * to the kind of operation @opc that is going to be sent
2266 * and the intent @it of the operation if it applies.
2267 * If the maximum number of modify RPCs in flight is reached
2268 * the thread is put to sleep.
2269 * Returns the tag to be set in the request message. Tag 0
2270 * is reserved for non-modifying requests.
2272 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2274 bool close_req = false;
2277 if (opc == MDS_CLOSE)
2281 spin_lock(&cli->cl_mod_rpcs_lock);
2282 max = cli->cl_max_mod_rpcs_in_flight;
2283 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2284 /* there is a slot available */
2285 cli->cl_mod_rpcs_in_flight++;
2287 cli->cl_close_rpcs_in_flight++;
2288 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2289 cli->cl_mod_rpcs_in_flight);
2290 /* find a free tag */
2291 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2293 LASSERT(i < OBD_MAX_RIF_MAX);
2294 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2295 spin_unlock(&cli->cl_mod_rpcs_lock);
2296 /* tag 0 is reserved for non-modify RPCs */
2299 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2300 cli->cl_import->imp_obd->obd_name,
2305 spin_unlock(&cli->cl_mod_rpcs_lock);
2307 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2308 "opc %u, max %hu\n",
2309 cli->cl_import->imp_obd->obd_name, opc, max);
2311 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2312 obd_mod_rpc_slot_avail(cli,
2316 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2318 /* Put a modify RPC slot from the obd client @cli according
2319 * to the kind of operation @opc that has been sent.
2321 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2323 bool close_req = false;
2328 if (opc == MDS_CLOSE)
2331 spin_lock(&cli->cl_mod_rpcs_lock);
2332 cli->cl_mod_rpcs_in_flight--;
2334 cli->cl_close_rpcs_in_flight--;
2335 /* release the tag in the bitmap */
2336 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2337 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2338 spin_unlock(&cli->cl_mod_rpcs_lock);
2339 /* LU-14741 - to prevent close RPCs stuck behind normal ones */
2341 wake_up_all(&cli->cl_mod_rpcs_waitq);
2343 wake_up(&cli->cl_mod_rpcs_waitq);
2345 EXPORT_SYMBOL(obd_put_mod_rpc_slot);