4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/obdclass/genops.c
33 * These are the only exported functions, they provide some generic
34 * infrastructure for managing object devices
37 #define DEBUG_SUBSYSTEM S_CLASS
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
/* Guards obd_devs[]: taken for write when a slot is set or cleared and
 * for read by the lookup helpers in this file. */
48 DEFINE_RWLOCK(obd_dev_lock);
/* Device table; a registered device's index is kept in its obd_minor. */
49 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache used by obd_device_alloc() and obd_device_free(). */
51 static struct kmem_cache *obd_device_cachep;
/* Forward declaration; the initializer appears later in this file. */
52 static struct kobj_type class_ktype;
53 static struct workqueue_struct *zombie_wq;
/* Hand-off points for exports/imports whose last reference was dropped. */
55 static void obd_zombie_export_add(struct obd_export *exp);
56 static void obd_zombie_import_add(struct obd_import *imp);
57 static void print_export_data(struct obd_export *exp,
58 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping: list, its lock, and a counter that
 * class_unlink_export() increments. */
60 static LIST_HEAD(obd_stale_exports);
61 static DEFINE_SPINLOCK(obd_stale_export_lock);
62 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65 * support functions: we could use inter-module communication, but this
66 * is more portable to other OS's
68 static struct obd_device *obd_device_alloc(void)
70 struct obd_device *obd;
72 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts the magic value, reports an error if obd_namespace is still set,
 * then finalizes obd_reference and frees the slab object.
 * NOTE(review): short lines are missing from this listing; confirm what
 * follows the CERROR() in the namespace branch.
 */
79 static void obd_device_free(struct obd_device *obd)
82 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84 if (obd->obd_namespace != NULL) {
85 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86 obd, obd->obd_namespace, obd->obd_force);
89 lu_ref_fini(&obd->obd_reference);
90 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Find a registered obd_type by name in lustre_kset.
 *
 * A kobject found by kset_find_obj() is accepted only when its ktype is
 * class_ktype; the containing obd_type is then returned. Callers in this
 * file treat the result as holding a kobject reference that they drop
 * with kobject_put().
 * NOTE(review): the path taken for a missing or foreign kobject is not
 * visible in this listing.
 */
93 struct obd_type *class_search_type(const char *name)
95 struct kobject *kobj = kset_find_obj(lustre_kset, name);
97 if (kobj && kobj->ktype == &class_ktype)
98 return container_of(kobj, struct obd_type, typ_kobj);
103 EXPORT_SYMBOL(class_search_type);
/*
 * Look up an obd type by name and pin it for use by a device.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the name may be mapped to a module
 * name (server builds remap some legacy/alias names), the module is
 * requested with request_module(), and the type is searched again.
 * A found type is pinned with try_module_get() on typ_dt_ops->o_owner
 * and an increment of typ_refcnt; the kobject reference from
 * class_search_type() is dropped in both outcomes.
 * NOTE(review): typ_dt_ops is dereferenced here without a NULL check,
 * while class_unregister_type() sets it to NULL for a busy type; confirm
 * the two cannot meet.
 */
105 struct obd_type *class_get_type(const char *name)
107 struct obd_type *type;
109 type = class_search_type(name);
110 #ifdef HAVE_MODULE_LOADING_SUPPORT
112 const char *modname = name;
114 #ifdef HAVE_SERVER_SUPPORT
115 if (strcmp(modname, "obdfilter") == 0)
118 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119 modname = LUSTRE_OSP_NAME;
121 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122 modname = LUSTRE_MDT_NAME;
123 #endif /* HAVE_SERVER_SUPPORT */
125 if (!request_module("%s", modname)) {
126 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127 type = class_search_type(name);
129 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135 if (try_module_get(type->typ_dt_ops->o_owner)) {
136 atomic_inc(&type->typ_refcnt);
137 /* class_search_type() returned a counted reference,
138 * but we don't need that count any more as
139 * we have one through typ_refcnt.
141 kobject_put(&type->typ_kobj);
143 kobject_put(&type->typ_kobj);
/*
 * Release a type pinned by class_get_type(): drop the module reference
 * taken on typ_dt_ops->o_owner and decrement typ_refcnt.
 */
150 void class_put_type(struct obd_type *type)
153 module_put(type->typ_dt_ops->o_owner);
154 atomic_dec(&type->typ_refcnt);
/*
 * kobject release callback for an obd_type (installed through
 * class_ktype.release).
 *
 * Removes the type's debugfs directory, finalizes its lu_device_type,
 * removes the procfs subtree when CONFIG_PROC_FS is set and one was
 * created, and finally frees the obd_type itself.
 * NOTE(review): a short line preceding lu_device_type_fini() is missing
 * from this listing; it may guard the call.
 */
157 static void class_sysfs_release(struct kobject *kobj)
159 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
161 debugfs_remove_recursive(type->typ_debugfs_entry);
162 type->typ_debugfs_entry = NULL;
165 lu_device_type_fini(type->typ_lu);
167 #ifdef CONFIG_PROC_FS
168 if (type->typ_name && type->typ_procroot)
169 remove_proc_subtree(type->typ_name, proc_lustre_root);
171 OBD_FREE(type, sizeof(*type));
174 static struct kobj_type class_ktype = {
175 .sysfs_ops = &lustre_sysfs_ops,
176 .release = class_sysfs_release,
/*
 * Server-only: create a placeholder obd_type named \a name.
 *
 * Returns ERR_PTR(-EEXIST) when a type of that name is already present
 * and ERR_PTR(-ENOMEM) when allocation fails. Otherwise the new type is
 * added to lustre_kset, gets a debugfs directory, is flagged with
 * typ_sym_filter, and gets a procfs directory whose creation failure is
 * logged and tolerated (typ_procroot is reset to NULL).
 * NOTE(review): handling of the kobject_init_and_add() result and the use
 * of \a enable_proc are on lines missing from this listing.
 */
179 #ifdef HAVE_SERVER_SUPPORT
180 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
182 struct dentry *symlink;
183 struct obd_type *type;
186 type = class_search_type(name);
188 kobject_put(&type->typ_kobj);
189 return ERR_PTR(-EEXIST);
192 OBD_ALLOC(type, sizeof(*type));
194 return ERR_PTR(-ENOMEM);
196 type->typ_kobj.kset = lustre_kset;
197 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
198 &lustre_kset->kobj, "%s", name);
202 symlink = debugfs_create_dir(name, debugfs_lustre_root);
203 type->typ_debugfs_entry = symlink;
204 type->typ_sym_filter = true;
207 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
209 if (IS_ERR(type->typ_procroot)) {
210 CERROR("%s: can't create compat proc entry: %d\n",
211 name, (int)PTR_ERR(type->typ_procroot));
212 type->typ_procroot = NULL;
218 EXPORT_SYMBOL(class_add_symlinks);
219 #endif /* HAVE_SERVER_SUPPORT */
/*
 * Register an obd type under \a name with the given dt/md operation
 * tables and optional lu_device_type.
 *
 * The name must be shorter than CLASS_MAX_NAME (asserted). An existing
 * type of that name is reported as already registered, except that on
 * server builds an entry flagged typ_sym_filter is treated specially and
 * has the flag cleared further down. For a new type the kobject is
 * initialized, the operation tables are stored, a procfs directory (when
 * enabled) and a debugfs directory are created, and the kobject is added
 * to lustre_kset. Finally lu_device_type_init() runs and typ_lu is
 * published with smp_store_release() followed by wake_up_var(), so it
 * ends up NULL on failure and \a ldt on success.
 * NOTE(review): most branch and error-path lines are missing from this
 * listing, as is the declaration of enable_proc; verify the exact flow
 * against the full source.
 */
221 #define CLASS_MAX_NAME 1024
223 int class_register_type(const struct obd_ops *dt_ops,
224 const struct md_ops *md_ops,
226 const char *name, struct lu_device_type *ldt)
228 struct obd_type *type;
233 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
235 type = class_search_type(name);
237 #ifdef HAVE_SERVER_SUPPORT
238 if (type->typ_sym_filter)
240 #endif /* HAVE_SERVER_SUPPORT */
241 kobject_put(&type->typ_kobj);
242 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
246 OBD_ALLOC(type, sizeof(*type));
250 type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
251 type->typ_kobj.kset = lustre_kset;
252 kobject_init(&type->typ_kobj, &class_ktype);
253 #ifdef HAVE_SERVER_SUPPORT
255 #endif /* HAVE_SERVER_SUPPORT */
257 type->typ_dt_ops = dt_ops;
258 type->typ_md_ops = md_ops;
260 #ifdef HAVE_SERVER_SUPPORT
261 if (type->typ_sym_filter) {
262 type->typ_sym_filter = false;
263 kobject_put(&type->typ_kobj);
267 #ifdef CONFIG_PROC_FS
268 if (enable_proc && !type->typ_procroot) {
269 type->typ_procroot = lprocfs_register(name,
272 if (IS_ERR(type->typ_procroot)) {
273 rc = PTR_ERR(type->typ_procroot);
274 type->typ_procroot = NULL;
279 type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
281 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
284 #ifdef HAVE_SERVER_SUPPORT
288 rc = lu_device_type_init(ldt);
289 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
290 wake_up_var(&type->typ_lu);
298 kobject_put(&type->typ_kobj);
302 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * An unknown name is logged. A type whose typ_refcnt is non-zero is
 * logged, has both operation tables cleared (the name is kept for
 * debugging) and yields -EBUSY via the out_put label. Otherwise the
 * final kobject reference is dropped, followed by the reference that
 * class_search_type() returned.
 * NOTE(review): the return statements and the out_put label are on lines
 * missing from this listing.
 */
304 int class_unregister_type(const char *name)
306 struct obd_type *type = class_search_type(name);
311 CERROR("unknown obd type\n");
315 if (atomic_read(&type->typ_refcnt)) {
316 CERROR("type %s has refcount (%d)\n", name,
317 atomic_read(&type->typ_refcnt));
318 /* This is a bad situation, let's make the best of it */
319 /* Remove ops, but leave the name for debugging */
320 type->typ_dt_ops = NULL;
321 type->typ_md_ops = NULL;
322 GOTO(out_put, rc = -EBUSY);
325 /* Put the final ref */
326 kobject_put(&type->typ_kobj);
328 /* Put the ref returned by class_search_type() */
329 kobject_put(&type->typ_kobj);
332 } /* class_unregister_type */
333 EXPORT_SYMBOL(class_unregister_type);
336 * Create a new obd device.
338 * Allocate the new obd_device and initialize it.
340 * \param[in] type_name obd device type string.
341 * \param[in] name obd device name.
342 * \param[in] uuid obd device UUID
344 * \retval newdev pointer to created obd_device
345 * \retval ERR_PTR(errno) on error
347 struct obd_device *class_newdev(const char *type_name, const char *name,
350 struct obd_device *newdev;
351 struct obd_type *type = NULL;
354 if (strlen(name) >= MAX_OBD_NAME) {
355 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
356 RETURN(ERR_PTR(-EINVAL));
359 type = class_get_type(type_name);
361 CERROR("OBD: unknown type: %s\n", type_name);
362 RETURN(ERR_PTR(-ENODEV));
365 newdev = obd_device_alloc();
366 if (newdev == NULL) {
367 class_put_type(type);
368 RETURN(ERR_PTR(-ENOMEM));
370 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
371 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
372 newdev->obd_type = type;
373 newdev->obd_minor = -1;
375 rwlock_init(&newdev->obd_pool_lock);
376 newdev->obd_pool_limit = 0;
377 newdev->obd_pool_slv = 0;
379 INIT_LIST_HEAD(&newdev->obd_exports);
380 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
381 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
382 INIT_LIST_HEAD(&newdev->obd_exports_timed);
383 INIT_LIST_HEAD(&newdev->obd_nid_stats);
384 spin_lock_init(&newdev->obd_nid_lock);
385 spin_lock_init(&newdev->obd_dev_lock);
386 mutex_init(&newdev->obd_dev_mutex);
387 spin_lock_init(&newdev->obd_osfs_lock);
388 /* newdev->obd_osfs_age must be set to a value in the distant
389 * past to guarantee a fresh statfs is fetched on mount. */
390 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
392 /* XXX belongs in setup not attach */
393 init_rwsem(&newdev->obd_observer_link_sem);
395 spin_lock_init(&newdev->obd_recovery_task_lock);
396 init_waitqueue_head(&newdev->obd_next_transno_waitq);
397 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
398 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
399 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
400 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
401 INIT_LIST_HEAD(&newdev->obd_evict_list);
402 INIT_LIST_HEAD(&newdev->obd_lwp_list);
404 llog_group_init(&newdev->obd_olg);
405 /* Detach drops this */
406 atomic_set(&newdev->obd_refcount, 1);
407 lu_ref_init(&newdev->obd_reference);
408 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
410 newdev->obd_conn_inprogress = 0;
412 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
414 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
415 newdev->obd_name, newdev);
423 * \param[in] obd obd_device to be freed
/*
 * Final teardown of an obd device.
 *
 * Asserts that the magic is intact, that the device either has no slot
 * (obd_minor == -1) or occupies its own slot in obd_devs[], that
 * obd_refcount is zero and that a type is attached. If the device is
 * marking obd_stopping, obd_cleanup() is run and a failure is logged.
 * The device memory is then freed and the type reference released; the
 * type pointer is saved up front because the device is gone by then.
 */
427 void class_free_dev(struct obd_device *obd)
429 struct obd_type *obd_type = obd->obd_type;
431 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
432 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
433 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
434 "obd %p != obd_devs[%d] %p\n",
435 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
436 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
437 "obd_refcount should be 0, not %d\n",
438 atomic_read(&obd->obd_refcount));
439 LASSERT(obd_type != NULL);
441 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
442 obd->obd_name, obd->obd_type->typ_name);
444 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
445 obd->obd_name, obd->obd_uuid.uuid);
446 if (obd->obd_stopping) {
449 /* If we're not stopping, we were never set up */
450 err = obd_cleanup(obd);
452 CERROR("Cleanup %s returned %d\n",
456 obd_device_free(obd);
458 class_put_type(obd_type);
462 * Unregister obd device.
464 * Free slot in obd_devs[] used by \a obd.
466 * \param[in] obd obd_device to be unregistered
/*
 * Remove \a obd from obd_devs[] under the write side of obd_dev_lock.
 * A device without a slot (negative obd_minor) is left untouched;
 * otherwise the slot is asserted to hold \a obd and is cleared.
 */
470 void class_unregister_device(struct obd_device *obd)
472 write_lock(&obd_dev_lock);
473 if (obd->obd_minor >= 0) {
474 LASSERT(obd_devs[obd->obd_minor] == obd);
475 obd_devs[obd->obd_minor] = NULL;
478 write_unlock(&obd_dev_lock);
482 * Register obd device.
484 * Find free slot in obd_devs[], fills it with \a new_obd.
486 * \param[in] new_obd obd_device to be registered
489 * \retval -EEXIST device with this name is registered
490 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Place \a new_obd into a free slot of obd_devs[].
 *
 * Under the write side of obd_dev_lock the table is scanned for a device
 * with the same name and for a free slot. On a name match the lock can
 * be dropped to call obd_zombie_barrier(), since the old device may only
 * be waiting for destruction (the "retried" flag tracks this); otherwise
 * the clash is logged and any slot found earlier is discarded. On
 * success obd_minor is set and the slot filled; a full table is logged.
 * NOTE(review): most control-flow lines are missing from this listing,
 * so the retry and return logic must be confirmed in the full source.
 */
492 int class_register_device(struct obd_device *new_obd)
496 int new_obd_minor = 0;
497 bool minor_assign = false;
498 bool retried = false;
501 write_lock(&obd_dev_lock);
502 for (i = 0; i < class_devno_max(); i++) {
503 struct obd_device *obd = class_num2obd(i);
506 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
509 write_unlock(&obd_dev_lock);
511 /* the obd_device could be waited to be
512 * destroyed by the "obd_zombie_impexp_thread".
514 obd_zombie_barrier();
519 CERROR("%s: already exists, won't add\n",
521 /* in case we found a free slot before duplicate */
522 minor_assign = false;
526 if (!minor_assign && obd == NULL) {
533 new_obd->obd_minor = new_obd_minor;
534 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
535 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
536 obd_devs[new_obd_minor] = new_obd;
540 CERROR("%s: all %u/%u devices used, increase "
541 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
542 i, class_devno_max(), ret);
545 write_unlock(&obd_dev_lock);
/*
 * Scan obd_devs[] for a device whose obd_name equals \a name.
 * The function takes no lock itself; class_name2dev() calls it with
 * obd_dev_lock read-held. A name match is only acted on when the device
 * has obd_attached set.
 * NOTE(review): the return statements are on lines missing from this
 * listing.
 */
550 static int class_name2dev_nolock(const char *name)
557 for (i = 0; i < class_devno_max(); i++) {
558 struct obd_device *obd = class_num2obd(i);
560 if (obd && strcmp(name, obd->obd_name) == 0) {
561 /* Make sure we finished attaching before we give
562 out any references */
563 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
564 if (obd->obd_attached) {
/*
 * Locked wrapper: runs class_name2dev_nolock() with obd_dev_lock held
 * for read.
 */
574 int class_name2dev(const char *name)
581 read_lock(&obd_dev_lock);
582 i = class_name2dev_nolock(name);
583 read_unlock(&obd_dev_lock);
587 EXPORT_SYMBOL(class_name2dev);
589 struct obd_device *class_name2obd(const char *name)
591 int dev = class_name2dev(name);
593 if (dev < 0 || dev > class_devno_max())
595 return class_num2obd(dev);
597 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] for a device whose obd_uuid equals \a uuid.
 * The function takes no lock itself; callers in this file hold
 * obd_dev_lock for read around it.
 * NOTE(review): the return statements are on lines missing from this
 * listing.
 */
599 int class_uuid2dev_nolock(struct obd_uuid *uuid)
603 for (i = 0; i < class_devno_max(); i++) {
604 struct obd_device *obd = class_num2obd(i);
606 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
607 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
615 int class_uuid2dev(struct obd_uuid *uuid)
619 read_lock(&obd_dev_lock);
620 i = class_uuid2dev_nolock(uuid);
621 read_unlock(&obd_dev_lock);
625 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up an obd device by UUID: resolve the index with class_uuid2dev()
 * and return the device stored there.
 * NOTE(review): two short lines between the lookup and the return are
 * missing from this listing, presumably a check of the index.
 */
627 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
629 int dev = class_uuid2dev(uuid);
632 return class_num2obd(dev);
634 EXPORT_SYMBOL(class_uuid2obd);
637 * Get obd device from ::obd_devs[]
639 * \param num [in] array index
641 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
642 * otherwise return the obd device there.
644 struct obd_device *class_num2obd(int num)
646 struct obd_device *obd = NULL;
648 if (num < class_devno_max()) {
653 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
654 "%p obd_magic %08x != %08x\n",
655 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
656 LASSERTF(obd->obd_minor == num,
657 "%p obd_minor %0d != %0d\n",
658 obd, obd->obd_minor, num);
663 EXPORT_SYMBOL(class_num2obd);
666 * Find obd in obd_devs[] by name or uuid.
668 * Increment obd's refcount if found.
670 * \param[in] str obd name or uuid
672 * \retval NULL if not found
673 * \retval target pointer to found obd_device
/*
 * Find a device by a string that is either its UUID or its name.
 *
 * With obd_dev_lock read-held, the string is first tried as a UUID
 * (class_uuid2dev_nolock) and then as a name (class_name2dev_nolock).
 * A device that is found gets a reference via class_incref() tagged
 * "find" before the lock is released, so the caller owns one reference.
 * NOTE(review): the conditions between the two lookups are on lines
 * missing from this listing.
 */
675 struct obd_device *class_dev_by_str(const char *str)
677 struct obd_device *target = NULL;
678 struct obd_uuid tgtuuid;
681 obd_str2uuid(&tgtuuid, str);
683 read_lock(&obd_dev_lock);
684 rc = class_uuid2dev_nolock(&tgtuuid);
686 rc = class_name2dev_nolock(str);
689 target = class_num2obd(rc);
692 class_incref(target, "find", current);
693 read_unlock(&obd_dev_lock);
697 EXPORT_SYMBOL(class_dev_by_str);
700 * Get obd devices count. Device in any
702 * \retval obd device count
704 int get_devices_count(void)
706 int index, max_index = class_devno_max(), dev_count = 0;
708 read_lock(&obd_dev_lock);
709 for (index = 0; index <= max_index; index++) {
710 struct obd_device *obd = class_num2obd(index);
714 read_unlock(&obd_dev_lock);
718 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in obd_devs[], with obd_dev_lock
 * held for read: index, a status string, type name, device name, UUID
 * and current obd_refcount. The status is picked by testing
 * obd_stopping, then obd_set_up, then obd_attached, in that order.
 * NOTE(review): the status string assignments are on lines missing from
 * this listing.
 */
720 void class_obd_list(void)
725 read_lock(&obd_dev_lock);
726 for (i = 0; i < class_devno_max(); i++) {
727 struct obd_device *obd = class_num2obd(i);
731 if (obd->obd_stopping)
733 else if (obd->obd_set_up)
735 else if (obd->obd_attached)
739 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
740 i, status, obd->obd_type->typ_name,
741 obd->obd_name, obd->obd_uuid.uuid,
742 atomic_read(&obd->obd_refcount));
744 read_unlock(&obd_dev_lock);
747 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
748 * specified, then only the client with that uuid is returned,
749 * otherwise any client connected to the tgt is returned.
/*
 * Scan obd_devs[] under the read side of obd_dev_lock for a client
 * device whose type name starts with \a type_name (strncmp over
 * strlen(type_name), i.e. a prefix match) and whose cl_target_uuid
 * equals \a tgt_uuid. When \a grp_uuid is non-NULL the device's own
 * obd_uuid must equal it too. The lock is released before returning.
 */
751 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
752 const char *type_name,
753 struct obd_uuid *grp_uuid)
757 read_lock(&obd_dev_lock);
758 for (i = 0; i < class_devno_max(); i++) {
759 struct obd_device *obd = class_num2obd(i);
763 if ((strncmp(obd->obd_type->typ_name, type_name,
764 strlen(type_name)) == 0)) {
765 if (obd_uuid_equals(tgt_uuid,
766 &obd->u.cli.cl_target_uuid) &&
767 ((grp_uuid)? obd_uuid_equals(grp_uuid,
768 &obd->obd_uuid) : 1)) {
769 read_unlock(&obd_dev_lock);
774 read_unlock(&obd_dev_lock);
778 EXPORT_SYMBOL(class_find_client_obd);
780 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
781 * searching at *next, and if a device is found, the next index to look
782 * at is saved in *next. If next is NULL, then the first matching device
783 * will always be returned.
/*
 * Resume a scan of obd_devs[] for devices whose obd_uuid equals
 * \a grp_uuid. A starting index taken from *next is used only when it
 * lies in [0, class_devno_max()). The scan runs with obd_dev_lock held
 * for read, and the lock is dropped before a match is returned.
 * NOTE(review): the handling of a NULL or out-of-range \a next and the
 * update of *next are on lines missing from this listing.
 */
785 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
791 else if (*next >= 0 && *next < class_devno_max())
796 read_lock(&obd_dev_lock);
797 for (; i < class_devno_max(); i++) {
798 struct obd_device *obd = class_num2obd(i);
802 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
805 read_unlock(&obd_dev_lock);
809 read_unlock(&obd_dev_lock);
813 EXPORT_SYMBOL(class_devices_in_group);
816 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
817 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF to every relevant device of filesystem
 * \a fsname.
 *
 * Devices are skipped when they are not set up, are stopping, are not of
 * type mdc/osc/osp/lwp/mdt/ost, or when the first \a namelen bytes of
 * their name differ from \a fsname. For each remaining device a
 * reference is taken and obd_dev_lock is dropped around
 * obd_set_info_async(), then re-acquired, so the call does not run under
 * the lock.
 * NOTE(review): how rc2 is folded into the return value is on lines
 * missing from this listing.
 */
819 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
821 struct obd_device *obd;
825 LASSERT(namelen > 0);
827 read_lock(&obd_dev_lock);
828 for (i = 0; i < class_devno_max(); i++) {
829 obd = class_num2obd(i);
831 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
834 /* only notify mdc, osc, osp, lwp, mdt, ost
835 * because only these have a -sptlrpc llog */
836 type = obd->obd_type->typ_name;
837 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
838 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
839 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
840 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
841 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
842 strcmp(type, LUSTRE_OST_NAME) != 0)
845 if (strncmp(obd->obd_name, fsname, namelen))
848 class_incref(obd, __FUNCTION__, obd);
849 read_unlock(&obd_dev_lock);
850 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
851 sizeof(KEY_SPTLRPC_CONF),
852 KEY_SPTLRPC_CONF, 0, NULL, NULL);
854 class_decref(obd, __FUNCTION__, obd);
855 read_lock(&obd_dev_lock);
857 read_unlock(&obd_dev_lock);
860 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy obd_device_cachep if it exists and reset the pointer to NULL,
 * which makes a repeated call harmless.
 */
862 void obd_cleanup_caches(void)
865 if (obd_device_cachep) {
866 kmem_cache_destroy(obd_device_cachep);
867 obd_device_cachep = NULL;
/*
 * Create the "ll_obd_dev_cache" slab cache for struct obd_device.
 * The cache must not exist yet (asserted). On creation failure the code
 * jumps to the out label with rc = -ENOMEM, and obd_cleanup_caches() is
 * called on that path.
 */
873 int obd_init_caches(void)
878 LASSERT(obd_device_cachep == NULL);
879 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
880 sizeof(struct obd_device),
881 0, 0, 0, sizeof(struct obd_device), NULL);
882 if (!obd_device_cachep)
883 GOTO(out, rc = -ENOMEM);
887 obd_cleanup_caches();
/* Owner tag passed to class_handle_hash()/class_handle2object() for
 * export handles. */
891 static const char export_handle_owner[] = "export";
893 /* map connection to client */
/*
 * Resolve a lustre_handle to its export via class_handle2object().
 * A null handle and a cookie of -1 (request for a new connection) are
 * logged as special cases before the lookup.
 * NOTE(review): the return statements are on lines missing from this
 * listing.
 */
894 struct obd_export *class_conn2export(struct lustre_handle *conn)
896 struct obd_export *export;
900 CDEBUG(D_CACHE, "looking for null handle\n");
904 if (conn->cookie == -1) { /* this means assign a new connection */
905 CDEBUG(D_CACHE, "want a new connection\n");
909 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
910 export = class_handle2object(conn->cookie, export_handle_owner);
913 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): the function body is not visible in this listing.
 */
915 struct obd_device *class_exp2obd(struct obd_export *exp)
921 EXPORT_SYMBOL(class_exp2obd);
/*
 * Return the client import (u.cli.cl_import) of the device that \a exp
 * belongs to.
 * NOTE(review): two short lines before the return are missing from this
 * listing, presumably a check on the device pointer.
 */
923 struct obd_import *class_exp2cliimp(struct obd_export *exp)
925 struct obd_device *obd = exp->exp_obd;
928 return obd->u.cli.cl_import;
930 EXPORT_SYMBOL(class_exp2cliimp);
932 /* Export management functions */
/*
 * Free an export whose handle refcount has reached zero (asserted).
 *
 * Drops the connection reference, asserts that the reply, replay and HP
 * RPC lists are empty, calls obd_destroy_export(), releases the device
 * reference tagged "export" (except for the self export, which holds
 * none) and frees the memory through kfree_rcu().
 */
933 static void class_export_destroy(struct obd_export *exp)
935 struct obd_device *obd = exp->exp_obd;
938 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
939 LASSERT(obd != NULL);
941 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
942 exp->exp_client_uuid.uuid, obd->obd_name);
944 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
945 ptlrpc_connection_put(exp->exp_connection);
947 LASSERT(list_empty(&exp->exp_outstanding_replies));
948 LASSERT(list_empty(&exp->exp_uncommitted_replies));
949 LASSERT(list_empty(&exp->exp_req_replay_queue));
950 LASSERT(list_empty(&exp->exp_hp_rpcs));
951 obd_destroy_export(exp);
952 /* self export doesn't hold a reference to an obd, although it
953 * exists until freeing of the obd */
954 if (exp != obd->obd_self_export)
955 class_decref(obd, "export", exp);
957 OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
958 kfree_rcu(exp, exp_handle.h_rcu);
962 struct obd_export *class_export_get(struct obd_export *exp)
964 refcount_inc(&exp->exp_handle.h_ref);
965 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
966 refcount_read(&exp->exp_handle.h_ref));
969 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.
 *
 * The refcount is asserted to be positive and below LI_POISON. When the
 * last reference goes away the nid-stat reference is released, and then
 * either the self export is destroyed synchronously, or any other export
 * (which must still be on an obd chain) is handed to
 * obd_zombie_export_add().
 * NOTE(review): the end of the self-export branch is on lines missing
 * from this listing.
 */
971 void class_export_put(struct obd_export *exp)
973 LASSERT(exp != NULL);
974 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
975 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
976 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
977 refcount_read(&exp->exp_handle.h_ref) - 1);
979 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
980 struct obd_device *obd = exp->exp_obd;
982 CDEBUG(D_IOCTL, "final put %p/%s\n",
983 exp, exp->exp_client_uuid.uuid);
985 /* release nid stat reference */
986 lprocfs_exp_cleanup(exp);
988 if (exp == obd->obd_self_export) {
989 /* self export should be destroyed without
990 * zombie thread as it doesn't hold a
991 * reference to obd and doesn't hold any
993 class_export_destroy(exp);
994 /* self export is destroyed, no class
995 * references exist and it is safe to free
999 LASSERT(!list_empty(&exp->exp_obd_chain));
1000 obd_zombie_export_add(exp);
1005 EXPORT_SYMBOL(class_export_put);
1007 static void obd_zombie_exp_cull(struct work_struct *ws)
1009 struct obd_export *export;
1011 export = container_of(ws, struct obd_export, exp_zombie_work);
1012 class_export_destroy(export);
1015 /* Creates a new export, adds it to the hash table, and returns a
1016 * pointer to it. The refcount is 2: one for the hash reference, and
1017 * one for the pointer returned by this function.
 *
 * Returns ERR_PTR(-ENOMEM) when the allocation fails. Under
 * obd->obd_dev_lock, a client UUID that differs from the device's own
 * UUID is refused with -ENODEV while the device is stopping, and with
 * -EALREADY when obd_uuid_add() reports a duplicate. The exit_unlock
 * path unhashes the handle, calls obd_destroy_export() and frees the
 * export. The export either takes a device reference and is linked on
 * obd_exports / obd_exports_timed, or only gets its two chain heads
 * initialized.
 * NOTE(review): the condition choosing between those two cases
 * (presumably \a is_self) is on a line missing from this listing. */
1018 struct obd_export *__class_new_export(struct obd_device *obd,
1019 struct obd_uuid *cluuid, bool is_self)
1021 struct obd_export *export;
1025 OBD_ALLOC_PTR(export);
1027 return ERR_PTR(-ENOMEM);
1029 export->exp_conn_cnt = 0;
1030 export->exp_lock_hash = NULL;
1031 export->exp_flock_hash = NULL;
1032 /* 2 = class_handle_hash + last */
1033 refcount_set(&export->exp_handle.h_ref, 2);
1034 atomic_set(&export->exp_rpc_count, 0);
1035 atomic_set(&export->exp_cb_count, 0);
1036 atomic_set(&export->exp_locks_count, 0);
1037 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1038 INIT_LIST_HEAD(&export->exp_locks_list);
1039 spin_lock_init(&export->exp_locks_list_guard);
1041 atomic_set(&export->exp_replay_count, 0);
1042 export->exp_obd = obd;
1043 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1044 spin_lock_init(&export->exp_uncommitted_replies_lock);
1045 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1046 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1047 INIT_HLIST_NODE(&export->exp_handle.h_link);
1048 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1049 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1050 class_handle_hash(&export->exp_handle, export_handle_owner);
1051 export->exp_last_request_time = ktime_get_real_seconds();
1052 spin_lock_init(&export->exp_lock);
1053 spin_lock_init(&export->exp_rpc_lock);
1054 INIT_HLIST_NODE(&export->exp_gen_hash);
1055 spin_lock_init(&export->exp_bl_list_lock);
1056 INIT_LIST_HEAD(&export->exp_bl_list);
1057 INIT_LIST_HEAD(&export->exp_stale_list);
1058 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1060 export->exp_sp_peer = LUSTRE_SP_ANY;
1061 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1062 export->exp_client_uuid = *cluuid;
1063 obd_init_export(export);
1065 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1067 spin_lock(&obd->obd_dev_lock);
1068 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1069 /* shouldn't happen, but might race */
1070 if (obd->obd_stopping)
1071 GOTO(exit_unlock, rc = -ENODEV);
1073 rc = obd_uuid_add(obd, export);
1075 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1076 obd->obd_name, cluuid->uuid, rc);
1077 GOTO(exit_unlock, rc = -EALREADY);
1082 class_incref(obd, "export", export);
1083 list_add_tail(&export->exp_obd_chain_timed,
1084 &obd->obd_exports_timed);
1085 list_add(&export->exp_obd_chain, &obd->obd_exports);
1086 obd->obd_num_exports++;
1088 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1089 INIT_LIST_HEAD(&export->exp_obd_chain);
1091 spin_unlock(&obd->obd_dev_lock);
1095 spin_unlock(&obd->obd_dev_lock);
1096 class_handle_unhash(&export->exp_handle);
1097 obd_destroy_export(export);
1098 OBD_FREE_PTR(export);
1102 struct obd_export *class_new_export(struct obd_device *obd,
1103 struct obd_uuid *uuid)
1105 return __class_new_export(obd, uuid, false);
1107 EXPORT_SYMBOL(class_new_export);
1109 struct obd_export *class_new_export_self(struct obd_device *obd,
1110 struct obd_uuid *uuid)
1112 return __class_new_export(obd, uuid, true);
/*
 * Detach an export from its device.
 *
 * The handle is unhashed first. The self export only has a reference
 * dropped. Any other export, under obd_dev_lock, is removed from the
 * UUID hash (and on server builds from the generation hash if hashed
 * there), moved to obd_unlinked_exports, taken off the timed list, and
 * obd_num_exports is decremented. Afterwards obd_stale_export_num is
 * incremented and the export is passed to obd_stale_export_put().
 */
1115 void class_unlink_export(struct obd_export *exp)
1117 class_handle_unhash(&exp->exp_handle);
1119 if (exp->exp_obd->obd_self_export == exp) {
1120 class_export_put(exp);
1124 spin_lock(&exp->exp_obd->obd_dev_lock);
1125 /* delete an uuid-export hashitem from hashtables */
1126 if (exp != exp->exp_obd->obd_self_export)
1127 obd_uuid_del(exp->exp_obd, exp);
1129 #ifdef HAVE_SERVER_SUPPORT
1130 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1131 struct tg_export_data *ted = &exp->exp_target_data;
1132 struct cfs_hash *hash;
1134 /* Because obd_gen_hash will not be released until
1135 * class_cleanup(), so hash should never be NULL here */
1136 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1137 LASSERT(hash != NULL);
1138 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1139 &exp->exp_gen_hash);
1140 cfs_hash_putref(hash);
1142 #endif /* HAVE_SERVER_SUPPORT */
1144 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1145 list_del_init(&exp->exp_obd_chain_timed);
1146 exp->exp_obd->obd_num_exports--;
1147 spin_unlock(&exp->exp_obd->obd_dev_lock);
1148 atomic_inc(&obd_stale_export_num);
1150 /* A reference is kept by obd_stale_exports list */
1151 obd_stale_export_put(exp);
1153 EXPORT_SYMBOL(class_unlink_export);
1155 /* Import management functions */
/*
 * Release an import whose refcount has reached zero (asserted).
 *
 * Drops the import's connection, then empties imp_conn_list, putting the
 * connection of each entry and freeing it. Asserts that no security
 * context (imp_sec) and no requests (imp_reqs) remain, and releases the
 * device reference tagged "import".
 * NOTE(review): the final free of the import itself is not visible in
 * this listing.
 */
1156 static void obd_zombie_import_free(struct obd_import *imp)
1160 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1161 imp->imp_obd->obd_name);
1163 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1165 ptlrpc_connection_put(imp->imp_connection);
1167 while (!list_empty(&imp->imp_conn_list)) {
1168 struct obd_import_conn *imp_conn;
1170 imp_conn = list_first_entry(&imp->imp_conn_list,
1171 struct obd_import_conn, oic_item);
1172 list_del_init(&imp_conn->oic_item);
1173 ptlrpc_connection_put(imp_conn->oic_conn);
1174 OBD_FREE(imp_conn, sizeof(*imp_conn));
1177 LASSERT(imp->imp_sec == NULL);
1178 LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1179 imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1180 class_decref(imp->imp_obd, "import", imp);
1185 struct obd_import *class_import_get(struct obd_import *import)
1187 refcount_inc(&import->imp_refcount);
1188 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1189 refcount_read(&import->imp_refcount),
1190 import->imp_obd->obd_name);
1193 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp. The count is asserted to be positive;
 * when it reaches zero the import is handed to obd_zombie_import_add().
 */
1195 void class_import_put(struct obd_import *imp)
1199 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1201 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1202 refcount_read(&imp->imp_refcount) - 1,
1203 imp->imp_obd->obd_name);
1205 if (refcount_dec_and_test(&imp->imp_refcount)) {
1206 CDEBUG(D_INFO, "final put import %p\n", imp);
1207 obd_zombie_import_add(imp);
1212 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the net latency
 * estimate starts at 0, and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the per-portal at_init() call is on
 * a line missing from this listing.
 */
1214 static void init_imp_at(struct imp_at *at) {
1216 at_init(&at->iat_net_latency, 0, 0);
1217 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1218 /* max service estimates are tracked on the server side, so
1219 don't use the AT history here, just use the last reported
1220 val. (But keep hist for proc histogram, worst_ever) */
1221 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1226 static void obd_zombie_imp_cull(struct work_struct *ws)
1228 struct obd_import *import;
1230 import = container_of(ws, struct obd_import, imp_zombie_work);
1231 obd_zombie_import_free(import);
/*
 * Allocate and initialize a new import for \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, holds
 * a device reference tagged "import", and has its lists, locks, wait
 * queues, counters and adaptive-timeout state initialized.
 * imp_sec_refpid is the pid of the current pid namespace's child_reaper
 * when there is one, else 1. The message magic defaults to
 * LUSTRE_MSG_MAGIC_V2.
 * NOTE(review): the allocation-failure check and the return statement
 * are on lines missing from this listing.
 */
1234 struct obd_import *class_new_import(struct obd_device *obd)
1236 struct obd_import *imp;
1237 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1239 OBD_ALLOC(imp, sizeof(*imp));
1243 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1244 INIT_LIST_HEAD(&imp->imp_replay_list);
1245 INIT_LIST_HEAD(&imp->imp_sending_list);
1246 INIT_LIST_HEAD(&imp->imp_delayed_list);
1247 INIT_LIST_HEAD(&imp->imp_committed_list);
1248 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1249 imp->imp_known_replied_xid = 0;
1250 imp->imp_replay_cursor = &imp->imp_committed_list;
1251 spin_lock_init(&imp->imp_lock);
1252 imp->imp_last_success_conn = 0;
1253 imp->imp_state = LUSTRE_IMP_NEW;
1254 imp->imp_obd = class_incref(obd, "import", imp);
1255 rwlock_init(&imp->imp_sec_lock);
1256 init_waitqueue_head(&imp->imp_recovery_waitq);
1257 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1259 if (curr_pid_ns && curr_pid_ns->child_reaper)
1260 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1262 imp->imp_sec_refpid = 1;
1264 refcount_set(&imp->imp_refcount, 2);
1265 atomic_set(&imp->imp_unregistering, 0);
1266 atomic_set(&imp->imp_reqs, 0);
1267 atomic_set(&imp->imp_inflight, 0);
1268 atomic_set(&imp->imp_replay_inflight, 0);
1269 init_waitqueue_head(&imp->imp_replay_waitq);
1270 atomic_set(&imp->imp_inval_count, 0);
1271 INIT_LIST_HEAD(&imp->imp_conn_list);
1272 init_imp_at(&imp->imp_at);
1274 /* the default magic is V2, will be used in connect RPC, and
1275 * then adjusted according to the flags in request/reply. */
1276 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1280 EXPORT_SYMBOL(class_new_import);
1282 void class_destroy_import(struct obd_import *import)
1284 LASSERT(import != NULL);
1285 LASSERT(import != LP_POISON);
1287 spin_lock(&import->imp_lock);
1288 import->imp_generation++;
1289 spin_unlock(&import->imp_lock);
1290 class_import_put(import);
1292 EXPORT_SYMBOL(class_destroy_import);
1294 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1296 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1298 spin_lock(&exp->exp_locks_list_guard);
1300 LASSERT(lock->l_exp_refs_nr >= 0);
1302 if (lock->l_exp_refs_target != NULL &&
1303 lock->l_exp_refs_target != exp) {
1304 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1305 exp, lock, lock->l_exp_refs_target);
1307 if ((lock->l_exp_refs_nr ++) == 0) {
1308 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1309 lock->l_exp_refs_target = exp;
1311 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1312 lock, exp, lock->l_exp_refs_nr);
1313 spin_unlock(&exp->exp_locks_list_guard);
1315 EXPORT_SYMBOL(__class_export_add_lock_ref);
1317 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1319 spin_lock(&exp->exp_locks_list_guard);
1320 LASSERT(lock->l_exp_refs_nr > 0);
1321 if (lock->l_exp_refs_target != exp) {
1322 LCONSOLE_WARN("lock %p, "
1323 "mismatching export pointers: %p, %p\n",
1324 lock, lock->l_exp_refs_target, exp);
1326 if (-- lock->l_exp_refs_nr == 0) {
1327 list_del_init(&lock->l_exp_refs_link);
1328 lock->l_exp_refs_target = NULL;
1330 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1331 lock, exp, lock->l_exp_refs_nr);
1332 spin_unlock(&exp->exp_locks_list_guard);
1334 EXPORT_SYMBOL(__class_export_del_lock_ref);
1337 /* A connection defines an export context in which preallocation can
1338 be managed. This releases the export pointer reference, and returns
1339 the export handle, so the export refcount is 1 when this function
/*
 * Create an export on @obd for the client identified by @cluuid and hand
 * its handle cookie back through @conn->cookie.
 *
 * All three arguments are asserted non-NULL.  The export returned by
 * class_new_export() is released again with class_export_put() once the
 * cookie has been copied out; an error from class_new_export() is
 * propagated via PTR_ERR().
 * NOTE(review): the error test guarding RETURN(PTR_ERR(export)) and the
 * success return are not visible in this listing.
 */
1341 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1342 struct obd_uuid *cluuid)
1344 struct obd_export *export;
1345 LASSERT(conn != NULL);
1346 LASSERT(obd != NULL);
1347 LASSERT(cluuid != NULL);
1350 export = class_new_export(obd, cluuid);
1352 RETURN(PTR_ERR(export));
/* publish the handle cookie, then drop the pointer reference */
1354 conn->cookie = export->exp_handle.h_cookie;
1355 class_export_put(export);
1357 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1358 cluuid->uuid, conn->cookie);
1361 EXPORT_SYMBOL(class_connect);
/*
 * Undo the recovery bookkeeping held on behalf of @exp.
 *
 * Phase 1 (obd_recovery_task_lock): if the device is recovering, clear
 * exp_in_recovery (under exp_lock) and decrement obd_connected_clients,
 * then count the export as stale unless it is a lightweight connection.
 * Phase 2 (exp_lock): clear the request-replay and lock-replay flags and
 * decrement the matching per-device client counters.
 */
1363 /* if export is involved in recovery then clean up related things */
1364 static void class_export_recovery_cleanup(struct obd_export *exp)
1366 struct obd_device *obd = exp->exp_obd;
1368 spin_lock(&obd->obd_recovery_task_lock);
1369 if (obd->obd_recovering) {
1370 if (exp->exp_in_recovery) {
/* exp_lock is nested inside obd_recovery_task_lock here */
1371 spin_lock(&exp->exp_lock);
1372 exp->exp_in_recovery = 0;
1373 spin_unlock(&exp->exp_lock);
1374 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1375 atomic_dec(&obd->obd_connected_clients);
1378 /* if called during recovery then should update
1379 * obd_stale_clients counter,
1380 * lightweight exports are not counted */
1381 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1382 exp->exp_obd->obd_stale_clients++;
1384 spin_unlock(&obd->obd_recovery_task_lock);
1386 spin_lock(&exp->exp_lock);
1387 /** Cleanup req replay fields */
1388 if (exp->exp_req_replay_needed) {
1389 exp->exp_req_replay_needed = 0;
/* the counter must still be non-zero before it is decremented */
1391 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1392 atomic_dec(&obd->obd_req_replay_clients);
1395 /** Cleanup lock replay data */
1396 if (exp->exp_lock_replay_needed) {
1397 exp->exp_lock_replay_needed = 0;
1399 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1400 atomic_dec(&obd->obd_lock_replay_clients);
1402 spin_unlock(&exp->exp_lock);
/*
 * Disconnect @export.
 *
 * A NULL @export is reported with CWARN.  Otherwise exp_disconnected is
 * set under exp_lock, remembering its previous value; with
 * HAVE_SERVER_SUPPORT the export is also passed to obd_nid_del() while
 * the lock is held.  If the export had already been disconnected the
 * function jumps to the no_disconn label; otherwise it runs
 * class_export_recovery_cleanup() and class_unlink_export().  The
 * reference passed in by the caller is dropped with class_export_put().
 * NOTE(review): the NULL-case return, the no_disconn label and the final
 * return are not visible in this listing.
 */
1405 /* This function removes 1-3 references from the export:
1406 * 1 - for export pointer passed
1407 * and if disconnect really need
1408 * 2 - removing from hash
1409 * 3 - in client_unlink_export
1410 * The export pointer passed to this function can be destroyed */
1411 int class_disconnect(struct obd_export *export)
1413 int already_disconnected;
1416 if (export == NULL) {
1417 CWARN("attempting to free NULL export %p\n", export);
1421 spin_lock(&export->exp_lock);
1422 already_disconnected = export->exp_disconnected;
1423 export->exp_disconnected = 1;
1424 #ifdef HAVE_SERVER_SUPPORT
1425 /* We hold references of export for uuid hash
1426 * and nid_hash and export link at least. So
1427 * it is safe to call rh*table_remove_fast in
1430 obd_nid_del(export->exp_obd, export);
1431 #endif /* HAVE_SERVER_SUPPORT */
1432 spin_unlock(&export->exp_lock);
1434 /* class_cleanup(), abort_recovery(), and class_fail_export()
1435 * all end up in here, and if any of them race we shouldn't
1436 * call extra class_export_puts(). */
1437 if (already_disconnected)
1438 GOTO(no_disconn, already_disconnected);
1440 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1441 export->exp_handle.h_cookie);
1443 class_export_recovery_cleanup(export);
1444 class_unlink_export(export);
1446 class_export_put(export);
1449 EXPORT_SYMBOL(class_disconnect);
/*
 * Under exp_lock, an export counts as connected when exp_conn_cnt is
 * positive and exp_failed is clear.
 * NOTE(review): the declaration of 'connected', any NULL check on @exp
 * and the return statement are not visible in this listing.
 */
1451 /* Return non-zero for a fully connected export */
1452 int class_connected_export(struct obd_export *exp)
1457 spin_lock(&exp->exp_lock);
1458 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1459 spin_unlock(&exp->exp_lock);
1463 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export queued on @list, loop until the list is empty.
 *
 * For each first entry: take a reference so the export can still be
 * logged after obd_disconnect(), and store @flags in exp_flags under
 * exp_lock.  An export whose client UUID equals its own device's UUID is
 * not disconnected; it is only unlinked from exp_obd_chain and the
 * reference is dropped.  Any other export gets a second reference, is
 * passed to obd_disconnect(), has the result logged, and then the first
 * reference is dropped.
 * NOTE(review): the declaration of 'rc', the list member argument of
 * list_first_entry() and the statement that ends the self-export
 * iteration are not visible in this listing.
 */
1465 static void class_disconnect_export_list(struct list_head *list,
1466 enum obd_option flags)
1469 struct obd_export *exp;
1472 /* It's possible that an export may disconnect itself, but
1473 * nothing else will be added to this list. */
1474 while (!list_empty(list)) {
1475 exp = list_first_entry(list, struct obd_export,
1477 /* need for safe call CDEBUG after obd_disconnect */
1478 class_export_get(exp);
1480 spin_lock(&exp->exp_lock);
1481 exp->exp_flags = flags;
1482 spin_unlock(&exp->exp_lock);
1484 if (obd_uuid_equals(&exp->exp_client_uuid,
1485 &exp->exp_obd->obd_uuid)) {
1487 "exp %p export uuid == obd uuid, don't discon\n",
1489 /* Need to delete this now so we don't end up pointing
1490 * to work_list later when this export is cleaned up. */
1491 list_del_init(&exp->exp_obd_chain);
1492 class_export_put(exp);
1496 class_export_get(exp);
1497 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1498 "last request at %lld\n",
1499 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1500 exp, exp->exp_last_request_time);
1501 /* release one export reference anyway */
1502 rc = obd_disconnect(exp);
1504 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1505 obd_export_nid2str(exp), exp, rc);
1506 class_export_put(exp);
/*
 * Disconnect all exports of @obd.
 *
 * Under obd_dev_lock both obd_exports and obd_delayed_exports are spliced
 * onto a local work list, leaving the device lists empty.  If the work
 * list is non-empty it is handed to class_disconnect_export_list() with
 * the flags from exp_flags_from_obd(); the "has no exports" message is
 * logged for the empty case.
 */
1511 void class_disconnect_exports(struct obd_device *obd)
1513 LIST_HEAD(work_list);
1516 /* Move all of the exports from obd_exports to a work list, en masse. */
1517 spin_lock(&obd->obd_dev_lock);
1518 list_splice_init(&obd->obd_exports, &work_list);
1519 list_splice_init(&obd->obd_delayed_exports, &work_list);
1520 spin_unlock(&obd->obd_dev_lock);
1522 if (!list_empty(&work_list)) {
1523 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1524 "disconnecting them\n", obd->obd_minor, obd);
1525 class_disconnect_export_list(&work_list,
1526 exp_flags_from_obd(obd));
1528 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1529 obd->obd_minor, obd);
1532 EXPORT_SYMBOL(class_disconnect_exports);
1534 /* Remove exports that have not completed recovery.
/*
 * Evict the exports of @obd selected via @test_export.
 *
 * Walks obd_exports under obd_dev_lock.  The self-export (client UUID
 * equal to the device UUID) and exports with ted_lr_idx == -1 are not
 * candidates.  For the rest, exp_failed and test_export(exp) are checked
 * under exp_lock; note that @test_export is therefore called with both
 * spinlocks held.  Exports that reach the lower part of the loop are
 * marked exp_failed, moved to a local work list and logged.  After the
 * walk the work list is passed to class_disconnect_export_list() with
 * OBD_OPT_ABORT_RECOV added to the device flags.
 * NOTE(review): the 'evicted' counter, the list member argument and the
 * statements that skip non-candidates are not visible in this listing.
 */
1536 void class_disconnect_stale_exports(struct obd_device *obd,
1537 int (*test_export)(struct obd_export *))
1539 LIST_HEAD(work_list);
1540 struct obd_export *exp, *n;
1544 spin_lock(&obd->obd_dev_lock);
1545 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1547 /* don't count self-export as client */
1548 if (obd_uuid_equals(&exp->exp_client_uuid,
1549 &exp->exp_obd->obd_uuid))
1552 /* don't evict clients which have no slot in last_rcvd
1553 * (e.g. lightweight connection) */
1554 if (exp->exp_target_data.ted_lr_idx == -1)
1557 spin_lock(&exp->exp_lock);
1558 if (exp->exp_failed || test_export(exp)) {
1559 spin_unlock(&exp->exp_lock);
1562 exp->exp_failed = 1;
1563 spin_unlock(&exp->exp_lock);
1565 list_move(&exp->exp_obd_chain, &work_list);
1567 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1568 obd->obd_name, exp->exp_client_uuid.uuid,
1569 obd_export_nid2str(exp));
1570 print_export_data(exp, "EVICTING", 0, D_HA);
1572 spin_unlock(&obd->obd_dev_lock);
1575 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1576 obd->obd_name, evicted);
1578 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1579 OBD_OPT_ABORT_RECOV);
1582 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp_failed is set under exp_lock; if it was already set only a debug
 * message is logged for that case.  Otherwise the debug log is dumped
 * when obd_dump_on_timeout is set, two references are taken (one so the
 * export can be logged after the disconnect, one to stand in for the
 * caller reference that obd_disconnect() consumes), obd_disconnect() is
 * called and its outcome logged, and the first reference is dropped.
 * NOTE(review): the early return for the already-failed case and the
 * if/else selecting CERROR vs CDEBUG are not visible in this listing.
 */
1584 void class_fail_export(struct obd_export *exp)
1586 int rc, already_failed;
1588 spin_lock(&exp->exp_lock);
1589 already_failed = exp->exp_failed;
1590 exp->exp_failed = 1;
1591 spin_unlock(&exp->exp_lock);
1593 if (already_failed) {
1594 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1595 exp, exp->exp_client_uuid.uuid);
1599 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1600 exp, exp->exp_client_uuid.uuid);
1602 if (obd_dump_on_timeout)
1603 libcfs_debug_dumplog();
1605 /* need for safe call CDEBUG after obd_disconnect */
1606 class_export_get(exp);
1608 /* Most callers into obd_disconnect are removing their own reference
1609 * (request, for example) in addition to the one from the hash table.
1610 * We don't have such a reference here, so make one. */
1611 class_export_get(exp);
1612 rc = obd_disconnect(exp);
1614 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1616 CDEBUG(D_HA, "disconnected export %p/%s\n",
1617 exp, exp->exp_client_uuid.uuid);
1618 class_export_put(exp);
1620 EXPORT_SYMBOL(class_fail_export);
1622 #ifdef HAVE_SERVER_SUPPORT
/*
 * Per-export callback; @data is treated as a struct obd_export ** slot.
 *
 * The visible checks reject an export that is already failed and one
 * whose handle refcount cannot be raised from a non-zero value.
 * NOTE(review): the "already have one" test, the store into *expp and
 * all return values are not visible in this listing.
 */
1624 static int take_first(struct obd_export *exp, void *data)
1626 struct obd_export **expp = data;
1629 /* already have one */
1631 if (exp->exp_failed)
1632 /* Don't want this one */
1634 if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1635 /* Cannot get a ref on this one */
/*
 * Evict the exports of @obd that are hashed under the NID string @nid.
 *
 * Returns early with the current count (0) when obd_stopping is set,
 * checked under obd_dev_lock.  Otherwise it loops for as long as
 * obd_nid_export_for_each(), driven by take_first(), returns a positive
 * value; each selected export is asserted not to be the self-export,
 * announced on the console, failed with class_fail_export() and released
 * with class_export_put().  Returns exports_evicted.
 * NOTE(review): the per-iteration reset of doomed_exp and the increment
 * of exports_evicted are not visible in this listing.
 */
1641 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1643 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1644 struct obd_export *doomed_exp;
1645 int exports_evicted = 0;
1647 spin_lock(&obd->obd_dev_lock);
1648 /* umount has run already, so evict thread should leave
1649 * its task to umount thread now */
1650 if (obd->obd_stopping) {
1651 spin_unlock(&obd->obd_dev_lock);
1652 return exports_evicted;
1654 spin_unlock(&obd->obd_dev_lock);
1657 while (obd_nid_export_for_each(obd, nid_key,
1658 take_first, &doomed_exp) > 0) {
1660 LASSERTF(doomed_exp != obd->obd_self_export,
1661 "self-export is hashed by NID?\n");
1663 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1665 obd_uuid2str(&doomed_exp->exp_client_uuid),
1666 obd_export_nid2str(doomed_exp));
1668 class_fail_export(doomed_exp);
1669 class_export_put(doomed_exp);
1674 if (!exports_evicted)
1676 "%s: can't disconnect NID '%s': no exports found\n",
1677 obd->obd_name, nid);
1678 return exports_evicted;
1680 EXPORT_SYMBOL(obd_export_evict_by_nid);
1682 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1684 struct obd_export *doomed_exp = NULL;
1685 struct obd_uuid doomed_uuid;
1686 int exports_evicted = 0;
1688 spin_lock(&obd->obd_dev_lock);
1689 if (obd->obd_stopping) {
1690 spin_unlock(&obd->obd_dev_lock);
1691 return exports_evicted;
1693 spin_unlock(&obd->obd_dev_lock);
1695 obd_str2uuid(&doomed_uuid, uuid);
1696 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1697 CERROR("%s: can't evict myself\n", obd->obd_name);
1698 return exports_evicted;
1701 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1702 if (doomed_exp == NULL) {
1703 CERROR("%s: can't disconnect %s: no exports found\n",
1704 obd->obd_name, uuid);
1706 CWARN("%s: evicting %s at adminstrative request\n",
1707 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1708 class_fail_export(doomed_exp);
1709 class_export_put(doomed_exp);
1710 obd_uuid_del(obd, doomed_exp);
1714 return exports_evicted;
1716 #endif /* HAVE_SERVER_SUPPORT */
1718 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional hook, NULL by default.  print_export_data() calls it with the
 * export when lock dumping is requested and the hook is non-NULL.
 */
1719 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1720 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Log a one-line summary of @exp at @debug_level, tagged with @status.
 *
 * exp_outstanding_replies is walked under exp_lock first.  The CDEBUG
 * line then reports the device name, client UUID and NID, the handle
 * refcount, the RPC/callback/lock counters, the disconnected/delayed/
 * failed flags, the reply count with the first reply pointer,
 * exp_last_committed and whether the export sits on a stale list.
 * When LUSTRE_TRACKS_LOCK_EXP_REFS is set, @locks is non-zero and
 * class_export_dump_hook is installed, the hook is called as well.
 * NOTE(review): the declaration of 'nreplies' and the body of the reply
 * loop are not visible in this listing.
 */
1723 static void print_export_data(struct obd_export *exp, const char *status,
1724 int locks, int debug_level)
1726 struct ptlrpc_reply_state *rs;
1727 struct ptlrpc_reply_state *first_reply = NULL;
1730 spin_lock(&exp->exp_lock);
1731 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1737 spin_unlock(&exp->exp_lock);
1739 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1740 "%p %s %llu stale:%d\n",
1741 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1742 obd_export_nid2str(exp),
1743 refcount_read(&exp->exp_handle.h_ref),
1744 atomic_read(&exp->exp_rpc_count),
1745 atomic_read(&exp->exp_cb_count),
1746 atomic_read(&exp->exp_locks_count),
1747 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1748 nreplies, first_reply, nreplies > 3 ? "..." : "",
1749 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1750 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1751 if (locks && class_export_dump_hook != NULL)
1752 class_export_dump_hook(exp);
1756 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1758 struct obd_export *exp;
1760 spin_lock(&obd->obd_dev_lock);
1761 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1762 print_export_data(exp, "ACTIVE", locks, debug_level);
1763 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1764 print_export_data(exp, "UNLINKED", locks, debug_level);
1765 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1766 print_export_data(exp, "DELAYED", locks, debug_level);
1767 spin_unlock(&obd->obd_dev_lock);
/*
 * Wait until @obd has no unlinked exports left.
 *
 * obd_exports must already be empty (asserted).  While
 * obd_unlinked_exports is non-empty the function drops obd_dev_lock,
 * sleeps uninterruptibly for cfs_time_seconds(waited) and re-takes the
 * lock.  When 'waited' exceeds 5 and is a power of two, a console
 * warning with the device refcount is printed and all exports are dumped.
 * NOTE(review): the declaration and update of 'waited' are not visible in
 * this listing.
 */
1770 void obd_exports_barrier(struct obd_device *obd)
1773 LASSERT(list_empty(&obd->obd_exports));
1774 spin_lock(&obd->obd_dev_lock);
1775 while (!list_empty(&obd->obd_unlinked_exports)) {
1776 spin_unlock(&obd->obd_dev_lock);
1777 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1778 if (waited > 5 && is_power_of_2(waited)) {
1779 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1780 "more than %d seconds. "
1781 "The obd refcount = %d. Is it stuck?\n",
1782 obd->obd_name, waited,
1783 atomic_read(&obd->obd_refcount));
1784 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1787 spin_lock(&obd->obd_dev_lock);
1789 spin_unlock(&obd->obd_dev_lock);
1791 EXPORT_SYMBOL(obd_exports_barrier);
1794 * Add export to the obd_zombie thread and notify it.
1796 static void obd_zombie_export_add(struct obd_export *exp) {
1797 atomic_dec(&obd_stale_export_num);
1798 spin_lock(&exp->exp_obd->obd_dev_lock);
1799 LASSERT(!list_empty(&exp->exp_obd_chain));
1800 list_del_init(&exp->exp_obd_chain);
1801 spin_unlock(&exp->exp_obd->obd_dev_lock);
1803 queue_work(zombie_wq, &exp->exp_zombie_work);
1807 * Add import to the obd_zombie thread and notify it.
/*
 * Queue @imp's imp_zombie_work on zombie_wq.  The import must no longer
 * have a security context attached (imp_sec == NULL is asserted).
 */
1809 static void obd_zombie_import_add(struct obd_import *imp) {
1810 LASSERT(imp->imp_sec == NULL);
1812 queue_work(zombie_wq, &imp->imp_zombie_work);
1816 * wait when obd_zombie import/export queues become empty
/*
 * Block until all work currently queued on zombie_wq has completed
 * (flush_workqueue()).
 */
1818 void obd_zombie_barrier(void)
1820 flush_workqueue(zombie_wq);
1822 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Take the first export off the global obd_stale_exports list.
 *
 * Under obd_stale_export_lock, the head entry (if any) is unlinked with
 * list_del_init(); 'exp' stays NULL when the list is empty.  The result
 * and the current obd_stale_export_num are traced at D_DLMTRACE.
 * NOTE(review): the condition around the CDEBUG and the return statement
 * are not visible in this listing.
 */
1825 struct obd_export *obd_stale_export_get(void)
1827 struct obd_export *exp = NULL;
1830 spin_lock(&obd_stale_export_lock);
1831 if (!list_empty(&obd_stale_exports)) {
1832 exp = list_first_entry(&obd_stale_exports,
1833 struct obd_export, exp_stale_list);
1834 list_del_init(&exp->exp_stale_list);
1836 spin_unlock(&obd_stale_export_lock);
1839 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1840 atomic_read(&obd_stale_export_num));
1844 EXPORT_SYMBOL(obd_stale_export_get);
/*
 * Give back an export obtained from the stale list.
 *
 * @exp must not currently be on a stale list (asserted).  If it has a
 * lock hash with a non-zero hs_count it is re-queued on
 * obd_stale_exports: exp_bl_list_lock (BH-safe) is taken first, then
 * obd_stale_export_lock, and the position depends on whether exp_bl_list
 * is empty (tail) or not (head).  class_export_put() is the other
 * visible outcome.
 * NOTE(review): the else branches are not visible in this listing, so
 * whether class_export_put() runs only when the export is not re-queued
 * should be confirmed against the full source.
 */
1846 void obd_stale_export_put(struct obd_export *exp)
1850 LASSERT(list_empty(&exp->exp_stale_list));
1851 if (exp->exp_lock_hash &&
1852 atomic_read(&exp->exp_lock_hash->hs_count)) {
1853 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1854 atomic_read(&obd_stale_export_num));
1856 spin_lock_bh(&exp->exp_bl_list_lock);
1857 spin_lock(&obd_stale_export_lock);
1858 /* Add to the tail if there is no blocked locks,
1859 * to the head otherwise. */
1860 if (list_empty(&exp->exp_bl_list))
1861 list_add_tail(&exp->exp_stale_list,
1862 &obd_stale_exports);
1864 list_add(&exp->exp_stale_list,
1865 &obd_stale_exports);
1867 spin_unlock(&obd_stale_export_lock);
1868 spin_unlock_bh(&exp->exp_bl_list_lock);
1870 class_export_put(exp);
1874 EXPORT_SYMBOL(obd_stale_export_put);
1877 * Adjust the position of the export in the stale list,
1878 * i.e. move to the head of the list if is needed.
1880 void obd_stale_export_adjust(struct obd_export *exp)
1882 LASSERT(exp != NULL);
1883 spin_lock_bh(&exp->exp_bl_list_lock);
1884 spin_lock(&obd_stale_export_lock);
1886 if (!list_empty(&exp->exp_stale_list) &&
1887 !list_empty(&exp->exp_bl_list))
1888 list_move(&exp->exp_stale_list, &obd_stale_exports);
1890 spin_unlock(&obd_stale_export_lock);
1891 spin_unlock_bh(&exp->exp_bl_list_lock);
1893 EXPORT_SYMBOL(obd_stale_export_adjust);
1896 * start destroy zombie import/export thread
/*
 * Create the "obd_zombid" workqueue via cfs_cpt_bind_workqueue() and
 * store it in zombie_wq.
 *
 * Returns 0 on success, or the PTR_ERR() code when the workqueue pointer
 * is an error pointer.
 * NOTE(review): one argument line of the cfs_cpt_bind_workqueue() call is
 * not visible in this listing.
 */
1898 int obd_zombie_impexp_init(void)
1900 zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1902 cfs_cpt_number(cfs_cpt_tab));
1904 return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1908 * stop destroy zombie import/export thread
/*
 * Destroy zombie_wq, then assert that the global obd_stale_exports list
 * is empty.
 */
1910 void obd_zombie_impexp_stop(void)
1912 destroy_workqueue(zombie_wq);
1913 LASSERT(list_empty(&obd_stale_exports));
1916 /***** Kernel-userspace comm helpers *******/
1918 /* Get length of entire message, including header */
/* i.e. sizeof(struct kuc_hdr) plus @payload_len, returned as an int */
1919 int kuc_len(int payload_len)
1921 return sizeof(struct kuc_hdr) + payload_len;
1923 EXPORT_SYMBOL(kuc_len);
1925 /* Get a pointer to kuc header, given a ptr to the payload
1926 * @param p Pointer to payload area
1927 * @returns Pointer to kuc header
/*
 * Map a payload pointer @p back to its header: the header is taken to sit
 * immediately before the payload (one struct kuc_hdr earlier).  The
 * header's kuc_magic is asserted to equal KUC_MAGIC.
 * NOTE(review): the return statement is not visible in this listing.
 */
1929 struct kuc_hdr * kuc_ptr(void *p)
1931 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1932 LASSERT(lh->kuc_magic == KUC_MAGIC);
1935 EXPORT_SYMBOL(kuc_ptr);
1937 /* Alloc space for a message, and fill in header
1938 * @return Pointer to payload area
/*
 * Build a message with room for @payload_len payload bytes.
 *
 * The header is filled with KUC_MAGIC, @transport, @type and the total
 * length from kuc_len().  The returned pointer addresses the payload,
 * i.e. the byte right after the header; ERR_PTR(-ENOMEM) is the failure
 * value.
 * NOTE(review): the declaration of 'lh', the allocation call and the
 * failure test are not visible in this listing.
 */
1940 void *kuc_alloc(int payload_len, int transport, int type)
1943 int len = kuc_len(payload_len);
1947 return ERR_PTR(-ENOMEM);
1949 lh->kuc_magic = KUC_MAGIC;
1950 lh->kuc_transport = transport;
1951 lh->kuc_msgtype = type;
1952 lh->kuc_msglen = len;
/* callers get the payload area, not the header */
1954 return (void *)(lh + 1);
1956 EXPORT_SYMBOL(kuc_alloc);
1958 /* Takes pointer to payload area */
/*
 * Frees the whole message: the header is recovered with kuc_ptr() (which
 * asserts the magic) and kuc_len(@payload_len) bytes are released, so
 * @payload_len must match the value used at allocation time.
 */
1959 void kuc_free(void *p, int payload_len)
1961 struct kuc_hdr *lh = kuc_ptr(p);
1962 OBD_FREE(lh, kuc_len(payload_len));
1964 EXPORT_SYMBOL(kuc_free);
/*
 * One waiter for an RPC slot.  Instances are placed on the stack by
 * obd_get_request_slot() and linked on cli->cl_flight_waiters.
 * NOTE(review): the code below also uses an 'orsw_signaled' member whose
 * declaration is not visible in this listing.
 */
1966 struct obd_request_slot_waiter {
/* link on client_obd::cl_flight_waiters; unlinked when a slot is granted */
1967 struct list_head orsw_entry;
/* the waiter sleeps here until woken */
1968 wait_queue_head_t orsw_waitq;
/*
 * Wait condition for obd_get_request_slot(): under cl_loi_list_lock,
 * 'avail' is true once @orsw is no longer linked on the waiter list.
 * NOTE(review): the declaration of 'avail' and the return statement are
 * not visible in this listing.
 */
1972 static bool obd_request_slot_avail(struct client_obd *cli,
1973 struct obd_request_slot_waiter *orsw)
1977 spin_lock(&cli->cl_loi_list_lock);
1978 avail = !!list_empty(&orsw->orsw_entry);
1979 spin_unlock(&cli->cl_loi_list_lock);
1985 * For network flow control, the RPC sponsor needs to acquire a credit
1986 * before sending the RPC. The credits count for a connection is defined
1987 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
1988 * the subsequent RPC sponsors need to wait until others released their
1989 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
/*
 * Acquire one RPC slot on @cli, sleeping if none is free.
 *
 * Fast path: with cl_loi_list_lock held, if cl_rpcs_in_flight is below
 * cl_max_rpcs_in_flight the counter is incremented and the lock dropped.
 * Slow path: an on-stack waiter is appended to cl_flight_waiters and the
 * caller sleeps (abortable) until the waiter has been unlinked or
 * orsw_signaled is set.  After the wait the lock is re-taken to settle
 * the accounting; the visible code gives back a granted slot or unlinks
 * the waiter when it was not signaled.
 * NOTE(review): the declaration of 'rc', the test on the wait result, the
 * else branch and the return values are not visible in this listing.
 */
1991 int obd_get_request_slot(struct client_obd *cli)
1993 struct obd_request_slot_waiter orsw;
1996 spin_lock(&cli->cl_loi_list_lock);
1997 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
1998 cli->cl_rpcs_in_flight++;
1999 spin_unlock(&cli->cl_loi_list_lock);
2003 init_waitqueue_head(&orsw.orsw_waitq);
2004 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2005 orsw.orsw_signaled = false;
2006 spin_unlock(&cli->cl_loi_list_lock);
2008 rc = l_wait_event_abortable(orsw.orsw_waitq,
2009 obd_request_slot_avail(cli, &orsw) ||
2010 orsw.orsw_signaled);
2012 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2013 * freed but other (such as obd_put_request_slot) is using it. */
2014 spin_lock(&cli->cl_loi_list_lock);
2016 if (!orsw.orsw_signaled) {
/* an unlinked entry means a slot was already counted for this waiter */
2017 if (list_empty(&orsw.orsw_entry))
2018 cli->cl_rpcs_in_flight--;
2020 list_del(&orsw.orsw_entry);
2025 if (orsw.orsw_signaled) {
2026 LASSERT(list_empty(&orsw.orsw_entry));
2030 spin_unlock(&cli->cl_loi_list_lock);
2034 EXPORT_SYMBOL(obd_get_request_slot);
/*
 * Release one RPC slot on @cli.
 *
 * Under cl_loi_list_lock, cl_rpcs_in_flight is decremented.  If waiters
 * are queued and the count is below cl_max_rpcs_in_flight, the first
 * waiter is unlinked, the slot is accounted to it (counter incremented
 * again) and it is woken.
 */
2036 void obd_put_request_slot(struct client_obd *cli)
2038 struct obd_request_slot_waiter *orsw;
2040 spin_lock(&cli->cl_loi_list_lock);
2041 cli->cl_rpcs_in_flight--;
2043 /* If there is free slot, wakeup the first waiter. */
2044 if (!list_empty(&cli->cl_flight_waiters) &&
2045 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2046 orsw = list_first_entry(&cli->cl_flight_waiters,
2047 struct obd_request_slot_waiter,
2049 list_del_init(&orsw->orsw_entry);
2050 cli->cl_rpcs_in_flight++;
2051 wake_up(&orsw->orsw_waitq);
2053 spin_unlock(&cli->cl_loi_list_lock);
2055 EXPORT_SYMBOL(obd_put_request_slot);
/* Return cli->cl_max_rpcs_in_flight; the field is read without a lock. */
2057 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2059 return cli->cl_max_rpcs_in_flight;
2061 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * Set the RPC slot limit of @cli to @max.
 *
 * @max is range-checked against 1..OBD_MAX_RIF_MAX.  For an MDC device
 * (type name LUSTRE_MDC_NAME) the modify-RPC limit is lowered to
 * @max - 1 first when it would otherwise not stay below @max.  The new
 * limit is stored under cl_loi_list_lock, client_adjust_max_dirty() is
 * called, and up to 'diff' queued waiters are unlinked, given a slot and
 * woken.
 * NOTE(review): the locals (old, diff, i, rc), the range-check result,
 * the MDC "max < 2" test, the computation of 'diff' and the return
 * values are not visible in this listing.
 * NOTE(review): the CDEBUG prints @max, a __u32, with "%hu".
 */
2063 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2065 struct obd_request_slot_waiter *orsw;
2071 if (max > OBD_MAX_RIF_MAX || max < 1)
2074 CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2075 cli->cl_import->imp_obd->obd_name, max,
2076 cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2078 if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2079 LUSTRE_MDC_NAME) == 0) {
2080 /* adjust max_mod_rpcs_in_flight to ensure it is always
2081 * strictly lower than max_rpcs_in_flight */
2083 CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2084 cli->cl_import->imp_obd->obd_name);
2087 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2088 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2094 spin_lock(&cli->cl_loi_list_lock);
2095 old = cli->cl_max_rpcs_in_flight;
2096 cli->cl_max_rpcs_in_flight = max;
2097 client_adjust_max_dirty(cli);
2101 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2102 for (i = 0; i < diff; i++) {
2103 if (list_empty(&cli->cl_flight_waiters))
2106 orsw = list_first_entry(&cli->cl_flight_waiters,
2107 struct obd_request_slot_waiter,
2109 list_del_init(&orsw->orsw_entry);
2110 cli->cl_rpcs_in_flight++;
2111 wake_up(&orsw->orsw_waitq);
2113 spin_unlock(&cli->cl_loi_list_lock);
2117 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Return cli->cl_max_mod_rpcs_in_flight; the field is read without a lock. */
2119 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2121 return cli->cl_max_mod_rpcs_in_flight;
2123 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/*
 * Set the modify-RPC slot limit of @cli to @max.
 *
 * @max is range-checked against 1..OBD_MAX_RIF_MAX and clamped to
 * OBD_MAX_RIF_MAX - 1.  If it would reach cl_max_rpcs_in_flight, that
 * limit is raised to @max + 1 first.  The server-side ceiling comes from
 * the import's connect data: cl_max_rpcs_in_flight - 1 while the connect
 * flags are still unset or ocd_maxmodrpcs is still 0, otherwise
 * ocd_maxmodrpcs when OBD_CONNECT_MULTIMODRPCS is negotiated.  A @max
 * above that ceiling is rejected with CERROR.  The new limit is stored
 * under cl_mod_rpcs_lock and waiters are woken when it grew.
 * NOTE(review): the locals (maxmodrpcs, prev), the final else branch of
 * the ceiling selection and all return values are not visible in this
 * listing.
 * NOTE(review): the result of obd_set_max_rpcs_in_flight() is ignored.
 */
2125 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2127 struct obd_connect_data *ocd;
2131 if (max > OBD_MAX_RIF_MAX || max < 1)
2134 ocd = &cli->cl_import->imp_connect_data;
2135 CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2136 cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2137 ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2139 if (max == OBD_MAX_RIF_MAX)
2140 max = OBD_MAX_RIF_MAX - 1;
2142 /* Cannot exceed or equal max_rpcs_in_flight. If we are asked to
2143 * increase this value, also bump up max_rpcs_in_flight to match.
2145 if (max >= cli->cl_max_rpcs_in_flight) {
2147 "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2148 cli->cl_import->imp_obd->obd_name, max + 1, max);
2149 obd_set_max_rpcs_in_flight(cli, max + 1);
2152 /* cannot exceed max modify RPCs in flight supported by the server,
2153 * but verify ocd_connect_flags is at least initialized first. If
2154 * not, allow it and fix value later in ptlrpc_connect_set_flags().
2156 if (!ocd->ocd_connect_flags) {
2157 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2158 } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2159 maxmodrpcs = ocd->ocd_maxmodrpcs;
2160 if (maxmodrpcs == 0) { /* connection not finished yet */
2161 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2163 "%s: partial connect, assume maxmodrpcs=%hu\n",
2164 cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2169 if (max > maxmodrpcs) {
2170 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2171 cli->cl_import->imp_obd->obd_name,
2176 spin_lock(&cli->cl_mod_rpcs_lock);
2178 prev = cli->cl_max_mod_rpcs_in_flight;
2179 cli->cl_max_mod_rpcs_in_flight = max;
2181 /* wakeup waiters if limit has been increased */
2182 if (cli->cl_max_mod_rpcs_in_flight > prev)
2183 wake_up(&cli->cl_mod_rpcs_waitq);
2185 spin_unlock(&cli->cl_mod_rpcs_lock);
2189 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2191 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2192 struct seq_file *seq)
2194 unsigned long mod_tot = 0, mod_cum;
2195 struct timespec64 now;
2198 ktime_get_real_ts64(&now);
2200 spin_lock(&cli->cl_mod_rpcs_lock);
2202 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2203 (s64)now.tv_sec, now.tv_nsec);
2204 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2205 cli->cl_mod_rpcs_in_flight);
2207 seq_printf(seq, "\n\t\t\tmodify\n");
2208 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2210 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2213 for (i = 0; i < OBD_HIST_MAX; i++) {
2214 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2216 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2217 i, mod, pct(mod, mod_tot),
2218 pct(mod_cum, mod_tot));
2219 if (mod_cum == mod_tot)
2223 spin_unlock(&cli->cl_mod_rpcs_lock);
2227 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2229 /* The number of modify RPCs sent in parallel is limited
2230 * because the server has a finite number of slots per client to
2231 * store request result and ensure reply reconstruction when needed.
2232 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2233 * that takes into account server limit and cl_max_rpcs_in_flight
2235 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2236 * one close request is allowed above the maximum.
/*
 * Slot-availability test; reads cl_mod_rpcs_in_flight,
 * cl_max_mod_rpcs_in_flight and cl_close_rpcs_in_flight without taking
 * any lock itself.  The callers visible in this file hold
 * cl_mod_rpcs_lock around the call.
 * NOTE(review): the second parameter (used below as 'close_req'), the
 * declaration of 'avail' and the return statement are not visible in
 * this listing.
 */
2238 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2243 /* A slot is available if
2244 * - number of modify RPCs in flight is less than the max
2245 * - it's a close RPC and no other close request is in flight
2247 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2248 (close_req && cli->cl_close_rpcs_in_flight == 0);
/*
 * Locking wrapper: evaluates obd_mod_rpc_slot_avail_locked() with
 * cl_mod_rpcs_lock held.  Used as the wait condition in
 * obd_get_mod_rpc_slot().
 * NOTE(review): the second parameter, the declaration of 'avail' and the
 * return statement are not visible in this listing.
 */
2253 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2258 spin_lock(&cli->cl_mod_rpcs_lock);
2259 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2260 spin_unlock(&cli->cl_mod_rpcs_lock);
2265 /* Get a modify RPC slot from the obd client @cli according
2266 * to the kind of operation @opc that is going to be sent
2267 * and the intent @it of the operation if it applies.
2268 * If the maximum number of modify RPCs in flight is reached
2269 * the thread is put to sleep.
2270 * Returns the tag to be set in the request message. Tag 0
2271 * is reserved for non-modifying requests.
/*
 * Reserve a modify-RPC slot on @cli for an RPC with opcode @opc.
 *
 * With cl_mod_rpcs_lock held, if a slot is available the in-flight
 * counter is incremented (plus the close counter when applicable), the
 * histogram is tallied, and the first zero bit in cl_mod_tag_bitmap is
 * claimed as the tag.  Otherwise the lock is dropped and the caller
 * sleeps exclusively on cl_mod_rpcs_waitq until a slot looks available.
 * NOTE(review): the declarations of 'i' and 'max', the assignment after
 * the MDS_CLOSE test, the enclosing retry loop, the size argument of
 * find_first_zero_bit() and the returned tag value are not visible in
 * this listing.
 * NOTE(review): test_and_set_bit() is evaluated inside LASSERT(), so the
 * bit is only claimed if LASSERT() always evaluates its argument.
 */
2273 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2275 bool close_req = false;
2278 if (opc == MDS_CLOSE)
2282 spin_lock(&cli->cl_mod_rpcs_lock);
2283 max = cli->cl_max_mod_rpcs_in_flight;
2284 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2285 /* there is a slot available */
2286 cli->cl_mod_rpcs_in_flight++;
2288 cli->cl_close_rpcs_in_flight++;
2289 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2290 cli->cl_mod_rpcs_in_flight);
2291 /* find a free tag */
2292 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2294 LASSERT(i < OBD_MAX_RIF_MAX);
2295 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2296 spin_unlock(&cli->cl_mod_rpcs_lock);
2297 /* tag 0 is reserved for non-modify RPCs */
2300 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2301 cli->cl_import->imp_obd->obd_name,
2306 spin_unlock(&cli->cl_mod_rpcs_lock);
2308 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2309 "opc %u, max %hu\n",
2310 cli->cl_import->imp_obd->obd_name, opc, max);
2312 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2313 obd_mod_rpc_slot_avail(cli,
2317 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2319 /* Put a modify RPC slot from the obd client @cli according
2320 * to the kind of operation @opc that has been sent.
/*
 * Release the modify-RPC slot identified by @tag on @cli.
 *
 * Under cl_mod_rpcs_lock the in-flight counter is decremented (plus the
 * close counter when applicable) and bit @tag - 1 is cleared in
 * cl_mod_tag_bitmap; the bit must have been set.  Waiters on
 * cl_mod_rpcs_waitq are woken after the lock is dropped.
 * NOTE(review): any early exit for @tag == 0, the assignment after the
 * MDS_CLOSE test and the condition guarding the close counter are not
 * visible in this listing.
 * NOTE(review): test_and_clear_bit() is evaluated inside LASSERT(), so
 * the bit is only cleared if LASSERT() always evaluates its argument.
 */
2322 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2324 bool close_req = false;
2329 if (opc == MDS_CLOSE)
2332 spin_lock(&cli->cl_mod_rpcs_lock);
2333 cli->cl_mod_rpcs_in_flight--;
2335 cli->cl_close_rpcs_in_flight--;
2336 /* release the tag in the bitmap */
2337 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2338 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2339 spin_unlock(&cli->cl_mod_rpcs_lock);
2340 wake_up(&cli->cl_mod_rpcs_waitq);
2342 EXPORT_SYMBOL(obd_put_mod_rpc_slot);