4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
/* Debug subsystem selector for this file; presumably consumed by the
 * CDEBUG()/CERROR() logging macros used below -- TODO confirm. */
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* obd_dev_lock is taken (read or write) by the lookup, register and
 * unregister paths below when they walk or modify obd_devs[]. */
49 DEFINE_RWLOCK(obd_dev_lock);
/* Device table; a device's slot index is recorded in its obd_minor. */
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache used by obd_device_alloc() and obd_device_free(). */
52 static struct kmem_cache *obd_device_cachep;
/* Forward declaration; the initializer follows class_sysfs_release(). */
53 static struct kobj_type class_ktype;
54 static struct workqueue_struct *zombie_wq;
/* Forward declarations of static helpers. */
56 static void obd_zombie_export_add(struct obd_export *exp);
57 static void obd_zombie_import_add(struct obd_import *imp);
58 static void print_export_data(struct obd_export *exp,
59 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping; obd_stale_export_num is incremented in
 * class_unlink_export(). The list and lock are presumably used by the
 * obd_stale_export_*() helpers -- TODO confirm. */
61 static LIST_HEAD(obd_stale_exports);
62 static DEFINE_SPINLOCK(obd_stale_export_lock);
63 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
66 * support functions: we could use inter-module communication, but this
67 * is more portable to other OS's
/* Allocate an obd_device from obd_device_cachep with GFP_NOFS and stamp
 * its obd_magic with OBD_DEVICE_MAGIC. */
69 static struct obd_device *obd_device_alloc(void)
71 struct obd_device *obd;
73 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
75 obd->obd_magic = OBD_DEVICE_MAGIC;
/* Return an obd_device to obd_device_cachep.
 *
 * Asserts that obd_magic is intact, logs an error if obd_namespace is
 * still set, and finalizes the obd_reference lu_ref before freeing. */
80 static void obd_device_free(struct obd_device *obd)
83 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
84 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
85 if (obd->obd_namespace != NULL) {
86 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
87 obd, obd->obd_namespace, obd->obd_force);
90 lu_ref_fini(&obd->obd_reference);
91 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/* Look up an obd_type by name in lustre_kset.
 *
 * A kobject found under \a name is accepted only if its ktype is
 * class_ktype; the enclosing obd_type is then returned. Callers in this
 * file release the result with kobject_put(&type->typ_kobj). */
94 struct obd_type *class_search_type(const char *name)
96 struct kobject *kobj = kset_find_obj(lustre_kset, name);
98 if (kobj && kobj->ktype == &class_ktype)
99 return container_of(kobj, struct obd_type, typ_kobj);
104 EXPORT_SYMBOL(class_search_type);
/* Find the obd_type called \a name and take a usage reference on it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the code can call request_module()
 * and repeat the lookup. Under HAVE_SERVER_SUPPORT the module name is
 * remapped first: LUSTRE_LWP_NAME becomes LUSTRE_OSP_NAME, names that
 * start with LUSTRE_MDS_NAME become LUSTRE_MDT_NAME, and "obdfilter" is
 * special-cased as well.
 *
 * When try_module_get() on the type's o_owner succeeds, typ_refcnt is
 * incremented. The kobject reference from class_search_type() is dropped
 * with kobject_put() on both visible paths. Pair with class_put_type(). */
106 struct obd_type *class_get_type(const char *name)
108 struct obd_type *type;
110 type = class_search_type(name);
111 #ifdef HAVE_MODULE_LOADING_SUPPORT
113 const char *modname = name;
115 #ifdef HAVE_SERVER_SUPPORT
116 if (strcmp(modname, "obdfilter") == 0)
119 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
120 modname = LUSTRE_OSP_NAME;
122 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
123 modname = LUSTRE_MDT_NAME;
124 #endif /* HAVE_SERVER_SUPPORT */
126 if (!request_module("%s", modname)) {
127 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
128 type = class_search_type(name);
130 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
136 if (try_module_get(type->typ_dt_ops->o_owner)) {
137 atomic_inc(&type->typ_refcnt);
138 /* class_search_type() returned a counted reference,
139 * but we don't need that count any more as
140 * we have one through typ_refcnt.
142 kobject_put(&type->typ_kobj);
144 kobject_put(&type->typ_kobj);
/* Undo class_get_type(): drop the module reference on the type's
 * o_owner and decrement typ_refcnt. */
151 void class_put_type(struct obd_type *type)
154 module_put(type->typ_dt_ops->o_owner);
155 atomic_dec(&type->typ_refcnt);
/* kobject release callback, installed as class_ktype.release.
 *
 * Removes the type's debugfs directory, calls lu_device_type_fini() on
 * typ_lu, removes the proc subtree named typ_name (only with
 * CONFIG_PROC_FS and when both typ_name and typ_procroot are set), and
 * finally frees the obd_type itself. */
158 static void class_sysfs_release(struct kobject *kobj)
160 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
162 debugfs_remove_recursive(type->typ_debugfs_entry);
163 type->typ_debugfs_entry = NULL;
166 lu_device_type_fini(type->typ_lu);
168 #ifdef CONFIG_PROC_FS
169 if (type->typ_name && type->typ_procroot)
170 remove_proc_subtree(type->typ_name, proc_lustre_root);
172 OBD_FREE(type, sizeof(*type));
/* kobj_type used for every obd_type kobject created in this file;
 * class_search_type() compares against it to recognise obd_type
 * kobjects. */
175 static struct kobj_type class_ktype = {
176 .sysfs_ops = &lustre_sysfs_ops,
177 .release = class_sysfs_release,
/* Server-only helper (HAVE_SERVER_SUPPORT): create an obd_type entry
 * named \a name that is flagged with typ_sym_filter.
 *
 * The visible code returns ERR_PTR(-EEXIST) after dropping the lookup
 * reference (apparently when the name is already registered) and
 * ERR_PTR(-ENOMEM) on allocation failure. Otherwise it adds the kobject
 * under lustre_kset, creates a debugfs directory and registers a proc
 * entry; a failed proc registration is logged and typ_procroot is reset
 * to NULL. */
180 #ifdef HAVE_SERVER_SUPPORT
181 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
183 struct dentry *symlink;
184 struct obd_type *type;
187 type = class_search_type(name);
189 kobject_put(&type->typ_kobj);
190 return ERR_PTR(-EEXIST);
193 OBD_ALLOC(type, sizeof(*type));
195 return ERR_PTR(-ENOMEM);
197 type->typ_kobj.kset = lustre_kset;
198 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
199 &lustre_kset->kobj, "%s", name);
203 symlink = debugfs_create_dir(name, debugfs_lustre_root);
204 type->typ_debugfs_entry = symlink;
205 type->typ_sym_filter = true;
208 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
210 if (IS_ERR(type->typ_procroot)) {
211 CERROR("%s: can't create compat proc entry: %d\n",
212 name, (int)PTR_ERR(type->typ_procroot));
213 type->typ_procroot = NULL;
219 EXPORT_SYMBOL(class_add_symlinks);
220 #endif /* HAVE_SERVER_SUPPORT */
/* Upper bound (exclusive) on the length of a type name accepted by
 * class_register_type(). */
222 #define CLASS_MAX_NAME 1024
/* Register an obd type under \a name.
 *
 * Asserts that \a name is shorter than CLASS_MAX_NAME, then looks for an
 * existing type. Under HAVE_SERVER_SUPPORT an existing entry flagged
 * typ_sym_filter is handled specially; the flag is cleared later on.
 * Otherwise an already-registered name is reported via CDEBUG.
 *
 * For a new type: typ_lu is set to OBD_LU_TYPE_SETUP while \a ldt is
 * being initialised, the kobject is initialised with class_ktype, the
 * ops tables are stored, an optional proc directory and a debugfs
 * directory (populated from \a vars) are created, and the kobject is
 * added under lustre_kset. lu_device_type_init(ldt) then runs and its
 * outcome is published in typ_lu with smp_store_release() followed by
 * wake_up_var(). */
224 int class_register_type(const struct obd_ops *dt_ops,
225 const struct md_ops *md_ops,
226 bool enable_proc, struct ldebugfs_vars *vars,
227 const char *name, struct lu_device_type *ldt)
229 struct obd_type *type;
234 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
236 type = class_search_type(name);
238 #ifdef HAVE_SERVER_SUPPORT
239 if (type->typ_sym_filter)
241 #endif /* HAVE_SERVER_SUPPORT */
242 kobject_put(&type->typ_kobj);
243 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
247 OBD_ALLOC(type, sizeof(*type));
251 type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
252 type->typ_kobj.kset = lustre_kset;
253 kobject_init(&type->typ_kobj, &class_ktype);
254 #ifdef HAVE_SERVER_SUPPORT
256 #endif /* HAVE_SERVER_SUPPORT */
258 type->typ_dt_ops = dt_ops;
259 type->typ_md_ops = md_ops;
261 #ifdef HAVE_SERVER_SUPPORT
262 if (type->typ_sym_filter) {
263 type->typ_sym_filter = false;
264 kobject_put(&type->typ_kobj);
268 #ifdef CONFIG_PROC_FS
269 if (enable_proc && !type->typ_procroot) {
270 type->typ_procroot = lprocfs_register(name,
273 if (IS_ERR(type->typ_procroot)) {
274 rc = PTR_ERR(type->typ_procroot);
275 type->typ_procroot = NULL;
280 type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
281 ldebugfs_add_vars(type->typ_debugfs_entry, vars, type);
283 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
286 #ifdef HAVE_SERVER_SUPPORT
290 rc = lu_device_type_init(ldt);
291 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
292 wake_up_var(&type->typ_lu);
300 kobject_put(&type->typ_kobj);
304 EXPORT_SYMBOL(class_register_type);
/* Unregister the obd type called \a name.
 *
 * If typ_refcnt is still non-zero the type is left in place: the error
 * is logged, both ops pointers are cleared and the function leaves via
 * out_put with rc = -EBUSY. Otherwise the final kobject reference is
 * dropped. The reference obtained from class_search_type() is dropped
 * as well. */
306 int class_unregister_type(const char *name)
308 struct obd_type *type = class_search_type(name);
313 CERROR("unknown obd type\n");
317 if (atomic_read(&type->typ_refcnt)) {
318 CERROR("type %s has refcount (%d)\n", name,
319 atomic_read(&type->typ_refcnt));
320 /* This is a bad situation, let's make the best of it */
321 /* Remove ops, but leave the name for debugging */
322 type->typ_dt_ops = NULL;
323 type->typ_md_ops = NULL;
324 GOTO(out_put, rc = -EBUSY);
327 /* Put the final ref */
328 kobject_put(&type->typ_kobj);
330 /* Put the ref returned by class_search_type() */
331 kobject_put(&type->typ_kobj);
334 } /* class_unregister_type */
335 EXPORT_SYMBOL(class_unregister_type);
338 * Create a new obd device.
340 * Allocate the new obd_device and initialize it.
342 * \param[in] type_name obd device type string.
343 * \param[in] name obd device name.
344 * \param[in] uuid obd device UUID
346 * \retval newdev pointer to created obd_device
347 * \retval ERR_PTR(errno) on error
/* Error returns visible here: ERR_PTR(-EINVAL) when \a name is
 * MAX_OBD_NAME bytes or longer, ERR_PTR(-ENODEV) for an unknown type,
 * ERR_PTR(-ENOMEM) when the device cannot be allocated (the type
 * reference is released first).
 *
 * The new device starts with obd_minor == -1 (no obd_devs[] slot yet)
 * and obd_refcount == 1.
 *
 * NOTE(review): the uuid is copied with strncpy(..., UUID_MAX), which
 * leaves the destination unterminated if \a uuid is UUID_MAX bytes or
 * longer; only the length of \a name is checked in the code shown --
 * confirm that callers bound the uuid length. */
349 struct obd_device *class_newdev(const char *type_name, const char *name,
352 struct obd_device *newdev;
353 struct obd_type *type = NULL;
356 if (strlen(name) >= MAX_OBD_NAME) {
357 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
358 RETURN(ERR_PTR(-EINVAL));
361 type = class_get_type(type_name);
363 CERROR("OBD: unknown type: %s\n", type_name);
364 RETURN(ERR_PTR(-ENODEV));
367 newdev = obd_device_alloc();
368 if (newdev == NULL) {
369 class_put_type(type);
370 RETURN(ERR_PTR(-ENOMEM));
372 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
373 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
374 newdev->obd_type = type;
375 newdev->obd_minor = -1;
377 rwlock_init(&newdev->obd_pool_lock);
378 newdev->obd_pool_limit = 0;
379 newdev->obd_pool_slv = 0;
381 INIT_LIST_HEAD(&newdev->obd_exports);
382 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
383 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
384 INIT_LIST_HEAD(&newdev->obd_exports_timed);
385 INIT_LIST_HEAD(&newdev->obd_nid_stats);
386 spin_lock_init(&newdev->obd_nid_lock);
387 spin_lock_init(&newdev->obd_dev_lock);
388 mutex_init(&newdev->obd_dev_mutex);
389 spin_lock_init(&newdev->obd_osfs_lock);
390 /* newdev->obd_osfs_age must be set to a value in the distant
391 * past to guarantee a fresh statfs is fetched on mount. */
392 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
394 /* XXX belongs in setup not attach */
395 init_rwsem(&newdev->obd_observer_link_sem);
397 spin_lock_init(&newdev->obd_recovery_task_lock);
398 init_waitqueue_head(&newdev->obd_next_transno_waitq);
399 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
400 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
401 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
402 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
403 INIT_LIST_HEAD(&newdev->obd_evict_list);
404 INIT_LIST_HEAD(&newdev->obd_lwp_list);
406 llog_group_init(&newdev->obd_olg);
407 /* Detach drops this */
408 atomic_set(&newdev->obd_refcount, 1);
409 lu_ref_init(&newdev->obd_reference);
410 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
412 newdev->obd_conn_inprogress = 0;
414 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
416 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
417 newdev->obd_name, newdev);
425 * \param[in] obd obd_device to be freed
/* Final teardown of an obd_device.
 *
 * Asserts that the magic is intact, that the device either has no slot
 * (obd_minor == -1) or still occupies its obd_devs[] slot, that
 * obd_refcount has reached zero and that a type is attached. When
 * obd_stopping is set, obd_cleanup() is called and a failure is logged.
 * The device memory is then freed and the type reference taken in
 * class_newdev() is released with class_put_type(). */
429 void class_free_dev(struct obd_device *obd)
431 struct obd_type *obd_type = obd->obd_type;
433 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
434 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
435 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
436 "obd %p != obd_devs[%d] %p\n",
437 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
438 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
439 "obd_refcount should be 0, not %d\n",
440 atomic_read(&obd->obd_refcount));
441 LASSERT(obd_type != NULL);
443 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
444 obd->obd_name, obd->obd_type->typ_name);
446 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
447 obd->obd_name, obd->obd_uuid.uuid);
448 if (obd->obd_stopping) {
451 /* If we're not stopping, we were never set up */
452 err = obd_cleanup(obd);
454 CERROR("Cleanup %s returned %d\n",
458 obd_device_free(obd);
460 class_put_type(obd_type);
464 * Unregister obd device.
466 * Free slot in obd_devs[] used by \a obd.
468 * \param[in] obd obd_device to be unregistered
/* Under the obd_dev_lock write lock, clear the obd_devs[] slot of
 * \a obd. Devices with a negative obd_minor own no slot and are left
 * alone; otherwise the slot is asserted to hold \a obd. */
472 void class_unregister_device(struct obd_device *obd)
474 write_lock(&obd_dev_lock);
475 if (obd->obd_minor >= 0) {
476 LASSERT(obd_devs[obd->obd_minor] == obd);
477 obd_devs[obd->obd_minor] = NULL;
480 write_unlock(&obd_dev_lock);
484 * Register obd device.
486 * Find free slot in obd_devs[], fills it with \a new_obd.
488 * \param[in] new_obd obd_device to be registered
491 * \retval -EEXIST device with this name is registered
492 * \retval -EOVERFLOW obd_devs[] is full
/* Walks obd_devs[] under the obd_dev_lock write lock, looking both for
 * a device with the same name and for a free slot.
 *
 * On a name clash the lock is dropped and obd_zombie_barrier() is
 * called, since the clashing device may only be waiting for destruction;
 * the 'retried' flag presumably limits this to one retry -- TODO
 * confirm. A persistent duplicate is logged and any tentatively chosen
 * slot is discarded.
 *
 * On success new_obd->obd_minor records the slot index and the slot
 * (asserted to be empty) is filled with \a new_obd. A full table is
 * logged together with the scan position and class_devno_max(). */
494 int class_register_device(struct obd_device *new_obd)
498 int new_obd_minor = 0;
499 bool minor_assign = false;
500 bool retried = false;
503 write_lock(&obd_dev_lock);
504 for (i = 0; i < class_devno_max(); i++) {
505 struct obd_device *obd = class_num2obd(i);
508 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
511 write_unlock(&obd_dev_lock);
513 /* the obd_device could be waited to be
514 * destroyed by the "obd_zombie_impexp_thread".
516 obd_zombie_barrier();
521 CERROR("%s: already exists, won't add\n",
523 /* in case we found a free slot before duplicate */
524 minor_assign = false;
528 if (!minor_assign && obd == NULL) {
535 new_obd->obd_minor = new_obd_minor;
536 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
537 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
538 obd_devs[new_obd_minor] = new_obd;
542 CERROR("%s: all %u/%u devices used, increase "
543 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
544 i, class_devno_max(), ret);
547 write_unlock(&obd_dev_lock);
/* Scan obd_devs[] for a device whose obd_name equals \a name exactly.
 *
 * Takes no lock itself; the callers in this file (class_name2dev() and
 * class_dev_by_str()) hold obd_dev_lock for reading. A name match is
 * additionally checked for obd_attached. */
552 static int class_name2dev_nolock(const char *name)
559 for (i = 0; i < class_devno_max(); i++) {
560 struct obd_device *obd = class_num2obd(i);
562 if (obd && strcmp(name, obd->obd_name) == 0) {
563 /* Make sure we finished attaching before we give
564 out any references */
565 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
566 if (obd->obd_attached) {
/* Locked wrapper: run class_name2dev_nolock() with obd_dev_lock held
 * for reading. */
576 int class_name2dev(const char *name)
583 read_lock(&obd_dev_lock);
584 i = class_name2dev_nolock(name);
585 read_unlock(&obd_dev_lock);
589 EXPORT_SYMBOL(class_name2dev);
/* Resolve a device name to its obd_device via class_name2dev() and
 * class_num2obd().
 *
 * NOTE(review): the range check uses 'dev > class_devno_max()' while
 * class_num2obd() only accepts num < class_devno_max(), so
 * dev == class_devno_max() passes this check -- confirm whether '>='
 * was intended. */
591 struct obd_device *class_name2obd(const char *name)
593 int dev = class_name2dev(name);
595 if (dev < 0 || dev > class_devno_max())
597 return class_num2obd(dev);
599 EXPORT_SYMBOL(class_name2obd);
/* Scan obd_devs[] for a device whose obd_uuid equals \a uuid, asserting
 * the magic of the match. Takes no lock itself; callers in this file
 * hold obd_dev_lock for reading. */
601 int class_uuid2dev_nolock(struct obd_uuid *uuid)
605 for (i = 0; i < class_devno_max(); i++) {
606 struct obd_device *obd = class_num2obd(i);
608 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
609 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/* Locked wrapper: run class_uuid2dev_nolock() with obd_dev_lock held
 * for reading. */
617 int class_uuid2dev(struct obd_uuid *uuid)
621 read_lock(&obd_dev_lock);
622 i = class_uuid2dev_nolock(uuid);
623 read_unlock(&obd_dev_lock);
627 EXPORT_SYMBOL(class_uuid2dev);
/* Resolve a uuid to its obd_device via class_uuid2dev() and
 * class_num2obd(). */
629 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
631 int dev = class_uuid2dev(uuid);
634 return class_num2obd(dev);
636 EXPORT_SYMBOL(class_uuid2obd);
639 * Get obd device from ::obd_devs[]
641 * \param num [in] array index
643 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
644 * otherwise return the obd device there.
/* Only indices below class_devno_max() are looked up. A non-NULL entry
 * is asserted to carry the right magic and an obd_minor equal to
 * \a num. This function does not take obd_dev_lock itself.
 *
 * NOTE(review): no lower-bound check on \a num is visible here --
 * confirm that callers never pass a negative index. */
646 struct obd_device *class_num2obd(int num)
648 struct obd_device *obd = NULL;
650 if (num < class_devno_max()) {
655 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
656 "%p obd_magic %08x != %08x\n",
657 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
658 LASSERTF(obd->obd_minor == num,
659 "%p obd_minor %0d != %0d\n",
660 obd, obd->obd_minor, num);
665 EXPORT_SYMBOL(class_num2obd);
668 * Find obd in obd_dev[] by name or uuid.
670 * Increment obd's refcount if found.
672 * \param[in] str obd name or uuid
674 * \retval NULL if not found
675 * \retval target pointer to found obd_device
/* \a str is first converted to an obd_uuid and looked up by uuid; a
 * lookup by name is also performed (presumably only when the uuid
 * lookup fails -- the condition is not shown). Both lookups and the
 * class_incref() with tag "find" happen under the obd_dev_lock read
 * lock. */
677 struct obd_device *class_dev_by_str(const char *str)
679 struct obd_device *target = NULL;
680 struct obd_uuid tgtuuid;
683 obd_str2uuid(&tgtuuid, str);
685 read_lock(&obd_dev_lock);
686 rc = class_uuid2dev_nolock(&tgtuuid);
688 rc = class_name2dev_nolock(str);
691 target = class_num2obd(rc);
694 class_incref(target, "find", current);
695 read_unlock(&obd_dev_lock);
699 EXPORT_SYMBOL(class_dev_by_str);
702 * Get obd devices count. Devices in any state are counted.
704 * \retval obd device count
/* Walks the device table under the obd_dev_lock read lock.
 *
 * NOTE(review): this loop runs to index <= max_index, whereas every
 * other scan in this file uses i < class_devno_max(). The extra index
 * is rejected by the bound check in class_num2obd(), so it looks
 * harmless but inconsistent -- confirm. */
706 int get_devices_count(void)
708 int index, max_index = class_devno_max(), dev_count = 0;
710 read_lock(&obd_dev_lock);
711 for (index = 0; index <= max_index; index++) {
712 struct obd_device *obd = class_num2obd(index);
716 read_unlock(&obd_dev_lock);
720 EXPORT_SYMBOL(get_devices_count);
/* Print one console line per device while holding the obd_dev_lock
 * read lock: slot index, status, type name, device name, uuid and
 * current obd_refcount. The status is chosen by testing obd_stopping,
 * then obd_set_up, then obd_attached. */
722 void class_obd_list(void)
727 read_lock(&obd_dev_lock);
728 for (i = 0; i < class_devno_max(); i++) {
729 struct obd_device *obd = class_num2obd(i);
733 if (obd->obd_stopping)
735 else if (obd->obd_set_up)
737 else if (obd->obd_attached)
741 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
742 i, status, obd->obd_type->typ_name,
743 obd->obd_name, obd->obd_uuid.uuid,
744 atomic_read(&obd->obd_refcount));
746 read_unlock(&obd_dev_lock);
749 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
750 * specified, then only the client with that uuid is returned,
751 * otherwise any client connected to the tgt is returned.
/* Scans obd_devs[] under the obd_dev_lock read lock.
 *
 * The type comparison is strncmp() over strlen(type_name), i.e. the
 * device's type name only has to start with \a type_name. A device
 * matches when its u.cli.cl_target_uuid equals \a tgt_uuid and, if
 * \a grp_uuid is non-NULL, its obd_uuid equals \a grp_uuid. The lock is
 * released before leaving on a match. */
753 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
754 const char *type_name,
755 struct obd_uuid *grp_uuid)
759 read_lock(&obd_dev_lock);
760 for (i = 0; i < class_devno_max(); i++) {
761 struct obd_device *obd = class_num2obd(i);
765 if ((strncmp(obd->obd_type->typ_name, type_name,
766 strlen(type_name)) == 0)) {
767 if (obd_uuid_equals(tgt_uuid,
768 &obd->u.cli.cl_target_uuid) &&
769 ((grp_uuid)? obd_uuid_equals(grp_uuid,
770 &obd->obd_uuid) : 1)) {
771 read_unlock(&obd_dev_lock);
776 read_unlock(&obd_dev_lock);
780 EXPORT_SYMBOL(class_find_client_obd);
782 /* Iterate the obd_device list looking devices have grp_uuid. Start
783 * searching at *next, and if a device is found, the next index to look
784 * at is saved in *next. If next is NULL, then the first matching device
785 * will always be returned.
/* Scans obd_devs[] under the obd_dev_lock read lock for a device whose
 * obd_uuid equals \a grp_uuid. *next is range-checked against
 * [0, class_devno_max()) before use, presumably as the start index --
 * the assignment is not shown. The lock is released on both the match
 * and the no-match path. */
787 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
793 else if (*next >= 0 && *next < class_devno_max())
798 read_lock(&obd_dev_lock);
799 for (; i < class_devno_max(); i++) {
800 struct obd_device *obd = class_num2obd(i);
804 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
807 read_unlock(&obd_dev_lock);
811 read_unlock(&obd_dev_lock);
815 EXPORT_SYMBOL(class_devices_in_group);
818 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
819 * adjust sptlrpc settings accordingly.
/* \a namelen must be positive (asserted).
 *
 * Devices that are not set up or are stopping are filtered out, as are
 * types other than mdc/osc/osp/lwp/mdt/ost. The device name is compared
 * against \a fsname over the first \a namelen bytes.
 *
 * For each selected device a reference is taken and obd_dev_lock is
 * dropped around obd_set_info_async(KEY_SPTLRPC_CONF) on the device's
 * self export. The reference is then released and the read lock
 * re-acquired before the scan continues. */
821 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
823 struct obd_device *obd;
827 LASSERT(namelen > 0);
829 read_lock(&obd_dev_lock);
830 for (i = 0; i < class_devno_max(); i++) {
831 obd = class_num2obd(i);
833 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
836 /* only notify mdc, osc, osp, lwp, mdt, ost
837 * because only these have a -sptlrpc llog */
838 type = obd->obd_type->typ_name;
839 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
840 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
841 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
842 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
843 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
844 strcmp(type, LUSTRE_OST_NAME) != 0)
847 if (strncmp(obd->obd_name, fsname, namelen))
850 class_incref(obd, __FUNCTION__, obd);
851 read_unlock(&obd_dev_lock);
852 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
853 sizeof(KEY_SPTLRPC_CONF),
854 KEY_SPTLRPC_CONF, 0, NULL, NULL);
856 class_decref(obd, __FUNCTION__, obd);
857 read_lock(&obd_dev_lock);
859 read_unlock(&obd_dev_lock);
862 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/* Destroy obd_device_cachep if it exists and reset the pointer to NULL,
 * so that a repeated call does nothing. */
864 void obd_cleanup_caches(void)
867 if (obd_device_cachep) {
868 kmem_cache_destroy(obd_device_cachep);
869 obd_device_cachep = NULL;
/* Create the "ll_obd_dev_cache" slab cache for struct obd_device, with
 * the whole object whitelisted for usercopy. The cache must not exist
 * yet (asserted). On failure rc is set to -ENOMEM and
 * obd_cleanup_caches() is invoked. */
875 int obd_init_caches(void)
880 LASSERT(obd_device_cachep == NULL);
881 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
882 sizeof(struct obd_device),
883 0, 0, 0, sizeof(struct obd_device), NULL);
884 if (!obd_device_cachep)
885 GOTO(out, rc = -ENOMEM);
889 obd_cleanup_caches();
/* Owner tag passed to class_handle_hash() and class_handle2object() for
 * export handles. */
893 static const char export_handle_owner[] = "export";
/* Translate a connection handle into its export.
 *
 * A cookie of -1 is treated as a request for a new connection and is
 * only logged here. Otherwise the cookie is looked up through
 * class_handle2object() with export_handle_owner as the owner tag. */
895 /* map connection to client */
896 struct obd_export *class_conn2export(struct lustre_handle *conn)
898 struct obd_export *export;
902 CDEBUG(D_CACHE, "looking for null handle\n");
906 if (conn->cookie == -1) { /* this means assign a new connection */
907 CDEBUG(D_CACHE, "want a new connection\n");
911 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
912 export = class_handle2object(conn->cookie, export_handle_owner);
915 EXPORT_SYMBOL(class_conn2export);
/* Map an export to its obd_device -- presumably exp->exp_obd; the body
 * is not shown, TODO confirm. */
917 struct obd_device *class_exp2obd(struct obd_export *exp)
923 EXPORT_SYMBOL(class_exp2obd);
/* Return the client import (u.cli.cl_import) of the device that \a exp
 * belongs to. */
925 struct obd_import *class_exp2cliimp(struct obd_export *exp)
927 struct obd_device *obd = exp->exp_obd;
930 return obd->u.cli.cl_import;
932 EXPORT_SYMBOL(class_exp2cliimp);
934 /* Export management functions */
/* Destroy an export whose handle refcount has dropped to zero
 * (asserted).
 *
 * Releases the export's connection, asserts that its reply, replay and
 * high-priority RPC lists are empty, and calls obd_destroy_export().
 * The device reference tagged "export" is dropped unless this is the
 * device's self export. The memory is released through kfree_rcu(). */
935 static void class_export_destroy(struct obd_export *exp)
937 struct obd_device *obd = exp->exp_obd;
940 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
941 LASSERT(obd != NULL);
943 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
944 exp->exp_client_uuid.uuid, obd->obd_name);
946 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
947 ptlrpc_connection_put(exp->exp_connection);
949 LASSERT(list_empty(&exp->exp_outstanding_replies));
950 LASSERT(list_empty(&exp->exp_uncommitted_replies));
951 LASSERT(list_empty(&exp->exp_req_replay_queue));
952 LASSERT(list_empty(&exp->exp_hp_rpcs));
953 obd_destroy_export(exp);
954 /* self export doesn't hold a reference to an obd, although it
955 * exists until freeing of the obd */
956 if (exp != obd->obd_self_export)
957 class_decref(obd, "export", exp);
959 OBD_FREE_PRE(exp, sizeof(*exp), "rcu");
960 kfree_rcu(exp, exp_handle.h_rcu);
/* Take an additional reference on \a exp (exp_handle.h_ref) and log the
 * new count. */
964 struct obd_export *class_export_get(struct obd_export *exp)
966 refcount_inc(&exp->exp_handle.h_ref);
967 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
968 refcount_read(&exp->exp_handle.h_ref));
971 EXPORT_SYMBOL(class_export_get);
/* Drop a reference on \a exp.
 *
 * When the last reference goes away, the export's nid statistics are
 * released. A self export is then destroyed synchronously with
 * class_export_destroy(). Any other export must still be linked on
 * exp_obd_chain (asserted) and is handed to obd_zombie_export_add(). */
973 void class_export_put(struct obd_export *exp)
975 LASSERT(exp != NULL);
976 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
977 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
978 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
979 refcount_read(&exp->exp_handle.h_ref) - 1);
981 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
982 struct obd_device *obd = exp->exp_obd;
984 CDEBUG(D_IOCTL, "final put %p/%s\n",
985 exp, exp->exp_client_uuid.uuid);
987 /* release nid stat reference */
988 lprocfs_exp_cleanup(exp);
990 if (exp == obd->obd_self_export) {
991 /* self export should be destroyed without
992 * zombie thread as it doesn't hold a
993 * reference to obd and doesn't hold any
995 class_export_destroy(exp);
996 /* self export is destroyed, no class
997 * references exist and it is safe to free
1001 LASSERT(!list_empty(&exp->exp_obd_chain));
1002 obd_zombie_export_add(exp);
1007 EXPORT_SYMBOL(class_export_put);
/* Work handler installed on exp_zombie_work by __class_new_export():
 * recovers the export from the work item and destroys it. */
1009 static void obd_zombie_exp_cull(struct work_struct *ws)
1011 struct obd_export *export;
1013 export = container_of(ws, struct obd_export, exp_zombie_work);
1014 class_export_destroy(export);
/* Common constructor behind class_new_export() (is_self == false) and
 * class_new_export_self() (is_self == true).
 *
 * Returns ERR_PTR(-ENOMEM) on allocation failure. Under
 * obd->obd_dev_lock, when \a cluuid differs from the device's own uuid,
 * a stopping device yields -ENODEV and a failed obd_uuid_add() is
 * logged and reported as -EALREADY. The exit_unlock path unhashes the
 * handle, calls obd_destroy_export() and frees the export.
 *
 * Two linkage variants are visible: one takes a device reference tagged
 * "export" and links the export on obd_exports and obd_exports_timed,
 * the other only initialises the two list heads. Presumably \a is_self
 * selects the second variant -- the condition is not shown. */
1017 /* Creates a new export, adds it to the hash table, and returns a
1018 * pointer to it. The refcount is 2: one for the hash reference, and
1019 * one for the pointer returned by this function. */
1020 struct obd_export *__class_new_export(struct obd_device *obd,
1021 struct obd_uuid *cluuid, bool is_self)
1023 struct obd_export *export;
1027 OBD_ALLOC_PTR(export);
1029 return ERR_PTR(-ENOMEM);
1031 export->exp_conn_cnt = 0;
1032 export->exp_lock_hash = NULL;
1033 export->exp_flock_hash = NULL;
1034 /* 2 = class_handle_hash + last */
1035 refcount_set(&export->exp_handle.h_ref, 2);
1036 atomic_set(&export->exp_rpc_count, 0);
1037 atomic_set(&export->exp_cb_count, 0);
1038 atomic_set(&export->exp_locks_count, 0);
1039 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1040 INIT_LIST_HEAD(&export->exp_locks_list);
1041 spin_lock_init(&export->exp_locks_list_guard);
1043 atomic_set(&export->exp_replay_count, 0);
1044 export->exp_obd = obd;
1045 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1046 spin_lock_init(&export->exp_uncommitted_replies_lock);
1047 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1048 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1049 INIT_HLIST_NODE(&export->exp_handle.h_link);
1050 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1051 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1052 class_handle_hash(&export->exp_handle, export_handle_owner);
1053 export->exp_last_request_time = ktime_get_real_seconds();
1054 spin_lock_init(&export->exp_lock);
1055 spin_lock_init(&export->exp_rpc_lock);
1056 INIT_HLIST_NODE(&export->exp_gen_hash);
1057 spin_lock_init(&export->exp_bl_list_lock);
1058 INIT_LIST_HEAD(&export->exp_bl_list);
1059 INIT_LIST_HEAD(&export->exp_stale_list);
1060 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1062 export->exp_sp_peer = LUSTRE_SP_ANY;
1063 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1064 export->exp_client_uuid = *cluuid;
1065 obd_init_export(export);
1067 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1069 spin_lock(&obd->obd_dev_lock);
1070 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1071 /* shouldn't happen, but might race */
1072 if (obd->obd_stopping)
1073 GOTO(exit_unlock, rc = -ENODEV);
1075 rc = obd_uuid_add(obd, export);
1077 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1078 obd->obd_name, cluuid->uuid, rc);
1079 GOTO(exit_unlock, rc = -EALREADY);
1084 class_incref(obd, "export", export);
1085 list_add_tail(&export->exp_obd_chain_timed,
1086 &obd->obd_exports_timed);
1087 list_add(&export->exp_obd_chain, &obd->obd_exports);
1088 obd->obd_num_exports++;
1090 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1091 INIT_LIST_HEAD(&export->exp_obd_chain);
1093 spin_unlock(&obd->obd_dev_lock);
1097 spin_unlock(&obd->obd_dev_lock);
1098 class_handle_unhash(&export->exp_handle);
1099 obd_destroy_export(export);
1100 OBD_FREE_PTR(export);
/* Create a regular (non-self) export for \a obd; thin wrapper around
 * __class_new_export() with is_self == false. */
1104 struct obd_export *class_new_export(struct obd_device *obd,
1105 struct obd_uuid *uuid)
1107 return __class_new_export(obd, uuid, false);
1109 EXPORT_SYMBOL(class_new_export);
/* Create the self export of \a obd; thin wrapper around
 * __class_new_export() with is_self == true. */
1111 struct obd_export *class_new_export_self(struct obd_device *obd,
1112 struct obd_uuid *uuid)
1114 return __class_new_export(obd, uuid, true);
/* Detach an export from its device.
 *
 * The handle is unhashed first. A self export is then simply released
 * with class_export_put(). For other exports, under obd_dev_lock: the
 * uuid hash entry is removed, the generation hash entry is removed when
 * present (server builds only), the export is moved to
 * obd_unlinked_exports and taken off the timed list, and
 * obd_num_exports is decremented. Afterwards obd_stale_export_num is
 * incremented and the export is passed to obd_stale_export_put(). */
1117 void class_unlink_export(struct obd_export *exp)
1119 class_handle_unhash(&exp->exp_handle);
1121 if (exp->exp_obd->obd_self_export == exp) {
1122 class_export_put(exp);
1126 spin_lock(&exp->exp_obd->obd_dev_lock);
1127 /* delete an uuid-export hashitem from hashtables */
1128 if (exp != exp->exp_obd->obd_self_export)
1129 obd_uuid_del(exp->exp_obd, exp);
1131 #ifdef HAVE_SERVER_SUPPORT
1132 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1133 struct tg_export_data *ted = &exp->exp_target_data;
1134 struct cfs_hash *hash;
1136 /* Because obd_gen_hash will not be released until
1137 * class_cleanup(), so hash should never be NULL here */
1138 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1139 LASSERT(hash != NULL);
1140 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1141 &exp->exp_gen_hash);
1142 cfs_hash_putref(hash);
1144 #endif /* HAVE_SERVER_SUPPORT */
1146 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1147 list_del_init(&exp->exp_obd_chain_timed);
1148 exp->exp_obd->obd_num_exports--;
1149 spin_unlock(&exp->exp_obd->obd_dev_lock);
1150 atomic_inc(&obd_stale_export_num);
1152 /* A reference is kept by obd_stale_exports list */
1153 obd_stale_export_put(exp);
1155 EXPORT_SYMBOL(class_unlink_export);
1157 /* Import management functions */
/* Release an import whose refcount has reached zero (asserted).
 *
 * Drops the import's current connection, then empties imp_conn_list,
 * releasing each entry's connection and freeing the entry. Asserts that
 * no security context (imp_sec) and no requests (imp_reqs) remain, and
 * drops the device reference tagged "import" that class_new_import()
 * took. */
1158 static void obd_zombie_import_free(struct obd_import *imp)
1162 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1163 imp->imp_obd->obd_name);
1165 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1167 ptlrpc_connection_put(imp->imp_connection);
1169 while (!list_empty(&imp->imp_conn_list)) {
1170 struct obd_import_conn *imp_conn;
1172 imp_conn = list_first_entry(&imp->imp_conn_list,
1173 struct obd_import_conn, oic_item);
1174 list_del_init(&imp_conn->oic_item);
1175 ptlrpc_connection_put(imp_conn->oic_conn);
1176 OBD_FREE(imp_conn, sizeof(*imp_conn));
1179 LASSERT(imp->imp_sec == NULL);
1180 LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1181 imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1182 class_decref(imp->imp_obd, "import", imp);
/* Take an additional reference on \a import (imp_refcount) and log the
 * new count together with the owning device's name. */
1187 struct obd_import *class_import_get(struct obd_import *import)
1189 refcount_inc(&import->imp_refcount);
1190 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1191 refcount_read(&import->imp_refcount),
1192 import->imp_obd->obd_name);
1195 EXPORT_SYMBOL(class_import_get);
/* Drop a reference on \a imp. The count must be positive on entry
 * (asserted). When the last reference goes away the import is handed to
 * obd_zombie_import_add(). */
1197 void class_import_put(struct obd_import *imp)
1201 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1203 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1204 refcount_read(&imp->imp_refcount) - 1,
1205 imp->imp_obd->obd_name);
1207 if (refcount_dec_and_test(&imp->imp_refcount)) {
1208 CDEBUG(D_INFO, "final put import %p\n", imp);
1209 obd_zombie_import_add(imp);
1214 EXPORT_SYMBOL(class_import_put);
/* Initialise the adaptive-timeout state of an import: the net latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT. */
1216 static void init_imp_at(struct imp_at *at) {
1218 at_init(&at->iat_net_latency, 0, 0);
1219 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1220 /* max service estimates are tracked on the server side, so
1221 don't use the AT history here, just use the last reported
1222 val. (But keep hist for proc histogram, worst_ever) */
1223 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Work handler installed on imp_zombie_work by class_new_import():
 * recovers the import from the work item and frees it. */
1228 static void obd_zombie_imp_cull(struct work_struct *ws)
1230 struct obd_import *import;
1232 import = container_of(ws, struct obd_import, imp_zombie_work);
1233 obd_zombie_import_free(import);
/* Allocate and initialise an import for \a obd.
 *
 * The import takes a device reference tagged "import", starts in state
 * LUSTRE_IMP_NEW with imp_refcount == 2, and uses LUSTRE_MSG_MAGIC_V2
 * as its initial message magic. imp_sec_refpid is the pid of the
 * current pid namespace's child_reaper when one is available, and 1
 * otherwise. */
1236 struct obd_import *class_new_import(struct obd_device *obd)
1238 struct obd_import *imp;
1239 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1241 OBD_ALLOC(imp, sizeof(*imp));
1245 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1246 INIT_LIST_HEAD(&imp->imp_replay_list);
1247 INIT_LIST_HEAD(&imp->imp_sending_list);
1248 INIT_LIST_HEAD(&imp->imp_delayed_list);
1249 INIT_LIST_HEAD(&imp->imp_committed_list);
1250 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1251 imp->imp_known_replied_xid = 0;
1252 imp->imp_replay_cursor = &imp->imp_committed_list;
1253 spin_lock_init(&imp->imp_lock);
1254 imp->imp_last_success_conn = 0;
1255 imp->imp_state = LUSTRE_IMP_NEW;
1256 imp->imp_obd = class_incref(obd, "import", imp);
1257 rwlock_init(&imp->imp_sec_lock);
1258 init_waitqueue_head(&imp->imp_recovery_waitq);
1259 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1261 if (curr_pid_ns && curr_pid_ns->child_reaper)
1262 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1264 imp->imp_sec_refpid = 1;
1266 refcount_set(&imp->imp_refcount, 2);
1267 atomic_set(&imp->imp_unregistering, 0);
1268 atomic_set(&imp->imp_reqs, 0);
1269 atomic_set(&imp->imp_inflight, 0);
1270 atomic_set(&imp->imp_replay_inflight, 0);
1271 init_waitqueue_head(&imp->imp_replay_waitq);
1272 atomic_set(&imp->imp_inval_count, 0);
1273 INIT_LIST_HEAD(&imp->imp_conn_list);
1274 init_imp_at(&imp->imp_at);
1276 /* the default magic is V2, will be used in connect RPC, and
1277 * then adjusted according to the flags in request/reply. */
1278 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1282 EXPORT_SYMBOL(class_new_import);
/* Retire an import: bump imp_generation under imp_lock, then drop one
 * reference with class_import_put(). \a import must be neither NULL nor
 * LP_POISON (asserted). */
1284 void class_destroy_import(struct obd_import *import)
1286 LASSERT(import != NULL);
1287 LASSERT(import != LP_POISON);
1289 spin_lock(&import->imp_lock);
1290 import->imp_generation++;
1291 spin_unlock(&import->imp_lock);
1292 class_import_put(import);
1294 EXPORT_SYMBOL(class_destroy_import);
1296 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): record that \a lock
 * holds another reference on \a exp.
 *
 * Under exp_locks_list_guard: warns if the lock is already associated
 * with a different export, increments l_exp_refs_nr, and on the first
 * reference links the lock on exp_locks_list and records \a exp as its
 * target. */
1298 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1300 spin_lock(&exp->exp_locks_list_guard);
1302 LASSERT(lock->l_exp_refs_nr >= 0);
1304 if (lock->l_exp_refs_target != NULL &&
1305 lock->l_exp_refs_target != exp) {
1306 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1307 exp, lock, lock->l_exp_refs_target);
1309 if ((lock->l_exp_refs_nr ++) == 0) {
1310 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1311 lock->l_exp_refs_target = exp;
1313 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1314 lock, exp, lock->l_exp_refs_nr);
1315 spin_unlock(&exp->exp_locks_list_guard);
1317 EXPORT_SYMBOL(__class_export_add_lock_ref);
/* Counterpart of __class_export_add_lock_ref().
 *
 * Under exp_locks_list_guard: asserts that the lock has at least one
 * tracked reference, warns if its recorded target is not \a exp,
 * decrements l_exp_refs_nr, and on reaching zero unlinks the lock from
 * the export's list and clears its target. */
1319 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1321 spin_lock(&exp->exp_locks_list_guard);
1322 LASSERT(lock->l_exp_refs_nr > 0);
1323 if (lock->l_exp_refs_target != exp) {
1324 LCONSOLE_WARN("lock %p, "
1325 "mismatching export pointers: %p, %p\n",
1326 lock, lock->l_exp_refs_target, exp);
1328 if (-- lock->l_exp_refs_nr == 0) {
1329 list_del_init(&lock->l_exp_refs_link);
1330 lock->l_exp_refs_target = NULL;
1332 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1333 lock, exp, lock->l_exp_refs_nr);
1334 spin_unlock(&exp->exp_locks_list_guard);
1336 EXPORT_SYMBOL(__class_export_del_lock_ref);
/*
 * class_connect() - create an export of @obd for client @cluuid.
 *
 * Calls class_new_export() and, on the visible error path, returns
 * PTR_ERR() of its result.  On success the export's handle cookie is
 * stored in @conn->cookie and one export reference is dropped with
 * class_export_put() before the connection is logged.
 * All three pointer arguments are asserted to be non-NULL.
 *
 * NOTE(review): the guard in front of RETURN(PTR_ERR(export)) and the
 * success return are not visible in this excerpt - presumably
 * "if (IS_ERR(export))" and RETURN(0); confirm against the full file.
 */
1339 /* A connection defines an export context in which preallocation can
1340 be managed. This releases the export pointer reference, and returns
1341 the export handle, so the export refcount is 1 when this function
1343 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1344 struct obd_uuid *cluuid)
1346 struct obd_export *export;
1347 LASSERT(conn != NULL);
1348 LASSERT(obd != NULL);
1349 LASSERT(cluuid != NULL);
1352 export = class_new_export(obd, cluuid);
1354 RETURN(PTR_ERR(export));
1356 conn->cookie = export->exp_handle.h_cookie;
1357 class_export_put(export);
1359 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1360 cluuid->uuid, conn->cookie);
1363 EXPORT_SYMBOL(class_connect);
1365 /* if export is involved in recovery then clean up related things */
1366 static void class_export_recovery_cleanup(struct obd_export *exp)
1368 struct obd_device *obd = exp->exp_obd;
1370 spin_lock(&obd->obd_recovery_task_lock);
1371 if (obd->obd_recovering) {
1372 if (exp->exp_in_recovery) {
1373 spin_lock(&exp->exp_lock);
1374 exp->exp_in_recovery = 0;
1375 spin_unlock(&exp->exp_lock);
1376 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1377 atomic_dec(&obd->obd_connected_clients);
1380 /* if called during recovery then should update
1381 * obd_stale_clients counter,
1382 * lightweight exports are not counted */
1383 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1384 exp->exp_obd->obd_stale_clients++;
1386 spin_unlock(&obd->obd_recovery_task_lock);
1388 spin_lock(&exp->exp_lock);
1389 /** Cleanup req replay fields */
1390 if (exp->exp_req_replay_needed) {
1391 exp->exp_req_replay_needed = 0;
1393 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1394 atomic_dec(&obd->obd_req_replay_clients);
1397 /** Cleanup lock replay data */
1398 if (exp->exp_lock_replay_needed) {
1399 exp->exp_lock_replay_needed = 0;
1401 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1402 atomic_dec(&obd->obd_lock_replay_clients);
1404 spin_unlock(&exp->exp_lock);
/*
 * class_disconnect() - mark @export disconnected and release references.
 *
 * Under exp_lock the previous exp_disconnected value is sampled and the
 * flag is set to 1; server builds also call obd_nid_del() there.  If the
 * export had already been disconnected, control jumps to the no_disconn
 * label so that the recovery cleanup and class_unlink_export() are not
 * repeated by racing callers.  A NULL @export is only warned about.
 *
 * NOTE(review): the no_disconn label and the return statements are not
 * visible in this excerpt; the final class_export_put() presumably sits
 * under that label - confirm against the full file.
 */
1407 /* This function removes 1-3 references from the export:
1408 * 1 - for export pointer passed
1409 * and if disconnect really need
1410 * 2 - removing from hash
1411 * 3 - in client_unlink_export
1412 * The export pointer passed to this function can destroyed */
1413 int class_disconnect(struct obd_export *export)
1415 int already_disconnected;
1418 if (export == NULL) {
1419 CWARN("attempting to free NULL export %p\n", export);
1423 spin_lock(&export->exp_lock);
1424 already_disconnected = export->exp_disconnected;
1425 export->exp_disconnected = 1;
1426 #ifdef HAVE_SERVER_SUPPORT
1427 /* We hold references of export for uuid hash
1428 * and nid_hash and export link at least. So
1429 * it is safe to call rh*table_remove_fast in
1432 obd_nid_del(export->exp_obd, export);
1433 #endif /* HAVE_SERVER_SUPPORT */
1434 spin_unlock(&export->exp_lock);
1436 /* class_cleanup(), abort_recovery(), and class_fail_export()
1437 * all end up in here, and if any of them race we shouldn't
1438 * call extra class_export_puts(). */
1439 if (already_disconnected)
1440 GOTO(no_disconn, already_disconnected);
1442 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1443 export->exp_handle.h_cookie);
1445 class_export_recovery_cleanup(export);
1446 class_unlink_export(export);
1448 class_export_put(export);
1451 EXPORT_SYMBOL(class_disconnect);
/*
 * class_connected_export() - test whether @exp is fully connected.
 *
 * Under exp_lock, "connected" is computed as exp_conn_cnt > 0 with
 * exp_failed clear.
 *
 * NOTE(review): the declaration of "connected", any NULL check on @exp and
 * the return statement are not visible in this excerpt - confirm.
 */
1453 /* Return non-zero for a fully connected export */
1454 int class_connected_export(struct obd_export *exp)
1459 spin_lock(&exp->exp_lock);
1460 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1461 spin_unlock(&exp->exp_lock);
1465 EXPORT_SYMBOL(class_connected_export);
/*
 * class_disconnect_export_list() - disconnect every export on @list.
 *
 * Loops until @list is empty.  For the first entry each time: a reference
 * is taken so the export can still be printed after obd_disconnect(),
 * @flags is stored in exp_flags under exp_lock, and then either
 *  - the client uuid equals the obd's own uuid: the entry is unlinked from
 *    the list and the reference dropped, without disconnecting; or
 *  - a second reference is taken, obd_disconnect() is called (it releases
 *    one reference), the result is logged and the remaining reference is
 *    dropped.
 *
 * NOTE(review): the list member passed to list_first_entry(), the "rc"
 * declaration, the log macro of the "don't discon" message and the
 * statement that moves on to the next entry after that branch are not
 * visible in this excerpt - confirm.
 */
1467 static void class_disconnect_export_list(struct list_head *list,
1468 enum obd_option flags)
1471 struct obd_export *exp;
1474 /* It's possible that an export may disconnect itself, but
1475 * nothing else will be added to this list. */
1476 while (!list_empty(list)) {
1477 exp = list_first_entry(list, struct obd_export,
1479 /* need for safe call CDEBUG after obd_disconnect */
1480 class_export_get(exp);
1482 spin_lock(&exp->exp_lock);
1483 exp->exp_flags = flags;
1484 spin_unlock(&exp->exp_lock);
1486 if (obd_uuid_equals(&exp->exp_client_uuid,
1487 &exp->exp_obd->obd_uuid)) {
1489 "exp %p export uuid == obd uuid, don't discon\n",
1491 /* Need to delete this now so we don't end up pointing
1492 * to work_list later when this export is cleaned up. */
1493 list_del_init(&exp->exp_obd_chain);
1494 class_export_put(exp);
1498 class_export_get(exp);
1499 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1500 "last request at %lld\n",
1501 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1502 exp, exp->exp_last_request_time);
1503 /* release one export reference anyway */
1504 rc = obd_disconnect(exp);
1506 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1507 obd_export_nid2str(exp), exp, rc);
1508 class_export_put(exp);
/*
 * class_disconnect_exports() - disconnect all exports of @obd.
 *
 * Under obd_dev_lock both obd_exports and obd_delayed_exports are spliced
 * onto a local work list.  A non-empty work list is handed to
 * class_disconnect_export_list() with the flags from exp_flags_from_obd().
 *
 * NOTE(review): the "else" that presumably guards the "has no exports"
 * message is not visible in this excerpt - confirm.
 */
1513 void class_disconnect_exports(struct obd_device *obd)
1515 LIST_HEAD(work_list);
1518 /* Move all of the exports from obd_exports to a work list, en masse. */
1519 spin_lock(&obd->obd_dev_lock);
1520 list_splice_init(&obd->obd_exports, &work_list);
1521 list_splice_init(&obd->obd_delayed_exports, &work_list);
1522 spin_unlock(&obd->obd_dev_lock);
1524 if (!list_empty(&work_list)) {
1525 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1526 "disconnecting them\n", obd->obd_minor, obd);
1527 class_disconnect_export_list(&work_list,
1528 exp_flags_from_obd(obd));
1530 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1531 obd->obd_minor, obd);
1534 EXPORT_SYMBOL(class_disconnect_exports);
/*
 * class_disconnect_stale_exports() - evict exports chosen by @test_export.
 *
 * Walks obd_exports under obd_dev_lock.  Visible tests, in order: client
 * uuid equal to the obd's uuid; ted_lr_idx == -1 (no last_rcvd slot); and,
 * under exp_lock, exp_failed or a non-zero test_export(exp).  An export
 * that gets past them has exp_failed set to 1 and is moved to a local work
 * list, which is finally passed to class_disconnect_export_list() with
 * OBD_OPT_ABORT_RECOV added to the flags.
 * @test_export is invoked with both obd_dev_lock and exp_lock held.
 *
 * NOTE(review): the statements executed when a test matches (presumably
 * "continue"), the list member used by the iterator, the "evicted" counter
 * and the condition guarding the console warning are not visible in this
 * excerpt - confirm.
 */
1536 /* Remove exports that have not completed recovery.
1538 void class_disconnect_stale_exports(struct obd_device *obd,
1539 int (*test_export)(struct obd_export *))
1541 LIST_HEAD(work_list);
1542 struct obd_export *exp, *n;
1546 spin_lock(&obd->obd_dev_lock);
1547 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1549 /* don't count self-export as client */
1550 if (obd_uuid_equals(&exp->exp_client_uuid,
1551 &exp->exp_obd->obd_uuid))
1554 /* don't evict clients which have no slot in last_rcvd
1555 * (e.g. lightweight connection) */
1556 if (exp->exp_target_data.ted_lr_idx == -1)
1559 spin_lock(&exp->exp_lock);
1560 if (exp->exp_failed || test_export(exp)) {
1561 spin_unlock(&exp->exp_lock);
1564 exp->exp_failed = 1;
1565 spin_unlock(&exp->exp_lock);
1567 list_move(&exp->exp_obd_chain, &work_list);
1569 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1570 obd->obd_name, exp->exp_client_uuid.uuid,
1571 obd_export_nid2str(exp));
1572 print_export_data(exp, "EVICTING", 0, D_HA);
1574 spin_unlock(&obd->obd_dev_lock);
1577 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1578 obd->obd_name, evicted);
1580 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1581 OBD_OPT_ABORT_RECOV);
1584 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * class_fail_export() - mark @exp failed and disconnect it.
 *
 * exp_failed is sampled and set to 1 under exp_lock; if it was already set
 * only a "skipping" debug message is logged.  Otherwise the debug log is
 * dumped when obd_dump_on_timeout is set, and two references are taken:
 * one keeps @exp valid for the messages printed after obd_disconnect(),
 * the other is the reference obd_disconnect() is expected to consume.  The
 * first one is dropped at the end.
 *
 * NOTE(review): the early return after the "skipping" message and the
 * if/else on "rc" around the CERROR/CDEBUG pair are not visible in this
 * excerpt - confirm.
 */
1586 void class_fail_export(struct obd_export *exp)
1588 int rc, already_failed;
1590 spin_lock(&exp->exp_lock);
1591 already_failed = exp->exp_failed;
1592 exp->exp_failed = 1;
1593 spin_unlock(&exp->exp_lock);
1595 if (already_failed) {
1596 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1597 exp, exp->exp_client_uuid.uuid);
1601 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1602 exp, exp->exp_client_uuid.uuid);
1604 if (obd_dump_on_timeout)
1605 libcfs_debug_dumplog();
1607 /* need for safe call CDEBUG after obd_disconnect */
1608 class_export_get(exp);
1610 /* Most callers into obd_disconnect are removing their own reference
1611 * (request, for example) in addition to the one from the hash table.
1612 * We don't have such a reference here, so make one. */
1613 class_export_get(exp);
1614 rc = obd_disconnect(exp);
1616 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1618 CDEBUG(D_HA, "disconnected export %p/%s\n",
1619 exp, exp->exp_client_uuid.uuid);
1620 class_export_put(exp);
1622 EXPORT_SYMBOL(class_fail_export);
1624 #ifdef HAVE_SERVER_SUPPORT
/*
 * take_first() - per-export callback used by obd_export_evict_by_nid().
 *
 * @data is a struct obd_export ** out-slot.  The visible code rejects
 * exports with exp_failed set and exports whose handle refcount cannot be
 * raised with refcount_inc_not_zero().
 *
 * NOTE(review): the "already have one" test, the store into *expp and all
 * return values are not visible in this excerpt - confirm.
 */
1626 static int take_first(struct obd_export *exp, void *data)
1628 struct obd_export **expp = data;
1631 /* already have one */
1633 if (exp->exp_failed)
1634 /* Don't want this one */
1636 if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1637 /* Cannot get a ref on this one */
/*
 * obd_export_evict_by_nid() - evict the exports of @obd connected from @nid.
 *
 * Returns immediately, with the zero-initialised count, when obd_stopping
 * is set (checked under obd_dev_lock).  Otherwise it repeatedly asks
 * obd_nid_export_for_each() for one export through take_first(), asserts
 * it is not the self-export, logs the eviction, fails the export with
 * class_fail_export() and calls class_export_put() on it.
 * Returns exports_evicted.
 *
 * NOTE(review): the initialisation/reset of doomed_exp, the increment of
 * exports_evicted and the log macro that reports "no exports found" are
 * not visible in this excerpt - confirm.
 */
1643 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1645 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1646 struct obd_export *doomed_exp;
1647 int exports_evicted = 0;
1649 spin_lock(&obd->obd_dev_lock);
1650 /* umount has run already, so evict thread should leave
1651 * its task to umount thread now */
1652 if (obd->obd_stopping) {
1653 spin_unlock(&obd->obd_dev_lock);
1654 return exports_evicted;
1656 spin_unlock(&obd->obd_dev_lock);
1659 while (obd_nid_export_for_each(obd, nid_key,
1660 take_first, &doomed_exp) > 0) {
1662 LASSERTF(doomed_exp != obd->obd_self_export,
1663 "self-export is hashed by NID?\n");
1665 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1667 obd_uuid2str(&doomed_exp->exp_client_uuid),
1668 obd_export_nid2str(doomed_exp));
1670 class_fail_export(doomed_exp);
1671 class_export_put(doomed_exp);
1676 if (!exports_evicted)
1678 "%s: can't disconnect NID '%s': no exports found\n",
1679 obd->obd_name, nid);
1680 return exports_evicted;
1682 EXPORT_SYMBOL(obd_export_evict_by_nid);
1684 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1686 struct obd_export *doomed_exp = NULL;
1687 struct obd_uuid doomed_uuid;
1688 int exports_evicted = 0;
1690 spin_lock(&obd->obd_dev_lock);
1691 if (obd->obd_stopping) {
1692 spin_unlock(&obd->obd_dev_lock);
1693 return exports_evicted;
1695 spin_unlock(&obd->obd_dev_lock);
1697 obd_str2uuid(&doomed_uuid, uuid);
1698 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1699 CERROR("%s: can't evict myself\n", obd->obd_name);
1700 return exports_evicted;
1703 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1704 if (doomed_exp == NULL) {
1705 CERROR("%s: can't disconnect %s: no exports found\n",
1706 obd->obd_name, uuid);
1708 CWARN("%s: evicting %s at adminstrative request\n",
1709 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1710 class_fail_export(doomed_exp);
1711 class_export_put(doomed_exp);
1712 obd_uuid_del(obd, doomed_exp);
1716 return exports_evicted;
1718 #endif /* HAVE_SERVER_SUPPORT */
1720 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump hook, NULL by default.  print_export_data()
 * calls it for an export when its "locks" argument is non-zero and the
 * hook has been set. */
1721 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1722 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * print_export_data() - log a one-line summary of @exp at @debug_level.
 *
 * @status is a caller-chosen tag printed in the line.  The outstanding
 * replies list is walked under exp_lock.  The message prints the handle
 * refcount, the rpc/callback/lock counters, the disconnected/delayed/
 * failed flags, nreplies, first_reply, exp_last_committed and whether the
 * export sits on a stale list.  In LUSTRE_TRACKS_LOCK_EXP_REFS builds,
 * class_export_dump_hook is additionally called when @locks is non-zero
 * and the hook is set.
 *
 * NOTE(review): the declaration of nreplies and the loop body that fills
 * nreplies/first_reply are not visible in this excerpt - confirm.
 */
1725 static void print_export_data(struct obd_export *exp, const char *status,
1726 int locks, int debug_level)
1728 struct ptlrpc_reply_state *rs;
1729 struct ptlrpc_reply_state *first_reply = NULL;
1732 spin_lock(&exp->exp_lock);
1733 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1739 spin_unlock(&exp->exp_lock);
1741 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1742 "%p %s %llu stale:%d\n",
1743 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1744 obd_export_nid2str(exp),
1745 refcount_read(&exp->exp_handle.h_ref),
1746 atomic_read(&exp->exp_rpc_count),
1747 atomic_read(&exp->exp_cb_count),
1748 atomic_read(&exp->exp_locks_count),
1749 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1750 nreplies, first_reply, nreplies > 3 ? "..." : "",
1751 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1752 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1753 if (locks && class_export_dump_hook != NULL)
1754 class_export_dump_hook(exp);
1758 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1760 struct obd_export *exp;
1762 spin_lock(&obd->obd_dev_lock);
1763 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1764 print_export_data(exp, "ACTIVE", locks, debug_level);
1765 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1766 print_export_data(exp, "UNLINKED", locks, debug_level);
1767 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1768 print_export_data(exp, "DELAYED", locks, debug_level);
1769 spin_unlock(&obd->obd_dev_lock);
/*
 * obd_exports_barrier() - wait until @obd has no unlinked exports left.
 *
 * Asserts that obd_exports is empty, then polls obd_unlinked_exports,
 * dropping obd_dev_lock around each schedule_timeout_uninterruptible()
 * sleep of "waited" seconds.  Whenever "waited" is above 5 and a power of
 * two, a console warning with the obd refcount is printed and all exports
 * are dumped.
 *
 * NOTE(review): the declaration and the update of "waited" are not visible
 * in this excerpt - confirm.
 */
1772 void obd_exports_barrier(struct obd_device *obd)
1775 LASSERT(list_empty(&obd->obd_exports));
1776 spin_lock(&obd->obd_dev_lock);
1777 while (!list_empty(&obd->obd_unlinked_exports)) {
1778 spin_unlock(&obd->obd_dev_lock);
1779 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1780 if (waited > 5 && is_power_of_2(waited)) {
1781 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1782 "more than %d seconds. "
1783 "The obd refcount = %d. Is it stuck?\n",
1784 obd->obd_name, waited,
1785 atomic_read(&obd->obd_refcount));
1786 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1789 spin_lock(&obd->obd_dev_lock);
1791 spin_unlock(&obd->obd_dev_lock);
1793 EXPORT_SYMBOL(obd_exports_barrier);
1796 * Add export to the obd_zombe thread and notify it.
1798 static void obd_zombie_export_add(struct obd_export *exp) {
1799 atomic_dec(&obd_stale_export_num);
1800 spin_lock(&exp->exp_obd->obd_dev_lock);
1801 LASSERT(!list_empty(&exp->exp_obd_chain));
1802 list_del_init(&exp->exp_obd_chain);
1803 spin_unlock(&exp->exp_obd->obd_dev_lock);
1805 queue_work(zombie_wq, &exp->exp_zombie_work);
1809 * Add import to the obd_zombe thread and notify it.
1811 static void obd_zombie_import_add(struct obd_import *imp) {
1812 LASSERT(imp->imp_sec == NULL);
1814 queue_work(zombie_wq, &imp->imp_zombie_work);
1818 * wait when obd_zombie import/export queues become empty
1820 void obd_zombie_barrier(void)
1822 flush_workqueue(zombie_wq);
1824 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * obd_stale_export_get() - take the first export off obd_stale_exports.
 *
 * Under obd_stale_export_lock the first entry, if any, is unlinked from
 * the list with list_del_init(); a trace message with the current
 * obd_stale_export_num is logged.
 *
 * NOTE(review): the condition around the CDEBUG and the return statement
 * are not visible in this excerpt; presumably the removed export, or NULL
 * when the list is empty, is returned - confirm.
 */
1827 struct obd_export *obd_stale_export_get(void)
1829 struct obd_export *exp = NULL;
1832 spin_lock(&obd_stale_export_lock);
1833 if (!list_empty(&obd_stale_exports)) {
1834 exp = list_first_entry(&obd_stale_exports,
1835 struct obd_export, exp_stale_list);
1836 list_del_init(&exp->exp_stale_list);
1838 spin_unlock(&obd_stale_export_lock);
1841 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1842 atomic_read(&obd_stale_export_num));
1846 EXPORT_SYMBOL(obd_stale_export_get);
/*
 * obd_stale_export_put() - give back an export taken from the stale list.
 *
 * @exp must not currently be on the stale list.  If it has a lock hash
 * with a non-zero hs_count it is queued on obd_stale_exports again - at
 * the tail when exp_bl_list is empty, at the head otherwise.  The queueing
 * takes exp_bl_list_lock (bh variant) first and obd_stale_export_lock
 * inside it.  class_export_put() is also called.
 *
 * NOTE(review): the "else" keywords are not visible in this excerpt, so it
 * cannot be told from here whether class_export_put() runs only when the
 * export is not re-queued - confirm against the full file.
 */
1848 void obd_stale_export_put(struct obd_export *exp)
1852 LASSERT(list_empty(&exp->exp_stale_list));
1853 if (exp->exp_lock_hash &&
1854 atomic_read(&exp->exp_lock_hash->hs_count)) {
1855 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1856 atomic_read(&obd_stale_export_num));
1858 spin_lock_bh(&exp->exp_bl_list_lock);
1859 spin_lock(&obd_stale_export_lock);
1860 /* Add to the tail if there is no blocked locks,
1861 * to the head otherwise. */
1862 if (list_empty(&exp->exp_bl_list))
1863 list_add_tail(&exp->exp_stale_list,
1864 &obd_stale_exports);
1866 list_add(&exp->exp_stale_list,
1867 &obd_stale_exports);
1869 spin_unlock(&obd_stale_export_lock);
1870 spin_unlock_bh(&exp->exp_bl_list_lock);
1872 class_export_put(exp);
1876 EXPORT_SYMBOL(obd_stale_export_put);
1879 * Adjust the position of the export in the stale list,
1880 * i.e. move to the head of the list if is needed.
1882 void obd_stale_export_adjust(struct obd_export *exp)
1884 LASSERT(exp != NULL);
1885 spin_lock_bh(&exp->exp_bl_list_lock);
1886 spin_lock(&obd_stale_export_lock);
1888 if (!list_empty(&exp->exp_stale_list) &&
1889 !list_empty(&exp->exp_bl_list))
1890 list_move(&exp->exp_stale_list, &obd_stale_exports);
1892 spin_unlock(&obd_stale_export_lock);
1893 spin_unlock_bh(&exp->exp_bl_list_lock);
1895 EXPORT_SYMBOL(obd_stale_export_adjust);
/*
 * obd_zombie_impexp_init() - create the zombie workqueue.
 *
 * Stores the result of cfs_cpt_bind_workqueue("obd_zombid", ...) in
 * zombie_wq and returns PTR_ERR() of it when IS_ERR() is true, 0 otherwise.
 *
 * NOTE(review): one argument line of the cfs_cpt_bind_workqueue() call is
 * not visible in this excerpt - confirm against the full file.
 */
1898 * start destroy zombie import/export thread
1900 int obd_zombie_impexp_init(void)
1902 zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1904 cfs_cpt_number(cfs_cpt_tab));
1906 return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1910 * stop destroy zombie import/export thread
1912 void obd_zombie_impexp_stop(void)
1914 destroy_workqueue(zombie_wq);
1915 LASSERT(list_empty(&obd_stale_exports));
1918 /***** Kernel-userspace comm helpers *******/
1920 /* Get length of entire message, including header */
1921 int kuc_len(int payload_len)
1923 return sizeof(struct kuc_hdr) + payload_len;
1925 EXPORT_SYMBOL(kuc_len);
/*
 * kuc_ptr() - step back from payload pointer @p to the struct kuc_hdr
 * located immediately before it, asserting that the header magic is
 * KUC_MAGIC.
 *
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably "lh" is returned - confirm.
 */
1927 /* Get a pointer to kuc header, given a ptr to the payload
1928 * @param p Pointer to payload area
1929 * @returns Pointer to kuc header
1931 struct kuc_hdr * kuc_ptr(void *p)
1933 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1934 LASSERT(lh->kuc_magic == KUC_MAGIC);
1937 EXPORT_SYMBOL(kuc_ptr);
/*
 * kuc_alloc() - set up a message header for a @payload_len byte payload.
 *
 * The total length comes from kuc_len().  The header receives KUC_MAGIC,
 * @transport, @type and that total length, and the address just past the
 * header is returned.  ERR_PTR(-ENOMEM) is returned on the visible failure
 * path, so callers must test the result with IS_ERR(), not against NULL.
 *
 * NOTE(review): the declaration and allocation of "lh" are not visible in
 * this excerpt; kuc_free() releases with OBD_FREE(), so presumably
 * OBD_ALLOC(lh, len) is used - confirm.
 */
1939 /* Alloc space for a message, and fill in header
1940 * @return Pointer to payload area
1942 void *kuc_alloc(int payload_len, int transport, int type)
1945 int len = kuc_len(payload_len);
1949 return ERR_PTR(-ENOMEM);
1951 lh->kuc_magic = KUC_MAGIC;
1952 lh->kuc_transport = transport;
1953 lh->kuc_msgtype = type;
1954 lh->kuc_msglen = len;
1956 return (void *)(lh + 1);
1958 EXPORT_SYMBOL(kuc_alloc);
/*
 * kuc_free() - release a message.
 *
 * @p is the payload pointer and @payload_len the payload size.  The header
 * is located (and its magic checked) by kuc_ptr(), and the whole message,
 * kuc_len(@payload_len) bytes, is freed.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh;

	lh = kuc_ptr(p);
	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
/* Per-waiter record that obd_get_request_slot() queues on
 * cl_flight_waiters: a list link plus the wait queue the waiter sleeps on.
 * NOTE(review): obd_get_request_slot() also uses an orsw_signaled member
 * whose declaration is not visible in this excerpt - confirm. */
1968 struct obd_request_slot_waiter {
1969 struct list_head orsw_entry;
1970 wait_queue_head_t orsw_waitq;
/*
 * obd_request_slot_avail() - wait condition for obd_get_request_slot().
 *
 * Under cl_loi_list_lock, "avail" becomes true once @orsw is no longer
 * linked on a list (list_empty(&orsw->orsw_entry)).
 *
 * NOTE(review): the declaration of "avail" and the return statement are
 * not visible in this excerpt - confirm.
 */
1974 static bool obd_request_slot_avail(struct client_obd *cli,
1975 struct obd_request_slot_waiter *orsw)
1979 spin_lock(&cli->cl_loi_list_lock);
1980 avail = !!list_empty(&orsw->orsw_entry);
1981 spin_unlock(&cli->cl_loi_list_lock);
/*
 * obd_get_request_slot() - acquire one RPC credit on @cli.
 *
 * Fast path: when cl_rpcs_in_flight is below cl_max_rpcs_in_flight the
 * counter is incremented under cl_loi_list_lock.  Otherwise an on-stack
 * waiter is appended to cl_flight_waiters and the caller sleeps in
 * l_wait_event_abortable() until the waiter has been unlinked or
 * orsw_signaled is set.  After waking, cl_loi_list_lock is retaken before
 * the on-stack waiter is examined, so that a concurrent waker is finished
 * with it before the stack frame goes away.
 *
 * NOTE(review): the return values, the test on "rc" around the cleanup
 * and the "else" between the counter decrement and list_del() are not
 * visible in this excerpt - confirm against the full file.
 */
1987 * For network flow control, the RPC sponsor needs to acquire a credit
1988 * before sending the RPC. The credits count for a connection is defined
1989 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1990 * the subsequent RPC sponsors need to wait until others released their
1991 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1993 int obd_get_request_slot(struct client_obd *cli)
1995 struct obd_request_slot_waiter orsw;
1998 spin_lock(&cli->cl_loi_list_lock);
1999 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2000 cli->cl_rpcs_in_flight++;
2001 spin_unlock(&cli->cl_loi_list_lock);
2005 init_waitqueue_head(&orsw.orsw_waitq);
2006 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2007 orsw.orsw_signaled = false;
2008 spin_unlock(&cli->cl_loi_list_lock);
2010 rc = l_wait_event_abortable(orsw.orsw_waitq,
2011 obd_request_slot_avail(cli, &orsw) ||
2012 orsw.orsw_signaled);
2014 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2015 * freed but other (such as obd_put_request_slot) is using it. */
2016 spin_lock(&cli->cl_loi_list_lock);
2018 if (!orsw.orsw_signaled) {
2019 if (list_empty(&orsw.orsw_entry))
2020 cli->cl_rpcs_in_flight--;
2022 list_del(&orsw.orsw_entry);
2027 if (orsw.orsw_signaled) {
2028 LASSERT(list_empty(&orsw.orsw_entry));
2032 spin_unlock(&cli->cl_loi_list_lock);
2036 EXPORT_SYMBOL(obd_get_request_slot);
/*
 * obd_put_request_slot() - release one RPC credit on @cli.
 *
 * Decrements cl_rpcs_in_flight under cl_loi_list_lock.  If waiters are
 * queued and the count is now below cl_max_rpcs_in_flight, the first
 * waiter is unlinked from cl_flight_waiters, the credit is taken again on
 * its behalf (cl_rpcs_in_flight++) and the waiter is woken.
 */
2038 void obd_put_request_slot(struct client_obd *cli)
2040 struct obd_request_slot_waiter *orsw;
2042 spin_lock(&cli->cl_loi_list_lock);
2043 cli->cl_rpcs_in_flight--;
2045 /* If there is free slot, wakeup the first waiter. */
2046 if (!list_empty(&cli->cl_flight_waiters) &&
2047 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2048 orsw = list_first_entry(&cli->cl_flight_waiters,
2049 struct obd_request_slot_waiter,
2051 list_del_init(&orsw->orsw_entry);
2052 cli->cl_rpcs_in_flight++;
2053 wake_up(&orsw->orsw_waitq);
2055 spin_unlock(&cli->cl_loi_list_lock);
2057 EXPORT_SYMBOL(obd_put_request_slot);
2059 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2061 return cli->cl_max_rpcs_in_flight;
2063 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * obd_set_max_rpcs_in_flight() - change the RPC credit limit of @cli.
 *
 * @max is checked against the range 1..OBD_MAX_RIF_MAX.  For an MDC device
 * (type name equal to LUSTRE_MDC_NAME) a value of 1 is refused and, when
 * @max is not above cl_max_mod_rpcs_in_flight, the modify-RPC limit is
 * first lowered to @max - 1.  The new limit is stored under
 * cl_loi_list_lock, client_adjust_max_dirty() is called, and up to "diff"
 * queued waiters are granted a credit and woken.
 *
 * NOTE(review): the local declarations, the error returns, the test that
 * precedes the "cannot set ...=1" message, the computation of "diff"
 * (presumably max - old), the loop exit and the final return are not
 * visible in this excerpt - confirm against the full file.
 * NOTE(review): @max is a __u32 but is printed with %hu in the CDEBUG.
 */
2065 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2067 struct obd_request_slot_waiter *orsw;
2073 if (max > OBD_MAX_RIF_MAX || max < 1)
2076 CDEBUG(D_INFO, "%s: max = %hu max_mod = %u rif = %u\n",
2077 cli->cl_import->imp_obd->obd_name, max,
2078 cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2080 if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2081 LUSTRE_MDC_NAME) == 0) {
2082 /* adjust max_mod_rpcs_in_flight to ensure it is always
2083 * strictly lower that max_rpcs_in_flight */
2085 CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2086 cli->cl_import->imp_obd->obd_name);
2089 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2090 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2096 spin_lock(&cli->cl_loi_list_lock);
2097 old = cli->cl_max_rpcs_in_flight;
2098 cli->cl_max_rpcs_in_flight = max;
2099 client_adjust_max_dirty(cli);
2103 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2104 for (i = 0; i < diff; i++) {
2105 if (list_empty(&cli->cl_flight_waiters))
2108 orsw = list_first_entry(&cli->cl_flight_waiters,
2109 struct obd_request_slot_waiter,
2111 list_del_init(&orsw->orsw_entry);
2112 cli->cl_rpcs_in_flight++;
2113 wake_up(&orsw->orsw_waitq);
2115 spin_unlock(&cli->cl_loi_list_lock);
2119 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2121 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2123 return cli->cl_max_mod_rpcs_in_flight;
2125 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/*
 * obd_set_max_mod_rpcs_in_flight() - change the modify-RPC limit of @cli.
 *
 * @max is checked against 1..OBD_MAX_RIF_MAX and a value of
 * OBD_MAX_RIF_MAX is clamped to OBD_MAX_RIF_MAX - 1.  If @max is not below
 * cl_max_rpcs_in_flight, that limit is first raised to @max + 1.  The
 * ceiling "maxmodrpcs" is taken from ocd_maxmodrpcs when the server set
 * OBD_CONNECT_MULTIMODRPCS, or assumed to be cl_max_rpcs_in_flight - 1
 * while the connect data is not filled in yet; a @max above it is reported
 * with CERROR.  The new value is stored under cl_mod_rpcs_lock and the
 * waiters on cl_mod_rpcs_waitq are woken if the limit grew.
 *
 * NOTE(review): the local declarations, the error returns, the branch for
 * servers without OBD_CONNECT_MULTIMODRPCS, the log macros of two messages
 * and the final return are not visible in this excerpt - confirm against
 * the full file.
 * NOTE(review): the return value of the nested
 * obd_set_max_rpcs_in_flight() call is not checked in the visible code.
 */
2127 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2129 struct obd_connect_data *ocd;
2133 if (max > OBD_MAX_RIF_MAX || max < 1)
2136 ocd = &cli->cl_import->imp_connect_data;
2137 CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2138 cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2139 ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2141 if (max == OBD_MAX_RIF_MAX)
2142 max = OBD_MAX_RIF_MAX - 1;
2144 /* Cannot exceed or equal max_rpcs_in_flight. If we are asked to
2145 * increase this value, also bump up max_rpcs_in_flight to match.
2147 if (max >= cli->cl_max_rpcs_in_flight) {
2149 "%s: increasing max_rpcs_in_flight=%hu to allow larger max_mod_rpcs_in_flight=%u\n",
2150 cli->cl_import->imp_obd->obd_name, max + 1, max);
2151 obd_set_max_rpcs_in_flight(cli, max + 1);
2154 /* cannot exceed max modify RPCs in flight supported by the server,
2155 * but verify ocd_connect_flags is at least initialized first. If
2156 * not, allow it and fix value later in ptlrpc_connect_set_flags().
2158 if (!ocd->ocd_connect_flags) {
2159 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2160 } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2161 maxmodrpcs = ocd->ocd_maxmodrpcs;
2162 if (maxmodrpcs == 0) { /* connection not finished yet */
2163 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2165 "%s: partial connect, assume maxmodrpcs=%hu\n",
2166 cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2171 if (max > maxmodrpcs) {
2172 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than ocd_maxmodrpcs=%hu returned by the server at connection\n",
2173 cli->cl_import->imp_obd->obd_name,
2178 spin_lock(&cli->cl_mod_rpcs_lock);
2180 prev = cli->cl_max_mod_rpcs_in_flight;
2181 cli->cl_max_mod_rpcs_in_flight = max;
2183 /* wakeup waiters if limit has been increased */
2184 if (cli->cl_max_mod_rpcs_in_flight > prev)
2185 wake_up(&cli->cl_mod_rpcs_waitq);
2187 spin_unlock(&cli->cl_mod_rpcs_lock);
2191 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2193 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2194 struct seq_file *seq)
2196 unsigned long mod_tot = 0, mod_cum;
2197 struct timespec64 now;
2200 ktime_get_real_ts64(&now);
2202 spin_lock(&cli->cl_mod_rpcs_lock);
2204 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2205 (s64)now.tv_sec, now.tv_nsec);
2206 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2207 cli->cl_mod_rpcs_in_flight);
2209 seq_printf(seq, "\n\t\t\tmodify\n");
2210 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2212 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2215 for (i = 0; i < OBD_HIST_MAX; i++) {
2216 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2218 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2219 i, mod, pct(mod, mod_tot),
2220 pct(mod_cum, mod_tot));
2221 if (mod_cum == mod_tot)
2225 spin_unlock(&cli->cl_mod_rpcs_lock);
2229 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
/*
 * obd_mod_rpc_slot_avail_locked() - test for a free modify-RPC slot.
 *
 * "avail" is true when cl_mod_rpcs_in_flight is below
 * cl_max_mod_rpcs_in_flight, or when the request is a close and no other
 * close RPC is in flight.  Both visible callers invoke it with
 * cl_mod_rpcs_lock held.
 *
 * NOTE(review): the second parameter (used as "close_req"), the
 * declaration of "avail" and the return statement are not visible in this
 * excerpt - confirm.
 */
2231 /* The number of modify RPCs sent in parallel is limited
2232 * because the server has a finite number of slots per client to
2233 * store request result and ensure reply reconstruction when needed.
2234 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2235 * that takes into account server limit and cl_max_rpcs_in_flight
2237 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2238 * one close request is allowed above the maximum.
2240 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2245 /* A slot is available if
2246 * - number of modify RPCs in flight is less than the max
2247 * - it's a close RPC and no other close request is in flight
2249 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2250 (close_req && cli->cl_close_rpcs_in_flight == 0);
/*
 * obd_mod_rpc_slot_avail() - locking wrapper used as a wait condition:
 * evaluates obd_mod_rpc_slot_avail_locked() with cl_mod_rpcs_lock held.
 *
 * NOTE(review): the second parameter (used as "close_req"), the
 * declaration of "avail" and the return statement are not visible in this
 * excerpt - confirm.
 */
2255 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2260 spin_lock(&cli->cl_mod_rpcs_lock);
2261 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2262 spin_unlock(&cli->cl_mod_rpcs_lock);
2267 /* Get a modify RPC slot from the obd client @cli according
2268 * to the kind of operation @opc that is going to be sent
2269 * and the intent @it of the operation if it applies.
2270 * If the maximum number of modify RPCs in flight is reached
2271 * the thread is put to sleep.
2272 * Returns the tag to be set in the request message. Tag 0
2273 * is reserved for non-modifying requests.
2275 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2277 bool close_req = false;
2280 if (opc == MDS_CLOSE)
2284 spin_lock(&cli->cl_mod_rpcs_lock);
2285 max = cli->cl_max_mod_rpcs_in_flight;
2286 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2287 /* there is a slot available */
2288 cli->cl_mod_rpcs_in_flight++;
2290 cli->cl_close_rpcs_in_flight++;
2291 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2292 cli->cl_mod_rpcs_in_flight);
2293 /* find a free tag */
2294 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2296 LASSERT(i < OBD_MAX_RIF_MAX);
2297 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2298 spin_unlock(&cli->cl_mod_rpcs_lock);
2299 /* tag 0 is reserved for non-modify RPCs */
2302 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2303 cli->cl_import->imp_obd->obd_name,
2308 spin_unlock(&cli->cl_mod_rpcs_lock);
2310 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2311 "opc %u, max %hu\n",
2312 cli->cl_import->imp_obd->obd_name, opc, max);
2314 wait_event_idle_exclusive(cli->cl_mod_rpcs_waitq,
2315 obd_mod_rpc_slot_avail(cli,
2319 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2321 /* Put a modify RPC slot from the obd client @cli according
2322 * to the kind of operation @opc that has been sent.
2324 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2326 bool close_req = false;
2331 if (opc == MDS_CLOSE)
2334 spin_lock(&cli->cl_mod_rpcs_lock);
2335 cli->cl_mod_rpcs_in_flight--;
2337 cli->cl_close_rpcs_in_flight--;
2338 /* release the tag in the bitmap */
2339 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2340 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2341 spin_unlock(&cli->cl_mod_rpcs_lock);
2342 wake_up(&cli->cl_mod_rpcs_waitq);
2344 EXPORT_SYMBOL(obd_put_mod_rpc_slot);