4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* Registered obd types: the list head plus the spinlock that this file takes
 * around walks and updates of that list. */
49 static DEFINE_SPINLOCK(obd_types_lock);
50 static LIST_HEAD(obd_types);
/* Device table indexed by obd_minor; obd_dev_lock is taken for read or for
 * write around the table accesses in this file. */
51 DEFINE_RWLOCK(obd_dev_lock);
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab caches created in obd_init_caches() and destroyed in
 * obd_cleanup_caches(). */
54 static struct kmem_cache *obd_device_cachep;
55 struct kmem_cache *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 static struct kmem_cache *import_cachep;
/* Workqueue handle; its users are outside the lines shown in this excerpt. */
59 static struct workqueue_struct *zombie_wq;
/* Forward declarations of static helpers whose definitions are not part of
 * this excerpt. */
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64 const char *status, int locks, int debug_level);
/* Stale export bookkeeping: a list, its lock, and a counter that
 * class_unlink_export() increments. */
66 static LIST_HEAD(obd_stale_exports);
67 static DEFINE_SPINLOCK(obd_stale_export_lock);
68 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Exported function pointer called from the export and import destroy paths
 * to release a ptlrpc_connection; it is assigned outside the lines shown. */
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
71 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
74 * support functions: we could use inter-module communication, but this
75 * is more portable to other OS's
/* Allocate an obd_device from obd_device_cachep (GFP_NOFS) and stamp it with
 * OBD_DEVICE_MAGIC.
 * NOTE(review): the leading numbers skip 78, 80, 82 and 84-86, so the braces,
 * any allocation check and the return statement are not visible in this
 * excerpt; confirm against the full file. */
77 static struct obd_device *obd_device_alloc(void)
79 struct obd_device *obd;
81 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
83 obd->obd_magic = OBD_DEVICE_MAGIC;
/* Return an obd_device to obd_device_cachep.
 * NOTE(review): numbering gaps (89-90, 96-97, 100) mean some lines of this
 * function are absent from this excerpt; confirm against the full file. */
88 static void obd_device_free(struct obd_device *obd)
/* Refuse to free an object that does not carry the device magic. */
91 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
92 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A namespace that is still attached at this point is reported as an error. */
93 if (obd->obd_namespace != NULL) {
94 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
95 obd, obd->obd_namespace, obd->obd_force);
/* Finalize the lu_ref tracker, then release the slab object. */
98 lu_ref_fini(&obd->obd_reference);
99 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/* Look for a registered obd type whose typ_name equals name (strcmp), walking
 * obd_types with obd_types_lock held.
 * NOTE(review): the return statements (around original lines 112 and 116) are
 * not in this excerpt; presumably the match or NULL is returned - confirm. */
102 struct obd_type *class_search_type(const char *name)
104 struct list_head *tmp;
105 struct obd_type *type;
107 spin_lock(&obd_types_lock);
108 list_for_each(tmp, &obd_types) {
109 type = list_entry(tmp, struct obd_type, typ_chain);
110 if (strcmp(type->typ_name, name) == 0) {
/* Match found: the lock is dropped inside the loop on this path. */
111 spin_unlock(&obd_types_lock);
/* No match path: the lock is dropped after the walk. */
115 spin_unlock(&obd_types_lock);
118 EXPORT_SYMBOL(class_search_type);
/* Find an obd type by name and take a module reference on the owner of its
 * dt ops. When HAVE_MODULE_LOADING_SUPPORT is defined, a module is requested
 * and the type is searched for again.
 * NOTE(review): numbering gaps show missing lines (for example 125, 129,
 * 140, 143-146, 148, 151-153), including the conditions that guard these
 * steps and the return; confirm against the full file. */
120 struct obd_type *class_get_type(const char *name)
122 struct obd_type *type = class_search_type(name);
124 #ifdef HAVE_MODULE_LOADING_SUPPORT
/* The module name defaults to the type name and is remapped below. */
126 const char *modname = name;
/* NOTE(review): the assignment that follows this test is not visible here. */
128 if (strcmp(modname, "obdfilter") == 0)
/* The LWP type name is mapped to the OSP module name. */
131 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
132 modname = LUSTRE_OSP_NAME;
/* Any name starting with LUSTRE_MDS_NAME is mapped to the MDT module name. */
134 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
135 modname = LUSTRE_MDT_NAME;
/* request_module() returning zero is treated as success; search again. */
137 if (!request_module("%s", modname)) {
138 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
139 type = class_search_type(name);
141 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
147 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not used on this line;
 * confirm that a failed module get is acceptable here. */
149 try_module_get(type->typ_dt_ops->o_owner);
150 spin_unlock(&type->obd_type_lock);
/* Release the module reference on the owner of the type dt ops, with
 * type->obd_type_lock held.
 * NOTE(review): original line 159, between the lock and module_put(), is not
 * in this excerpt; confirm against the full file. */
155 void class_put_type(struct obd_type *type)
158 spin_lock(&type->obd_type_lock);
160 module_put(type->typ_dt_ops->o_owner);
161 spin_unlock(&type->obd_type_lock);
/* kobject release callback (installed through class_ktype): frees the
 * kobject memory with OBD_FREE. */
164 static void class_sysfs_release(struct kobject *kobj)
166 OBD_FREE(kobj, sizeof(*kobj));
/* kobj_type passed to kobject_init() in class_setup_tunables(); it uses
 * lustre_sysfs_ops and releases objects through class_sysfs_release(). */
169 static struct kobj_type class_ktype = {
170 .sysfs_ops = &lustre_sysfs_ops,
171 .release = class_sysfs_release,
/* Set up a kobject called name under lustre_kset.
 * Returns ERR_PTR(-ENOMEM) on the visible allocation failure path.
 * NOTE(review): numbering gaps (175, 177-178, 181-183, 185, 187, 191-196)
 * hide the conditions, the handling of rc and the final return; confirm
 * against the full file. */
174 struct kobject *class_setup_tunables(const char *name)
176 struct kobject *kobj;
179 #ifdef HAVE_SERVER_SUPPORT
/* Server builds first look for an existing kobject with this name. */
180 kobj = kset_find_obj(lustre_kset, name);
184 OBD_ALLOC(kobj, sizeof(*kobj));
186 return ERR_PTR(-ENOMEM);
/* Attach the new kobject to lustre_kset and register it under that name. */
188 kobj->kset = lustre_kset;
189 kobject_init(kobj, &class_ktype);
190 rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
197 EXPORT_SYMBOL(class_setup_tunables);
/* Upper bound asserted on the length of a type name at registration. */
199 #define CLASS_MAX_NAME 1024
/* Register a new obd type: allocate the obd_type, copy the dt and md ops and
 * the name, create the procfs, debugfs and sysfs entries, initialize the
 * lu_device_type and finally link the type on obd_types.
 * NOTE(review): many lines are absent from this excerpt (numbering gaps such
 * as 204, 207, 209-212, 217-220, 222-224, 232-233, 236, 287-290, 294-297),
 * including declarations, conditions, labels and returns; confirm the control
 * flow against the full file. */
201 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
202 bool enable_proc, struct lprocfs_vars *vars,
203 const char *name, struct lu_device_type *ldt)
205 struct obd_type *type;
206 #ifdef HAVE_SERVER_SUPPORT
208 #endif /* HAVE_SERVER_SUPPORT */
/* The name must be shorter than CLASS_MAX_NAME. */
213 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* A name that is already registered is only logged in the visible lines. */
215 if (class_search_type(name)) {
216 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
221 OBD_ALLOC(type, sizeof(*type));
/* The type keeps private copies of both ops tables and of the name. */
225 OBD_ALLOC_PTR(type->typ_dt_ops);
226 OBD_ALLOC_PTR(type->typ_md_ops);
227 OBD_ALLOC(type->typ_name, strlen(name) + 1);
229 if (type->typ_dt_ops == NULL ||
230 type->typ_md_ops == NULL ||
231 type->typ_name == NULL)
234 *(type->typ_dt_ops) = *dt_ops;
235 /* md_ops is optional */
237 *(type->typ_md_ops) = *md_ops;
/* The destination was allocated with strlen(name) + 1 bytes above. */
238 strcpy(type->typ_name, name);
239 spin_lock_init(&type->obd_type_lock);
241 #ifdef CONFIG_PROC_FS
243 type->typ_procroot = lprocfs_register(type->typ_name,
246 if (IS_ERR(type->typ_procroot)) {
247 rc = PTR_ERR(type->typ_procroot);
248 type->typ_procroot = NULL;
253 #ifdef HAVE_SERVER_SUPPORT
/* Server builds look for an existing debugfs entry with the same name; when
 * one is found the lookup reference is dropped and typ_sym_filter is set. */
255 dname.len = strlen(dname.name);
256 dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
258 type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
259 if (type->typ_debugfs_entry) {
260 dput(type->typ_debugfs_entry);
261 type->typ_sym_filter = true;
264 #endif /* HAVE_SERVER_SUPPORT */
266 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
269 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
270 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
272 type->typ_debugfs_entry = NULL;
275 #ifdef HAVE_SERVER_SUPPORT
278 type->typ_kobj = class_setup_tunables(type->typ_name);
279 if (IS_ERR(type->typ_kobj))
280 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
284 rc = lu_device_type_init(ldt);
286 kobject_put(type->typ_kobj);
/* Publish the fully set up type on the global list. */
291 spin_lock(&obd_types_lock);
292 list_add(&type->typ_chain, &obd_types);
293 spin_unlock(&obd_types_lock);
/* Error unwinding: the lines below undo the steps above. The label itself is
 * not visible in this excerpt; GOTO(failed, ...) above names it. */
298 #ifdef HAVE_SERVER_SUPPORT
/* A debugfs entry found by the lookup above is not removed by this type. */
299 if (type->typ_sym_filter)
300 type->typ_debugfs_entry = NULL;
302 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
303 ldebugfs_remove(&type->typ_debugfs_entry);
304 if (type->typ_name != NULL) {
305 #ifdef CONFIG_PROC_FS
306 if (type->typ_procroot != NULL)
307 remove_proc_subtree(type->typ_name, proc_lustre_root);
309 OBD_FREE(type->typ_name, strlen(name) + 1);
311 if (type->typ_md_ops != NULL)
312 OBD_FREE_PTR(type->typ_md_ops);
313 if (type->typ_dt_ops != NULL)
314 OBD_FREE_PTR(type->typ_dt_ops);
315 OBD_FREE(type, sizeof(*type));
318 EXPORT_SYMBOL(class_register_type);
/* Unregister an obd type by name: remove its sysfs, procfs and debugfs
 * entries, finalize its lu_device_type, unlink it from obd_types and free it.
 * NOTE(review): numbering gaps (321, 323-325, 327-329, 336-338, 350, 354,
 * 357-358, 370) hide conditions, preprocessor lines and every return; confirm
 * against the full file. */
320 int class_unregister_type(const char *name)
322 struct obd_type *type = class_search_type(name);
326 CERROR("unknown obd type\n");
/* Type still referenced: only the ops tables are released.
 * NOTE(review): the pointers are not cleared on the visible lines and the
 * line that ends this branch is not shown; confirm that nothing can
 * dereference typ_dt_ops or typ_md_ops afterwards and that the frees further
 * down cannot run a second time on them. */
330 if (type->typ_refcnt) {
331 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
332 /* This is a bad situation, let's make the best of it */
333 /* Remove ops, but leave the name for debugging */
334 OBD_FREE_PTR(type->typ_dt_ops);
335 OBD_FREE_PTR(type->typ_md_ops);
339 kobject_put(type->typ_kobj);
341 /* we do not use type->typ_procroot as for compatibility purposes
342 * other modules can share names (i.e. lod can use lov entry). so
343 * we can't reference pointer as it can get invalidated when another
344 * module removes the entry */
345 #ifdef CONFIG_PROC_FS
346 if (type->typ_procroot != NULL)
347 remove_proc_subtree(type->typ_name, proc_lustre_root);
348 if (type->typ_procsym != NULL)
349 lprocfs_remove(&type->typ_procsym);
351 #ifdef HAVE_SERVER_SUPPORT
/* When typ_sym_filter is set the debugfs entry is forgotten, not removed. */
352 if (type->typ_sym_filter)
353 type->typ_debugfs_entry = NULL;
355 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
356 ldebugfs_remove(&type->typ_debugfs_entry);
359 lu_device_type_fini(type->typ_lu);
/* Unlink from the global list before freeing the type. */
361 spin_lock(&obd_types_lock);
362 list_del(&type->typ_chain);
363 spin_unlock(&obd_types_lock);
/* The size is computed from the caller name; the type was found through a
 * strcmp match on typ_name, so both strings have the same length. */
364 OBD_FREE(type->typ_name, strlen(name) + 1);
365 if (type->typ_dt_ops != NULL)
366 OBD_FREE_PTR(type->typ_dt_ops);
367 if (type->typ_md_ops != NULL)
368 OBD_FREE_PTR(type->typ_md_ops);
369 OBD_FREE(type, sizeof(*type));
371 } /* class_unregister_type */
372 EXPORT_SYMBOL(class_unregister_type);
375 * Create a new obd device.
377 * Allocate the new obd_device and initialize it.
379 * \param[in] type_name obd device type string.
380 * \param[in] name obd device name.
381 * \param[in] uuid obd device UUID
383 * \retval newdev pointer to created obd_device
384 * \retval ERR_PTR(errno) on error
386 struct obd_device *class_newdev(const char *type_name, const char *name,
389 struct obd_device *newdev;
390 struct obd_type *type = NULL;
393 if (strlen(name) >= MAX_OBD_NAME) {
394 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
395 RETURN(ERR_PTR(-EINVAL));
398 type = class_get_type(type_name);
400 CERROR("OBD: unknown type: %s\n", type_name);
401 RETURN(ERR_PTR(-ENODEV));
404 newdev = obd_device_alloc();
405 if (newdev == NULL) {
406 class_put_type(type);
407 RETURN(ERR_PTR(-ENOMEM));
409 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
410 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
411 newdev->obd_type = type;
412 newdev->obd_minor = -1;
414 rwlock_init(&newdev->obd_pool_lock);
415 newdev->obd_pool_limit = 0;
416 newdev->obd_pool_slv = 0;
418 INIT_LIST_HEAD(&newdev->obd_exports);
419 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
420 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
421 INIT_LIST_HEAD(&newdev->obd_exports_timed);
422 INIT_LIST_HEAD(&newdev->obd_nid_stats);
423 spin_lock_init(&newdev->obd_nid_lock);
424 spin_lock_init(&newdev->obd_dev_lock);
425 mutex_init(&newdev->obd_dev_mutex);
426 spin_lock_init(&newdev->obd_osfs_lock);
427 /* newdev->obd_osfs_age must be set to a value in the distant
428 * past to guarantee a fresh statfs is fetched on mount. */
429 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
431 /* XXX belongs in setup not attach */
432 init_rwsem(&newdev->obd_observer_link_sem);
434 init_timer(&newdev->obd_recovery_timer);
435 spin_lock_init(&newdev->obd_recovery_task_lock);
436 init_waitqueue_head(&newdev->obd_next_transno_waitq);
437 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
438 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
439 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
440 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
441 INIT_LIST_HEAD(&newdev->obd_evict_list);
442 INIT_LIST_HEAD(&newdev->obd_lwp_list);
444 llog_group_init(&newdev->obd_olg);
445 /* Detach drops this */
446 atomic_set(&newdev->obd_refcount, 1);
447 lu_ref_init(&newdev->obd_reference);
448 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
450 newdev->obd_conn_inprogress = 0;
452 strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
454 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
455 newdev->obd_name, newdev);
463 * \param[in] obd obd_device to be freed
/* Free an obd device whose reference count has reached zero: run the sanity
 * assertions, clean up through obd_cleanup(), free the device and drop the
 * reference on its type.
 * NOTE(review): numbering gaps (468, 470, 480, 483, 487-488, 491, 493-495,
 * 497, 499) hide declarations and conditions around the cleanup call; confirm
 * against the full file. */
467 void class_free_dev(struct obd_device *obd)
/* Saved up front because obd is freed before the type reference is dropped. */
469 struct obd_type *obd_type = obd->obd_type;
471 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
472 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* Either the device never got a slot, or the slot still points at it. */
473 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
474 "obd %p != obd_devs[%d] %p\n",
475 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
476 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
477 "obd_refcount should be 0, not %d\n",
478 atomic_read(&obd->obd_refcount));
479 LASSERT(obd_type != NULL);
481 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
482 obd->obd_name, obd->obd_type->typ_name);
484 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
485 obd->obd_name, obd->obd_uuid.uuid);
486 if (obd->obd_stopping) {
489 /* If we're not stopping, we were never set up */
490 err = obd_cleanup(obd);
492 CERROR("Cleanup %s returned %d\n",
496 obd_device_free(obd);
498 class_put_type(obd_type);
502 * Unregister obd device.
504 * Free slot in obd_devs[] used by \a obd.
506 * \param[in] obd obd_device to be unregistered
/* Clear the obd_devs[] slot held by obd, under the obd_dev_lock write lock.
 * A device whose obd_minor is negative holds no slot and is left alone.
 * NOTE(review): original lines 516-517 are not in this excerpt; confirm
 * whether obd_minor is reset there. */
510 void class_unregister_device(struct obd_device *obd)
512 write_lock(&obd_dev_lock);
513 if (obd->obd_minor >= 0) {
514 LASSERT(obd_devs[obd->obd_minor] == obd);
515 obd_devs[obd->obd_minor] = NULL;
518 write_unlock(&obd_dev_lock);
522 * Register obd device.
524 * Find free slot in obd_devs[], fills it with \a new_obd.
526 * \param[in] new_obd obd_device to be registered
529 * \retval -EEXIST device with this name is registered
530 * \retval -EOVERFLOW obd_devs[] is full
/* Place new_obd in a free obd_devs[] slot, with the obd_dev_lock write lock
 * held, after scanning the table for a device with the same name.
 * NOTE(review): many lines are absent from this excerpt (numbering gaps such
 * as 533-535, 539-540, 544-545, 547-548, 553, 555-558, 560, 563-565,
 * 567-572, 577-579, 583-584, 586-588), including the retry logic, the slot
 * selection and the returns; confirm against the full file. No comments are
 * added between original lines 551 and 561 because the comment that opens on
 * line 551 has lost its closing line in this excerpt. */
532 int class_register_device(struct obd_device *new_obd)
536 int new_obd_minor = 0;
537 bool minor_assign = false;
538 bool retried = false;
541 write_lock(&obd_dev_lock);
542 for (i = 0; i < class_devno_max(); i++) {
543 struct obd_device *obd = class_num2obd(i);
/* Name collision with a device that is already in the table. */
546 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
549 write_unlock(&obd_dev_lock);
551 /* the obd_device could be waited to be
552 * destroyed by the "obd_zombie_impexp_thread".
554 obd_zombie_barrier();
559 CERROR("%s: already exists, won't add\n",
561 /* in case we found a free slot before duplicate */
562 minor_assign = false;
/* First empty slot seen while no slot has been chosen yet. */
566 if (!minor_assign && obd == NULL) {
/* Claim the chosen slot for the new device. */
573 new_obd->obd_minor = new_obd_minor;
574 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
575 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
576 obd_devs[new_obd_minor] = new_obd;
580 CERROR("%s: all %u/%u devices used, increase "
581 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
582 i, class_devno_max(), ret);
585 write_unlock(&obd_dev_lock);
/* Scan the device table for a device whose obd_name equals name. No lock is
 * taken in the visible lines; class_name2dev() calls this with the
 * obd_dev_lock read lock held.
 * NOTE(review): the return statements (numbering gaps 591-596, 605-612) are
 * not in this excerpt; confirm what is returned for a match that is not
 * attached and for no match. */
590 static int class_name2dev_nolock(const char *name)
597 for (i = 0; i < class_devno_max(); i++) {
598 struct obd_device *obd = class_num2obd(i);
600 if (obd && strcmp(name, obd->obd_name) == 0) {
601 /* Make sure we finished attaching before we give
602 out any references */
603 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
604 if (obd->obd_attached) {
/* Locked wrapper: runs class_name2dev_nolock() with the obd_dev_lock read
 * lock held.
 * NOTE(review): the lines before the lock and the return (615-620, 624-626)
 * are not in this excerpt. */
614 int class_name2dev(const char *name)
621 read_lock(&obd_dev_lock);
622 i = class_name2dev_nolock(name);
623 read_unlock(&obd_dev_lock);
627 EXPORT_SYMBOL(class_name2dev);
629 struct obd_device *class_name2obd(const char *name)
631 int dev = class_name2dev(name);
633 if (dev < 0 || dev > class_devno_max())
635 return class_num2obd(dev);
637 EXPORT_SYMBOL(class_name2obd);
/* Scan the device table for a device whose obd_uuid equals uuid
 * (obd_uuid_equals). No lock is taken in the visible lines.
 * NOTE(review): the return statements (numbering gaps 648-653) are not in
 * this excerpt. */
639 int class_uuid2dev_nolock(struct obd_uuid *uuid)
643 for (i = 0; i < class_devno_max(); i++) {
644 struct obd_device *obd = class_num2obd(i);
646 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
647 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/* Locked wrapper: runs class_uuid2dev_nolock() with the obd_dev_lock read
 * lock held.
 * NOTE(review): the declaration of i and the return (656-658, 662-664) are
 * not in this excerpt. */
655 int class_uuid2dev(struct obd_uuid *uuid)
659 read_lock(&obd_dev_lock);
660 i = class_uuid2dev_nolock(uuid);
661 read_unlock(&obd_dev_lock);
665 EXPORT_SYMBOL(class_uuid2dev);
/* Map a uuid to its device: resolve the table index with class_uuid2dev(),
 * then fetch the device with class_num2obd().
 * NOTE(review): original lines 670-671 are not in this excerpt; presumably
 * they reject a negative index before class_num2obd() is called - confirm. */
667 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
669 int dev = class_uuid2dev(uuid);
672 return class_num2obd(dev);
674 EXPORT_SYMBOL(class_uuid2obd);
677 * Get obd device from ::obd_devs[]
679 * \param num [in] array index
681 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
682 * otherwise return the obd device there.
/* Fetch the device stored at index num of the device table, asserting that
 * its magic and its obd_minor are consistent with that slot.
 * NOTE(review): numbering gaps (685, 687, 689-692, 699-703) hide the table
 * read, the NULL handling and the return. The only range check visible is
 * the upper bound below; confirm that a negative num cannot reach the table
 * access. */
684 struct obd_device *class_num2obd(int num)
686 struct obd_device *obd = NULL;
688 if (num < class_devno_max()) {
693 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
694 "%p obd_magic %08x != %08x\n",
695 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
696 LASSERTF(obd->obd_minor == num,
697 "%p obd_minor %0d != %0d\n",
698 obd, obd->obd_minor, num);
705 * Find obd in obd_devs[] by name or uuid.
707 * Increment obd's refcount if found.
709 * \param[in] str obd name or uuid
711 * \retval NULL if not found
712 * \retval target pointer to found obd_device
/* Resolve str to a device, trying it as a uuid first and as a name second,
 * and take a reference tagged find on the result. The whole lookup and the
 * reference grab happen under the obd_dev_lock read lock.
 * NOTE(review): the conditions between the steps and the return (numbering
 * gaps 715, 718-719, 721, 724, 726-727, 729-730, 733-735) are not in this
 * excerpt; confirm against the full file. */
714 struct obd_device *class_dev_by_str(const char *str)
716 struct obd_device *target = NULL;
717 struct obd_uuid tgtuuid;
/* Build an obd_uuid from the string so it can be compared as a uuid. */
720 obd_str2uuid(&tgtuuid, str);
722 read_lock(&obd_dev_lock);
723 rc = class_uuid2dev_nolock(&tgtuuid);
725 rc = class_name2dev_nolock(str);
728 target = class_num2obd(rc);
731 class_incref(target, "find", current);
732 read_unlock(&obd_dev_lock);
736 EXPORT_SYMBOL(class_dev_by_str);
739 * Get obd devices count. Device in any
741 * \retval obd device count
743 int get_devices_count(void)
745 int index, max_index = class_devno_max(), dev_count = 0;
747 read_lock(&obd_dev_lock);
748 for (index = 0; index <= max_index; index++) {
749 struct obd_device *obd = class_num2obd(index);
753 read_unlock(&obd_dev_lock);
757 EXPORT_SYMBOL(get_devices_count);
/* Print one console line per device in the table: index, status, type name,
 * device name, uuid and reference count. The walk runs under the obd_dev_lock
 * read lock.
 * NOTE(review): the status strings and the NULL check (numbering gaps
 * 760-763, 767-769, 771, 773, 775-777) are not in this excerpt. */
759 void class_obd_list(void)
764 read_lock(&obd_dev_lock);
765 for (i = 0; i < class_devno_max(); i++) {
766 struct obd_device *obd = class_num2obd(i);
/* The status is chosen in this order: stopping, set up, attached. */
770 if (obd->obd_stopping)
772 else if (obd->obd_set_up)
774 else if (obd->obd_attached)
778 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
779 i, status, obd->obd_type->typ_name,
780 obd->obd_name, obd->obd_uuid.uuid,
781 atomic_read(&obd->obd_refcount));
783 read_unlock(&obd_dev_lock);
787 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
788 specified, then only the client with that uuid is returned,
789 otherwise any client connected to the tgt is returned. */
/* Walk the device table under the obd_dev_lock read lock looking for a
 * device of type typ_name whose client target uuid equals tgt_uuid and,
 * when grp_uuid is given, whose own uuid equals grp_uuid.
 * NOTE(review): the NULL check and the returns (numbering gaps 793-795,
 * 799-801, 809-812, 814-816) are not in this excerpt. */
790 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
791 const char * typ_name,
792 struct obd_uuid *grp_uuid)
796 read_lock(&obd_dev_lock);
797 for (i = 0; i < class_devno_max(); i++) {
798 struct obd_device *obd = class_num2obd(i);
/* Only the first strlen(typ_name) bytes are compared, so a device type
 * name that merely starts with typ_name also matches. */
802 if ((strncmp(obd->obd_type->typ_name, typ_name,
803 strlen(typ_name)) == 0)) {
804 if (obd_uuid_equals(tgt_uuid,
805 &obd->u.cli.cl_target_uuid) &&
806 ((grp_uuid)? obd_uuid_equals(grp_uuid,
807 &obd->obd_uuid) : 1)) {
/* Match path: the lock is released inside the loop. */
808 read_unlock(&obd_dev_lock);
813 read_unlock(&obd_dev_lock);
817 EXPORT_SYMBOL(class_find_client_obd);
819 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
820 searching at *next, and if a device is found, the next index to look
821 at is saved in *next. If next is NULL, then the first matching device
822 will always be returned. */
/* Walk the device table from a starting index under the obd_dev_lock read
 * lock and stop at the first device whose uuid equals grp_uuid.
 * NOTE(review): the start index handling, the update of *next and the
 * returns (numbering gaps 824-828, 830-833, 837-839, 841-842, 844-846,
 * 848-850) are not in this excerpt. */
823 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* A caller supplied *next is only used when it is a valid table index. */
829 else if (*next >= 0 && *next < class_devno_max())
834 read_lock(&obd_dev_lock);
835 for (; i < class_devno_max(); i++) {
836 struct obd_device *obd = class_num2obd(i);
840 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
/* Match path: the lock is released inside the loop. */
843 read_unlock(&obd_dev_lock);
847 read_unlock(&obd_dev_lock);
851 EXPORT_SYMBOL(class_devices_in_group);
854 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
855 * adjust sptlrpc settings accordingly.
/* Send KEY_SPTLRPC_CONF through obd_set_info_async() to every set up, not
 * stopping device of the listed types whose name starts with the first
 * namelen bytes of fsname.
 * NOTE(review): declarations, the continue statements, the handling of rc2
 * and the return (numbering gaps 858, 860-862, 864, 868, 870-871, 881-882,
 * 884-885, 891, 894, 896-897) are not in this excerpt. */
857 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
859 struct obd_device *obd;
863 LASSERT(namelen > 0);
865 read_lock(&obd_dev_lock);
866 for (i = 0; i < class_devno_max(); i++) {
867 obd = class_num2obd(i);
869 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
872 /* only notify mdc, osc, osp, lwp, mdt, ost
873 * because only these have a -sptlrpc llog */
874 type = obd->obd_type->typ_name;
875 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
876 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
877 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
878 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
879 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
880 strcmp(type, LUSTRE_OST_NAME) != 0)
/* Prefix comparison of the device name against fsname. */
883 if (strncmp(obd->obd_name, fsname, namelen))
/* Hold a reference on the device so the table lock can be dropped around
 * the notification call, then retake the lock to continue the walk. */
886 class_incref(obd, __FUNCTION__, obd);
887 read_unlock(&obd_dev_lock);
888 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
889 sizeof(KEY_SPTLRPC_CONF),
890 KEY_SPTLRPC_CONF, 0, NULL, NULL);
892 class_decref(obd, __FUNCTION__, obd);
893 read_lock(&obd_dev_lock);
895 read_unlock(&obd_dev_lock);
898 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/* Destroy the slab caches created by obd_init_caches() and clear the cache
 * pointers.
 * NOTE(review): the guards around the obdo and import caches and the reset
 * of obdo_cachep (numbering gaps 906-907, 909-911, 914-917) are not in this
 * excerpt. */
900 void obd_cleanup_caches(void)
903 if (obd_device_cachep) {
904 kmem_cache_destroy(obd_device_cachep);
905 obd_device_cachep = NULL;
908 kmem_cache_destroy(obdo_cachep);
912 kmem_cache_destroy(import_cachep);
913 import_cachep = NULL;
/* Create the obd_device, obdo and import slab caches. Each cache pointer is
 * asserted to be NULL before creation; a failure jumps to out with -ENOMEM,
 * and obd_cleanup_caches() is called on a visible later line.
 * NOTE(review): the remaining kmem_cache_create arguments, two of the
 * failure tests, the label and the returns (numbering gaps 920-923, 927,
 * 930, 933-934, 936, 940-941, 943-945, 947-949) are not in this excerpt. */
919 int obd_init_caches(void)
924 LASSERT(obd_device_cachep == NULL);
925 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
926 sizeof(struct obd_device),
928 if (!obd_device_cachep)
929 GOTO(out, rc = -ENOMEM);
931 LASSERT(obdo_cachep == NULL);
932 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
935 GOTO(out, rc = -ENOMEM);
937 LASSERT(import_cachep == NULL);
938 import_cachep = kmem_cache_create("ll_import_cache",
939 sizeof(struct obd_import),
942 GOTO(out, rc = -ENOMEM);
946 obd_cleanup_caches();
950 /* map connection to client */
/* Translate a connection handle into its export by looking up conn->cookie
 * with class_handle2object().
 * NOTE(review): the NULL handle test and every return (numbering gaps 952,
 * 954-956, 958-960, 963-965, 968-969) are not in this excerpt; confirm what
 * the two early cases return. */
951 struct obd_export *class_conn2export(struct lustre_handle *conn)
953 struct obd_export *export;
957 CDEBUG(D_CACHE, "looking for null handle\n");
961 if (conn->cookie == -1) { /* this means assign a new connection */
962 CDEBUG(D_CACHE, "want a new connection\n");
966 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
967 export = class_handle2object(conn->cookie, NULL);
970 EXPORT_SYMBOL(class_conn2export);
/* Map an export to an obd device.
 * NOTE(review): the whole body (original lines 973-977) is absent from this
 * excerpt, so nothing about how the device is obtained can be stated here;
 * see the full file. */
972 struct obd_device *class_exp2obd(struct obd_export *exp)
978 EXPORT_SYMBOL(class_exp2obd);
/* Map a connection handle to the device of its export. The export is
 * obtained through class_conn2export() and released with class_export_put()
 * once exp_obd has been read.
 * NOTE(review): the NULL test on export and the returns (numbering gaps
 * 981, 984, 987-990) are not in this excerpt. */
980 struct obd_device *class_conn2obd(struct lustre_handle *conn)
982 struct obd_export *export;
983 export = class_conn2export(conn);
985 struct obd_device *obd = export->exp_obd;
986 class_export_put(export);
/* Return the client import (u.cli.cl_import) of the device behind an export.
 * NOTE(review): original lines 995-996 are not in this excerpt; presumably
 * they handle a NULL device - confirm. */
992 struct obd_import *class_exp2cliimp(struct obd_export *exp)
994 struct obd_device *obd = exp->exp_obd;
997 return obd->u.cli.cl_import;
999 EXPORT_SYMBOL(class_exp2cliimp);
/* Return the client import (u.cli.cl_import) of the device behind a
 * connection handle, using class_conn2obd() for the lookup.
 * NOTE(review): original lines 1004-1005 are not in this excerpt; presumably
 * they handle a NULL device - confirm. */
1001 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
1003 struct obd_device *obd = class_conn2obd(conn);
1006 return obd->u.cli.cl_import;
1009 /* Export management functions */
/* Final teardown of an export whose reference count is zero: release its
 * connection, check that its request and reply lists are empty, let the
 * device destroy its export state, drop the device reference taken for the
 * export and free the export through OBD_FREE_RCU.
 * NOTE(review): numbering gaps (1011, 1013-1014, 1017, 1020, 1024, 1034,
 * 1036-1037) show lines missing from this excerpt. */
1010 static void class_export_destroy(struct obd_export *exp)
1012 struct obd_device *obd = exp->exp_obd;
1015 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
1016 LASSERT(obd != NULL);
1018 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
1019 exp->exp_client_uuid.uuid, obd->obd_name);
1021 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
1022 if (exp->exp_connection)
1023 ptlrpc_put_connection_superhack(exp->exp_connection);
1025 LASSERT(list_empty(&exp->exp_outstanding_replies));
1026 LASSERT(list_empty(&exp->exp_uncommitted_replies));
1027 LASSERT(list_empty(&exp->exp_req_replay_queue));
1028 LASSERT(list_empty(&exp->exp_hp_rpcs));
1029 obd_destroy_export(exp);
1030 /* self export doesn't hold a reference to an obd, although it
1031 * exists until freeing of the obd */
1032 if (exp != obd->obd_self_export)
1033 class_decref(obd, "export", exp);
1035 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback for export handles: takes an export reference through
 * class_export_get(). */
1039 static void export_handle_addref(void *export)
1041 class_export_get(export);
/* Handle operations passed to class_handle_hash() for exports.
 * NOTE(review): original lines 1046-1047 are not in this excerpt, so further
 * members may be set there. */
1044 static struct portals_handle_ops export_handle_ops = {
1045 .hop_addref = export_handle_addref,
/* Take one reference on an export and log the new count.
 * NOTE(review): the return statement (original line 1054) is not in this
 * excerpt; presumably exp is returned - confirm. */
1049 struct obd_export *class_export_get(struct obd_export *exp)
1051 atomic_inc(&exp->exp_refcount);
1052 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1053 atomic_read(&exp->exp_refcount));
1056 EXPORT_SYMBOL(class_export_get);
/* Drop one reference on an export. When the count reaches zero the nid
 * statistics reference is released; a self export is then destroyed
 * directly and its device freed, while any other export is handed to
 * obd_zombie_export_add().
 * NOTE(review): numbering gaps (1059, 1064, 1067, 1070, 1073, 1078, 1082,
 * 1084, 1087-1090) show lines missing from this excerpt, including the
 * closing lines of the two comments inside the self export branch. For that
 * reason no comments are added after original line 1074. */
1058 void class_export_put(struct obd_export *exp)
1060 LASSERT(exp != NULL);
/* The count must be positive and must not look like poisoned memory. */
1061 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1062 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1063 atomic_read(&exp->exp_refcount) - 1);
1065 if (atomic_dec_and_test(&exp->exp_refcount)) {
1066 struct obd_device *obd = exp->exp_obd;
1068 CDEBUG(D_IOCTL, "final put %p/%s\n",
1069 exp, exp->exp_client_uuid.uuid);
1071 /* release nid stat reference */
1072 lprocfs_exp_cleanup(exp);
1074 if (exp == obd->obd_self_export) {
1075 /* self export should be destroyed without
1076 * zombie thread as it doesn't hold a
1077 * reference to obd and doesn't hold any
1079 class_export_destroy(exp);
1080 /* self export is destroyed, no class
1081 * references exist and it is safe to free
1083 class_free_dev(obd);
1085 LASSERT(!list_empty(&exp->exp_obd_chain));
1086 obd_zombie_export_add(exp);
1091 EXPORT_SYMBOL(class_export_put);
/* Work handler installed on exp_zombie_work by __class_new_export():
 * recovers the export from the work item and destroys it. */
1093 static void obd_zombie_exp_cull(struct work_struct *ws)
1095 struct obd_export *export;
1097 export = container_of(ws, struct obd_export, exp_zombie_work);
1098 class_export_destroy(export);
1101 /* Creates a new export, adds it to the hash table, and returns a
1102 * pointer to it. The refcount is 2: one for the hash reference, and
1103 * one for the pointer returned by this function. */
/* Allocate and initialize an export for obd on behalf of client cluuid.
 * The export starts with a reference count of 2, is hashed as a handle, is
 * added to the uuid hash of the device unless cluuid is the device uuid, and
 * is linked on the export lists of the device. Returns ERR_PTR(-ENOMEM) on
 * allocation failure; the visible failure paths set rc to -ENODEV,
 * -EALREADY or -ESHUTDOWN.
 * NOTE(review): numbering gaps (for example 1106, 1109-1111, 1113, 1161,
 * 1166, 1170-1172, 1176, 1179-1181, 1187, 1190, 1192, 1194-1196, 1198-1199,
 * 1205-1206) hide declarations, several conditions, both labels, the use of
 * is_self and the returns; confirm the control flow against the full file. */
1104 struct obd_export *__class_new_export(struct obd_device *obd,
1105 struct obd_uuid *cluuid, bool is_self)
1107 struct obd_export *export;
1108 struct cfs_hash *hash = NULL;
1112 OBD_ALLOC_PTR(export);
1114 return ERR_PTR(-ENOMEM);
1116 export->exp_conn_cnt = 0;
1117 export->exp_lock_hash = NULL;
1118 export->exp_flock_hash = NULL;
1119 /* 2 = class_handle_hash + last */
1120 atomic_set(&export->exp_refcount, 2);
1121 atomic_set(&export->exp_rpc_count, 0);
1122 atomic_set(&export->exp_cb_count, 0);
1123 atomic_set(&export->exp_locks_count, 0);
1124 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1125 INIT_LIST_HEAD(&export->exp_locks_list);
1126 spin_lock_init(&export->exp_locks_list_guard);
1128 atomic_set(&export->exp_replay_count, 0);
1129 export->exp_obd = obd;
1130 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1131 spin_lock_init(&export->exp_uncommitted_replies_lock);
1132 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1133 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1134 INIT_LIST_HEAD(&export->exp_handle.h_link);
1135 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1136 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* Make the export reachable through its handle cookie. */
1137 class_handle_hash(&export->exp_handle, &export_handle_ops);
1138 export->exp_last_request_time = ktime_get_real_seconds();
1139 spin_lock_init(&export->exp_lock);
1140 spin_lock_init(&export->exp_rpc_lock);
1141 INIT_HLIST_NODE(&export->exp_uuid_hash);
1142 INIT_HLIST_NODE(&export->exp_nid_hash);
1143 INIT_HLIST_NODE(&export->exp_gen_hash);
1144 spin_lock_init(&export->exp_bl_list_lock);
1145 INIT_LIST_HEAD(&export->exp_bl_list);
1146 INIT_LIST_HEAD(&export->exp_stale_list);
/* Deferred destruction runs obd_zombie_exp_cull() on this work item. */
1147 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1149 export->exp_sp_peer = LUSTRE_SP_ANY;
1150 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1151 export->exp_client_uuid = *cluuid;
1152 obd_init_export(export);
/* Exports whose client uuid differs from the device uuid are entered in
 * the uuid hash of the device; a duplicate uuid is refused. */
1154 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1155 spin_lock(&obd->obd_dev_lock);
1156 /* shouldn't happen, but might race */
1157 if (obd->obd_stopping)
1158 GOTO(exit_unlock, rc = -ENODEV);
1160 hash = cfs_hash_getref(obd->obd_uuid_hash);
1162 GOTO(exit_unlock, rc = -ENODEV);
1163 spin_unlock(&obd->obd_dev_lock);
1165 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1167 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1168 obd->obd_name, cluuid->uuid, rc);
1169 GOTO(exit_err, rc = -EALREADY);
1173 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* Second check of obd_stopping, this time before linking the export. */
1174 spin_lock(&obd->obd_dev_lock);
1175 if (obd->obd_stopping) {
1177 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1178 GOTO(exit_unlock, rc = -ESHUTDOWN);
/* Take the device reference that class_export_destroy() drops later. */
1182 class_incref(obd, "export", export);
1183 list_add_tail(&export->exp_obd_chain_timed,
1184 &obd->obd_exports_timed);
1185 list_add(&export->exp_obd_chain, &obd->obd_exports);
1186 obd->obd_num_exports++;
1188 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1189 INIT_LIST_HEAD(&export->exp_obd_chain);
1191 spin_unlock(&obd->obd_dev_lock);
1193 cfs_hash_putref(hash);
/* Failure unwinding: drop the lock and the hash reference, unhash the
 * handle and free the export. */
1197 spin_unlock(&obd->obd_dev_lock);
1200 cfs_hash_putref(hash);
1201 class_handle_unhash(&export->exp_handle);
1202 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1203 obd_destroy_export(export);
1204 OBD_FREE_PTR(export);
/* Create an export through __class_new_export() with is_self set to false. */
1208 struct obd_export *class_new_export(struct obd_device *obd,
1209 struct obd_uuid *uuid)
1211 return __class_new_export(obd, uuid, false);
1213 EXPORT_SYMBOL(class_new_export);
/* Create an export through __class_new_export() with is_self set to true. */
1215 struct obd_export *class_new_export_self(struct obd_device *obd,
1216 struct obd_uuid *uuid)
1218 return __class_new_export(obd, uuid, true);
/* Detach an export from its device: unhash its handle, remove it from the
 * uuid hash (and from the generation hash on server builds), move it to
 * obd_unlinked_exports, take it off the timed list and decrement the export
 * count, then release it through obd_stale_export_put(). A self export is
 * only unhashed and put.
 * NOTE(review): numbering gaps (1222, 1224, 1227-1229, 1236, 1241, 1249,
 * 1251, 1257, 1260) show lines missing from this excerpt, including the end
 * of the self export branch. */
1221 void class_unlink_export(struct obd_export *exp)
1223 class_handle_unhash(&exp->exp_handle);
1225 if (exp->exp_obd->obd_self_export == exp) {
1226 class_export_put(exp);
1230 spin_lock(&exp->exp_obd->obd_dev_lock);
1231 /* delete an uuid-export hashitem from hashtables */
1232 if (!hlist_unhashed(&exp->exp_uuid_hash))
1233 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1234 &exp->exp_client_uuid,
1235 &exp->exp_uuid_hash);
1237 #ifdef HAVE_SERVER_SUPPORT
1238 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1239 struct tg_export_data *ted = &exp->exp_target_data;
1240 struct cfs_hash *hash;
1242 /* Because obd_gen_hash will not be released until
1243 * class_cleanup(), so hash should never be NULL here */
1244 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1245 LASSERT(hash != NULL);
1246 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1247 &exp->exp_gen_hash);
1248 cfs_hash_putref(hash);
1250 #endif /* HAVE_SERVER_SUPPORT */
/* Still under obd_dev_lock: move the export between the device lists. */
1252 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1253 list_del_init(&exp->exp_obd_chain_timed);
1254 exp->exp_obd->obd_num_exports--;
1255 spin_unlock(&exp->exp_obd->obd_dev_lock);
1256 atomic_inc(&obd_stale_export_num);
1258 /* A reference is kept by obd_stale_exports list */
1259 obd_stale_export_put(exp);
1261 EXPORT_SYMBOL(class_unlink_export);
1263 /* Import management functions */
/* Final teardown of an import whose reference count is zero: release its
 * connection and every entry on imp_conn_list, drop the device reference
 * taken in class_new_import() and free the import through OBD_FREE_RCU.
 * NOTE(review): numbering gaps (1265-1267, 1270, 1272, 1274, 1277,
 * 1283-1284, 1288-1289) show lines missing from this excerpt. */
1264 static void class_import_destroy(struct obd_import *imp)
1268 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1269 imp->imp_obd->obd_name);
1271 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1273 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Drain the connection list, releasing each connection and its entry. */
1275 while (!list_empty(&imp->imp_conn_list)) {
1276 struct obd_import_conn *imp_conn;
1278 imp_conn = list_entry(imp->imp_conn_list.next,
1279 struct obd_import_conn, oic_item);
1280 list_del_init(&imp_conn->oic_item);
1281 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1282 OBD_FREE(imp_conn, sizeof(*imp_conn));
1285 LASSERT(imp->imp_sec == NULL);
1286 class_decref(imp->imp_obd, "import", imp);
1287 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* hop_addref callback for import handles: takes an import reference through
 * class_import_get(). */
1291 static void import_handle_addref(void *import)
1293 class_import_get(import);
/* Handle operations passed to class_handle_hash() for imports.
 * NOTE(review): original lines 1298-1299 are not in this excerpt, so further
 * members may be set there. */
1296 static struct portals_handle_ops import_handle_ops = {
1297 .hop_addref = import_handle_addref,
/* Take one reference on an import and log the new count.
 * NOTE(review): the return statement (original line 1307) is not in this
 * excerpt; presumably import is returned - confirm. */
1301 struct obd_import *class_import_get(struct obd_import *import)
1303 atomic_inc(&import->imp_refcount);
1304 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1305 atomic_read(&import->imp_refcount),
1306 import->imp_obd->obd_name);
1309 EXPORT_SYMBOL(class_import_get);
/* Drop one reference on an import; the final put hands the import to
 * obd_zombie_import_add().
 * NOTE(review): numbering gaps (1312-1314, 1316, 1320, 1324-1325,
 * 1328-1329) show lines missing from this excerpt. */
1311 void class_import_put(struct obd_import *imp)
/* The count must be positive and must not look like poisoned memory. */
1315 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1317 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1318 atomic_read(&imp->imp_refcount) - 1,
1319 imp->imp_obd->obd_name);
1321 if (atomic_dec_and_test(&imp->imp_refcount)) {
1322 CDEBUG(D_INFO, "final put import %p\n", imp);
1323 obd_zombie_import_add(imp);
1326 /* catch possible import put race */
1327 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1330 EXPORT_SYMBOL(class_import_put);
/* Initialize the adaptive timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the declaration of i and the last at_init argument (original
 * lines 1333, 1340-1342) are not in this excerpt. */
1332 static void init_imp_at(struct imp_at *at) {
1334 at_init(&at->iat_net_latency, 0, 0);
1335 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1336 /* max service estimates are tracked on the server side, so
1337 don't use the AT history here, just use the last reported
1338 val. (But keep hist for proc histogram, worst_ever) */
1339 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Work handler installed on imp_zombie_work by class_new_import(): recovers
 * the import from the work item and destroys it. */
1344 static void obd_zombie_imp_cull(struct work_struct *ws)
1346 struct obd_import *import;
1348 import = container_of(ws, struct obd_import, imp_zombie_work);
1349 class_import_destroy(import);
/* Allocate and initialize an import for obd. The import takes a device
 * reference tagged import, starts in state LUSTRE_IMP_NEW with a reference
 * count of 2, and is hashed as a handle.
 * NOTE(review): numbering gaps (1353, 1356, 1358-1360, 1376, 1379, 1381,
 * 1391, 1395-1397) hide the allocation check, an else line and the return;
 * confirm against the full file. */
1352 struct obd_import *class_new_import(struct obd_device *obd)
1354 struct obd_import *imp;
1355 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1357 OBD_ALLOC(imp, sizeof(*imp));
1361 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1362 INIT_LIST_HEAD(&imp->imp_replay_list);
1363 INIT_LIST_HEAD(&imp->imp_sending_list);
1364 INIT_LIST_HEAD(&imp->imp_delayed_list);
1365 INIT_LIST_HEAD(&imp->imp_committed_list);
1366 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1367 imp->imp_known_replied_xid = 0;
/* The replay cursor starts at the head of the committed list. */
1368 imp->imp_replay_cursor = &imp->imp_committed_list;
1369 spin_lock_init(&imp->imp_lock);
1370 imp->imp_last_success_conn = 0;
1371 imp->imp_state = LUSTRE_IMP_NEW;
/* Device reference dropped again in class_import_destroy(). */
1372 imp->imp_obd = class_incref(obd, "import", imp);
1373 mutex_init(&imp->imp_sec_mutex);
1374 init_waitqueue_head(&imp->imp_recovery_waitq);
/* Deferred destruction runs obd_zombie_imp_cull() on this work item. */
1375 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
/* imp_sec_refpid is the pid of the child reaper of the current pid
 * namespace when there is one; the other visible assignment sets it to 1. */
1377 if (curr_pid_ns->child_reaper)
1378 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1380 imp->imp_sec_refpid = 1;
1382 atomic_set(&imp->imp_refcount, 2);
1383 atomic_set(&imp->imp_unregistering, 0);
1384 atomic_set(&imp->imp_inflight, 0);
1385 atomic_set(&imp->imp_replay_inflight, 0);
1386 atomic_set(&imp->imp_inval_count, 0);
1387 INIT_LIST_HEAD(&imp->imp_conn_list);
1388 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1389 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1390 init_imp_at(&imp->imp_at);
1392 /* the default magic is V2, will be used in connect RPC, and
1393 * then adjusted according to the flags in request/reply. */
1394 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1398 EXPORT_SYMBOL(class_new_import);
/* Tear down \a import: remove it from the handle hash, bump imp_generation
 * under imp_lock, then drop one reference with class_import_put(). */
1400 void class_destroy_import(struct obd_import *import)
1402 LASSERT(import != NULL);
1403 LASSERT(import != LP_POISON);
1405 class_handle_unhash(&import->imp_handle);
1407 spin_lock(&import->imp_lock);
1408 import->imp_generation++;
1409 spin_unlock(&import->imp_lock);
1410 class_import_put(import);
1412 EXPORT_SYMBOL(class_destroy_import);
1414 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Record that \a lock references \a exp (built only when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is set).
 *
 * Under exp_locks_list_guard the lock's l_exp_refs_nr is incremented; on the
 * first reference the lock is linked on exp_locks_list and l_exp_refs_target
 * is set to \a exp. A console warning is printed when the lock already
 * targets a different export. */
1416 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1418 spin_lock(&exp->exp_locks_list_guard);
1420 LASSERT(lock->l_exp_refs_nr >= 0);
1422 if (lock->l_exp_refs_target != NULL &&
1423 lock->l_exp_refs_target != exp) {
1424 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1425 exp, lock, lock->l_exp_refs_target);
1427 if ((lock->l_exp_refs_nr ++) == 0) {
1428 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1429 lock->l_exp_refs_target = exp;
1431 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1432 lock, exp, lock->l_exp_refs_nr);
1433 spin_unlock(&exp->exp_locks_list_guard);
1435 EXPORT_SYMBOL(__class_export_add_lock_ref);
/* Drop one tracked reference of \a lock on \a exp.
 *
 * Under exp_locks_list_guard the lock's l_exp_refs_nr is decremented; when
 * it reaches zero the lock is unlinked from the export's list and
 * l_exp_refs_target is cleared. A console warning is printed when the lock's
 * recorded target is not \a exp. */
1437 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1439 spin_lock(&exp->exp_locks_list_guard);
1440 LASSERT(lock->l_exp_refs_nr > 0);
1441 if (lock->l_exp_refs_target != exp) {
1442 LCONSOLE_WARN("lock %p, "
1443 "mismatching export pointers: %p, %p\n",
1444 lock, lock->l_exp_refs_target, exp);
1446 if (-- lock->l_exp_refs_nr == 0) {
1447 list_del_init(&lock->l_exp_refs_link);
1448 lock->l_exp_refs_target = NULL;
1450 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1451 lock, exp, lock->l_exp_refs_nr);
1452 spin_unlock(&exp->exp_locks_list_guard);
1454 EXPORT_SYMBOL(__class_export_del_lock_ref);
1457 /* A connection defines an export context in which preallocation can
1458 be managed. This releases the export pointer reference, and returns
1459 the export handle, so the export refcount is 1 when this function
/* Create a new export on \a obd for client \a cluuid and hand its handle
 * cookie back in \a conn. class_export_put() is called on the new export
 * once the cookie has been copied out; PTR_ERR() of the export is returned
 * on the error path. */
1461 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1462 struct obd_uuid *cluuid)
1464 struct obd_export *export;
1465 LASSERT(conn != NULL);
1466 LASSERT(obd != NULL);
1467 LASSERT(cluuid != NULL);
1470 export = class_new_export(obd, cluuid);
1472 RETURN(PTR_ERR(export));
1474 conn->cookie = export->exp_handle.h_cookie;
1475 class_export_put(export);
1477 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1478 cluuid->uuid, conn->cookie);
1481 EXPORT_SYMBOL(class_connect);
1483 /* If the export is involved in recovery then clean up related things:
 * under obd_recovery_task_lock, and only while the obd is recovering, the
 * obd_connected_clients and obd_stale_clients counters are adjusted; then,
 * under exp_lock, the export's request/lock replay flags are cleared and
 * the matching obd replay-client counters are decremented. */
1484 static void class_export_recovery_cleanup(struct obd_export *exp)
1486 struct obd_device *obd = exp->exp_obd;
1488 spin_lock(&obd->obd_recovery_task_lock);
1489 if (obd->obd_recovering) {
1490 if (exp->exp_in_recovery) {
1491 spin_lock(&exp->exp_lock);
1492 exp->exp_in_recovery = 0;
1493 spin_unlock(&exp->exp_lock);
1494 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1495 atomic_dec(&obd->obd_connected_clients);
1498 /* if called during recovery then should update
1499 * obd_stale_clients counter,
1500 * lightweight exports are not counted */
1501 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1502 exp->exp_obd->obd_stale_clients++;
1504 spin_unlock(&obd->obd_recovery_task_lock);
1506 spin_lock(&exp->exp_lock);
1507 /** Cleanup req replay fields */
1508 if (exp->exp_req_replay_needed) {
1509 exp->exp_req_replay_needed = 0;
1511 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1512 atomic_dec(&obd->obd_req_replay_clients);
1515 /** Cleanup lock replay data */
1516 if (exp->exp_lock_replay_needed) {
1517 exp->exp_lock_replay_needed = 0;
1519 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1520 atomic_dec(&obd->obd_lock_replay_clients);
1522 spin_unlock(&exp->exp_lock);
1525 /* This function removes 1-3 references from the export:
1526 * 1 - for the export pointer passed in
1527 * and, if a disconnect is really needed:
1528 * 2 - for removing it from the hash
1529 * 3 - in class_unlink_export()
1530 * The export pointer passed to this function may be destroyed.
 *
 * exp_disconnected is tested and set under exp_lock, so of several racing
 * callers only the first one goes on to the recovery cleanup and unlink;
 * the others take the no_disconn exit. */
1531 int class_disconnect(struct obd_export *export)
1533 int already_disconnected;
1536 if (export == NULL) {
1537 CWARN("attempting to free NULL export %p\n", export);
1541 spin_lock(&export->exp_lock);
1542 already_disconnected = export->exp_disconnected;
1543 export->exp_disconnected = 1;
1544 /* We hold references of export for uuid hash
1545 * and nid_hash and export link at least. So
1546 * it is safe to call cfs_hash_del in there. */
1547 if (!hlist_unhashed(&export->exp_nid_hash))
1548 cfs_hash_del(export->exp_obd->obd_nid_hash,
1549 &export->exp_connection->c_peer.nid,
1550 &export->exp_nid_hash);
1551 spin_unlock(&export->exp_lock);
1553 /* class_cleanup(), abort_recovery(), and class_fail_export()
1554 * all end up in here, and if any of them race we shouldn't
1555 * call extra class_export_puts(). */
1556 if (already_disconnected) {
1557 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1558 GOTO(no_disconn, already_disconnected);
1561 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1562 export->exp_handle.h_cookie);
1564 class_export_recovery_cleanup(export);
1565 class_unlink_export(export);
1567 class_export_put(export);
1570 EXPORT_SYMBOL(class_disconnect);
1572 /* Return non-zero for a fully connected export, i.e. one whose
 * exp_conn_cnt is positive and which is not marked exp_failed. Both fields
 * are sampled under exp_lock. */
1573 int class_connected_export(struct obd_export *exp)
1578 spin_lock(&exp->exp_lock);
1579 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1580 spin_unlock(&exp->exp_lock);
1584 EXPORT_SYMBOL(class_connected_export);
/* Disconnect every export found on \a list, looping until it is empty.
 *
 * Each export gets exp_flags set to \a flags. An export whose client uuid
 * equals its obd's uuid (the self export) is only removed from the list;
 * every other export is passed to obd_disconnect(). */
1586 static void class_disconnect_export_list(struct list_head *list,
1587 enum obd_option flags)
1590 struct obd_export *exp;
1593 /* It's possible that an export may disconnect itself, but
1594 * nothing else will be added to this list. */
1595 while (!list_empty(list)) {
1596 exp = list_entry(list->next, struct obd_export,
1598 /* need for safe call CDEBUG after obd_disconnect */
1599 class_export_get(exp);
1601 spin_lock(&exp->exp_lock);
1602 exp->exp_flags = flags;
1603 spin_unlock(&exp->exp_lock);
1605 if (obd_uuid_equals(&exp->exp_client_uuid,
1606 &exp->exp_obd->obd_uuid)) {
1608 "exp %p export uuid == obd uuid, don't discon\n",
1610 /* Need to delete this now so we don't end up pointing
1611 * to work_list later when this export is cleaned up. */
1612 list_del_init(&exp->exp_obd_chain);
1613 class_export_put(exp);
/* second reference: the one obd_disconnect() releases (see the comment
 * just above that call) */
1617 class_export_get(exp);
1618 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1619 "last request at %lld\n",
1620 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1621 exp, exp->exp_last_request_time);
1622 /* release one export reference anyway */
1623 rc = obd_disconnect(exp);
1625 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1626 obd_export_nid2str(exp), exp, rc);
1627 class_export_put(exp);
/* Disconnect all exports of \a obd.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a private work
 * list under obd_dev_lock; the work list is then processed without the lock
 * by class_disconnect_export_list() using the flags from
 * exp_flags_from_obd(). */
1632 void class_disconnect_exports(struct obd_device *obd)
1634 struct list_head work_list;
1637 /* Move all of the exports from obd_exports to a work list, en masse. */
1638 INIT_LIST_HEAD(&work_list);
1639 spin_lock(&obd->obd_dev_lock);
1640 list_splice_init(&obd->obd_exports, &work_list);
1641 list_splice_init(&obd->obd_delayed_exports, &work_list);
1642 spin_unlock(&obd->obd_dev_lock);
1644 if (!list_empty(&work_list)) {
1645 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1646 "disconnecting them\n", obd->obd_minor, obd);
1647 class_disconnect_export_list(&work_list,
1648 exp_flags_from_obd(obd));
1650 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1651 obd->obd_minor, obd);
1654 EXPORT_SYMBOL(class_disconnect_exports);
1656 /* Remove exports that have not completed recovery.
/* Walks obd_exports under obd_dev_lock and uses \a test_export, called with
 * exp_lock held, to pick the exports to evict. The self export and exports
 * with no last_rcvd slot (ted_lr_idx == -1) are special-cased first.
 * Selected exports are marked exp_failed and moved to a private work list,
 * which is disconnected with OBD_OPT_ABORT_RECOV once obd_dev_lock has been
 * released. */
1658 void class_disconnect_stale_exports(struct obd_device *obd,
1659 int (*test_export)(struct obd_export *))
1661 struct list_head work_list;
1662 struct obd_export *exp, *n;
1666 INIT_LIST_HEAD(&work_list);
1667 spin_lock(&obd->obd_dev_lock);
1668 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1670 /* don't count self-export as client */
1671 if (obd_uuid_equals(&exp->exp_client_uuid,
1672 &exp->exp_obd->obd_uuid))
1675 /* don't evict clients which have no slot in last_rcvd
1676 * (e.g. lightweight connection) */
1677 if (exp->exp_target_data.ted_lr_idx == -1)
1680 spin_lock(&exp->exp_lock);
1681 if (exp->exp_failed || test_export(exp)) {
1682 spin_unlock(&exp->exp_lock);
1685 exp->exp_failed = 1;
1686 spin_unlock(&exp->exp_lock);
1688 list_move(&exp->exp_obd_chain, &work_list);
1690 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1691 obd->obd_name, exp->exp_client_uuid.uuid,
1692 obd_export_nid2str(exp));
1693 print_export_data(exp, "EVICTING", 0, D_HA);
1695 spin_unlock(&obd->obd_dev_lock);
1698 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1699 obd->obd_name, evicted);
1701 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1702 OBD_OPT_ABORT_RECOV);
1705 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Mark \a exp as failed and disconnect it.
 *
 * exp_failed is tested and set under exp_lock, so an export that was
 * already failed is only logged. Otherwise the debug log is dumped when
 * obd_dump_on_timeout is set, two references are taken (one to keep \a exp
 * valid for the trailing CDEBUG, one for obd_disconnect() to release) and
 * obd_disconnect() is called. */
1707 void class_fail_export(struct obd_export *exp)
1709 int rc, already_failed;
1711 spin_lock(&exp->exp_lock);
1712 already_failed = exp->exp_failed;
1713 exp->exp_failed = 1;
1714 spin_unlock(&exp->exp_lock);
1716 if (already_failed) {
1717 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1718 exp, exp->exp_client_uuid.uuid);
1722 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1723 exp, exp->exp_client_uuid.uuid);
1725 if (obd_dump_on_timeout)
1726 libcfs_debug_dumplog();
1728 /* need for safe call CDEBUG after obd_disconnect */
1729 class_export_get(exp);
1731 /* Most callers into obd_disconnect are removing their own reference
1732 * (request, for example) in addition to the one from the hash table.
1733 * We don't have such a reference here, so make one. */
1734 class_export_get(exp);
1735 rc = obd_disconnect(exp);
1737 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1739 CDEBUG(D_HA, "disconnected export %p/%s\n",
1740 exp, exp->exp_client_uuid.uuid);
1741 class_export_put(exp);
1743 EXPORT_SYMBOL(class_fail_export);
/* Evict, by administrative request, exports of \a obd whose peer NID is
 * \a nid (given as a string and converted with libcfs_str2nid()).
 *
 * A reference on obd_nid_hash is taken under obd_dev_lock and held for the
 * duration of the lookups. Each export found is failed with
 * class_fail_export() and the lookup reference is then put. Returns
 * exports_evicted, which is still 0 when the obd is already stopping. */
1745 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1747 struct cfs_hash *nid_hash;
1748 struct obd_export *doomed_exp = NULL;
1749 int exports_evicted = 0;
1751 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1753 spin_lock(&obd->obd_dev_lock);
1754 /* umount has run already, so evict thread should leave
1755 * its task to umount thread now */
1756 if (obd->obd_stopping) {
1757 spin_unlock(&obd->obd_dev_lock);
1758 return exports_evicted;
1760 nid_hash = obd->obd_nid_hash;
1761 cfs_hash_getref(nid_hash);
1762 spin_unlock(&obd->obd_dev_lock);
1765 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1766 if (doomed_exp == NULL)
1769 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1770 "nid %s found, wanted nid %s, requested nid %s\n",
1771 obd_export_nid2str(doomed_exp),
1772 libcfs_nid2str(nid_key), nid);
1773 LASSERTF(doomed_exp != obd->obd_self_export,
1774 "self-export is hashed by NID?\n");
1776 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1777 "request\n", obd->obd_name,
1778 obd_uuid2str(&doomed_exp->exp_client_uuid),
1779 obd_export_nid2str(doomed_exp));
1780 class_fail_export(doomed_exp);
1781 class_export_put(doomed_exp);
1784 cfs_hash_putref(nid_hash);
1786 if (!exports_evicted)
1787 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1788 obd->obd_name, nid);
1789 return exports_evicted;
1791 EXPORT_SYMBOL(obd_export_evict_by_nid);
1793 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1795 struct cfs_hash *uuid_hash;
1796 struct obd_export *doomed_exp = NULL;
1797 struct obd_uuid doomed_uuid;
1798 int exports_evicted = 0;
1800 spin_lock(&obd->obd_dev_lock);
1801 if (obd->obd_stopping) {
1802 spin_unlock(&obd->obd_dev_lock);
1803 return exports_evicted;
1805 uuid_hash = obd->obd_uuid_hash;
1806 cfs_hash_getref(uuid_hash);
1807 spin_unlock(&obd->obd_dev_lock);
1809 obd_str2uuid(&doomed_uuid, uuid);
1810 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1811 CERROR("%s: can't evict myself\n", obd->obd_name);
1812 cfs_hash_putref(uuid_hash);
1813 return exports_evicted;
1816 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1818 if (doomed_exp == NULL) {
1819 CERROR("%s: can't disconnect %s: no exports found\n",
1820 obd->obd_name, uuid);
1822 CWARN("%s: evicting %s at adminstrative request\n",
1823 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1824 class_fail_export(doomed_exp);
1825 class_export_put(doomed_exp);
1828 cfs_hash_putref(uuid_hash);
1830 return exports_evicted;
1833 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional hook, NULL by default, that print_export_data() calls on an
 * export when lock dumping is requested (LUSTRE_TRACKS_LOCK_EXP_REFS builds
 * only). */
1834 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1835 EXPORT_SYMBOL(class_export_dump_hook);
/* Log a one-line summary of \a exp at \a debug_level, tagged with \a status:
 * obd name, client uuid and NID, reference/RPC/callback/lock counters, the
 * disconnected/delayed/failed flags, outstanding reply information (gathered
 * under exp_lock), last committed transno and whether the export is on a
 * stale list. With LUSTRE_TRACKS_LOCK_EXP_REFS, class_export_dump_hook is
 * also called when \a locks is non-zero and the hook is set. */
1838 static void print_export_data(struct obd_export *exp, const char *status,
1839 int locks, int debug_level)
1841 struct ptlrpc_reply_state *rs;
1842 struct ptlrpc_reply_state *first_reply = NULL;
1845 spin_lock(&exp->exp_lock);
1846 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1852 spin_unlock(&exp->exp_lock);
1854 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1855 "%p %s %llu stale:%d\n",
1856 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1857 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1858 atomic_read(&exp->exp_rpc_count),
1859 atomic_read(&exp->exp_cb_count),
1860 atomic_read(&exp->exp_locks_count),
1861 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1862 nreplies, first_reply, nreplies > 3 ? "..." : "",
1863 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1864 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1865 if (locks && class_export_dump_hook != NULL)
1866 class_export_dump_hook(exp);
/* Print every export of \a obd with print_export_data(), holding
 * obd_dev_lock throughout: exports on obd_exports are tagged "ACTIVE", those
 * on obd_unlinked_exports "UNLINKED" and those on obd_delayed_exports
 * "DELAYED". \a locks and \a debug_level are passed through unchanged. */
1870 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1872 struct obd_export *exp;
1874 spin_lock(&obd->obd_dev_lock);
1875 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1876 print_export_data(exp, "ACTIVE", locks, debug_level);
1877 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1878 print_export_data(exp, "UNLINKED", locks, debug_level);
1879 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1880 print_export_data(exp, "DELAYED", locks, debug_level);
1881 spin_unlock(&obd->obd_dev_lock);
/* Block until obd_unlinked_exports of \a obd is empty; obd_exports must
 * already be empty on entry.
 *
 * The list is checked under obd_dev_lock, which is released while sleeping
 * uninterruptibly for "waited" seconds. Once "waited" exceeds 5 and is a
 * power of two, a console warning is printed and the exports are dumped. */
1884 void obd_exports_barrier(struct obd_device *obd)
1887 LASSERT(list_empty(&obd->obd_exports));
1888 spin_lock(&obd->obd_dev_lock);
1889 while (!list_empty(&obd->obd_unlinked_exports)) {
1890 spin_unlock(&obd->obd_dev_lock);
1891 set_current_state(TASK_UNINTERRUPTIBLE);
1892 schedule_timeout(cfs_time_seconds(waited));
1893 if (waited > 5 && is_power_of_2(waited)) {
1894 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1895 "more than %d seconds. "
1896 "The obd refcount = %d. Is it stuck?\n",
1897 obd->obd_name, waited,
1898 atomic_read(&obd->obd_refcount));
1899 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1902 spin_lock(&obd->obd_dev_lock);
1904 spin_unlock(&obd->obd_dev_lock);
1906 EXPORT_SYMBOL(obd_exports_barrier);
1909 * Add export to the obd_zombie workqueue (queue_work() schedules it).
/* Decrements obd_stale_export_num, unlinks \a exp from its obd chain under
 * obd_dev_lock (the export must still be linked), then queues
 * exp_zombie_work on zombie_wq. */
1911 static void obd_zombie_export_add(struct obd_export *exp) {
1912 atomic_dec(&obd_stale_export_num);
1913 spin_lock(&exp->exp_obd->obd_dev_lock);
1914 LASSERT(!list_empty(&exp->exp_obd_chain));
1915 list_del_init(&exp->exp_obd_chain);
1916 spin_unlock(&exp->exp_obd->obd_dev_lock);
1918 queue_work(zombie_wq, &exp->exp_zombie_work);
1922 * Add import to the obd_zombie workqueue (queue_work() schedules it).
/* Queues imp_zombie_work on zombie_wq; the work handler set up in
 * class_new_import() is obd_zombie_imp_cull(). The import must no longer
 * have a security context (imp_sec == NULL). */
1924 static void obd_zombie_import_add(struct obd_import *imp) {
1925 LASSERT(imp->imp_sec == NULL);
1927 queue_work(zombie_wq, &imp->imp_zombie_work);
1931 * Wait until the obd_zombie import/export queues become empty.
/* Implemented by flushing zombie_wq with flush_workqueue(). */
1933 void obd_zombie_barrier(void)
1935 flush_workqueue(zombie_wq);
1937 EXPORT_SYMBOL(obd_zombie_barrier);
/* Detach the first export, if any, from the global obd_stale_exports list
 * under obd_stale_export_lock. "exp" stays NULL when the list is empty. */
1940 struct obd_export *obd_stale_export_get(void)
1942 struct obd_export *exp = NULL;
1945 spin_lock(&obd_stale_export_lock);
1946 if (!list_empty(&obd_stale_exports)) {
1947 exp = list_entry(obd_stale_exports.next,
1948 struct obd_export, exp_stale_list);
1949 list_del_init(&exp->exp_stale_list);
1951 spin_unlock(&obd_stale_export_lock);
1954 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1955 atomic_read(&obd_stale_export_num));
1959 EXPORT_SYMBOL(obd_stale_export_get);
/* Give back an export obtained from the stale list; \a exp must not be on
 * the list at this point.
 *
 * When the export's lock hash still holds entries, the export is re-queued
 * on obd_stale_exports (tail when it has no blocked locks, head otherwise)
 * with exp_bl_list_lock and obd_stale_export_lock held, in that order. */
1961 void obd_stale_export_put(struct obd_export *exp)
1965 LASSERT(list_empty(&exp->exp_stale_list));
1966 if (exp->exp_lock_hash &&
1967 atomic_read(&exp->exp_lock_hash->hs_count)) {
1968 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1969 atomic_read(&obd_stale_export_num));
1971 spin_lock_bh(&exp->exp_bl_list_lock);
1972 spin_lock(&obd_stale_export_lock);
1973 /* Add to the tail if there is no blocked locks,
1974 * to the head otherwise. */
1975 if (list_empty(&exp->exp_bl_list))
1976 list_add_tail(&exp->exp_stale_list,
1977 &obd_stale_exports);
1979 list_add(&exp->exp_stale_list,
1980 &obd_stale_exports);
1982 spin_unlock(&obd_stale_export_lock);
1983 spin_unlock_bh(&exp->exp_bl_list_lock);
1985 class_export_put(exp);
1989 EXPORT_SYMBOL(obd_stale_export_put);
1992 * Adjust the position of the export in the stale list,
1993 * i.e. move it to the head of the list if needed.
/* The move happens only when \a exp is currently on the stale list and has
 * a non-empty exp_bl_list. Takes exp_bl_list_lock (bottom halves disabled),
 * then obd_stale_export_lock. */
1995 void obd_stale_export_adjust(struct obd_export *exp)
1997 LASSERT(exp != NULL);
1998 spin_lock_bh(&exp->exp_bl_list_lock);
1999 spin_lock(&obd_stale_export_lock);
2001 if (!list_empty(&exp->exp_stale_list) &&
2002 !list_empty(&exp->exp_bl_list))
2003 list_move(&exp->exp_stale_list, &obd_stale_exports);
2005 spin_unlock(&obd_stale_export_lock);
2006 spin_unlock_bh(&exp->exp_bl_list_lock);
2008 EXPORT_SYMBOL(obd_stale_export_adjust);
2011 * Start the workqueue that destroys zombie imports/exports.
/* Allocates the "obd_zombid" workqueue and stores it in zombie_wq. */
2013 int obd_zombie_impexp_init(void)
2015 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
2023 * Stop the workqueue that destroys zombie imports/exports.
/* Destroys zombie_wq, then asserts that no export is left on the global
 * obd_stale_exports list. */
2025 void obd_zombie_impexp_stop(void)
2027 destroy_workqueue(zombie_wq);
2028 LASSERT(list_empty(&obd_stale_exports));
2031 /***** Kernel-userspace comm helpers *******/
2033 /* Get length of entire message, including header: sizeof(struct kuc_hdr)
 * plus \a payload_len. */
2034 int kuc_len(int payload_len)
2036 return sizeof(struct kuc_hdr) + payload_len;
2038 EXPORT_SYMBOL(kuc_len);
2040 /* Get a pointer to kuc header, given a ptr to the payload
2041 * @param p Pointer to payload area
2042 * @returns Pointer to kuc header
/* Steps back one struct kuc_hdr from payload pointer \a p and asserts that
 * the header found there carries KUC_MAGIC. */
2044 struct kuc_hdr * kuc_ptr(void *p)
2046 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2047 LASSERT(lh->kuc_magic == KUC_MAGIC);
2050 EXPORT_SYMBOL(kuc_ptr);
2052 /* Alloc space for a message, and fill in header
2053 * @return Pointer to payload area
/* The header is filled with KUC_MAGIC, \a transport, \a type and the total
 * length from kuc_len(). The pointer handed back is the address just past
 * the header (lh + 1); ERR_PTR(-ENOMEM) is returned on the failure path. */
2055 void *kuc_alloc(int payload_len, int transport, int type)
2058 int len = kuc_len(payload_len);
2062 return ERR_PTR(-ENOMEM);
2064 lh->kuc_magic = KUC_MAGIC;
2065 lh->kuc_transport = transport;
2066 lh->kuc_msgtype = type;
2067 lh->kuc_msglen = len;
2069 return (void *)(lh + 1);
2071 EXPORT_SYMBOL(kuc_alloc);
2073 /* Takes pointer to payload area. The header is located with kuc_ptr()
 * (which checks the magic) and freed with OBD_FREE() using the size
 * kuc_len(\a payload_len). */
2074 void kuc_free(void *p, int payload_len)
2076 struct kuc_hdr *lh = kuc_ptr(p);
2077 OBD_FREE(lh, kuc_len(payload_len));
2079 EXPORT_SYMBOL(kuc_free);
/* On-stack record used by obd_get_request_slot() while waiting for a
 * request slot: orsw_entry links it on client_obd::cl_flight_waiters and
 * orsw_waitq is what the waiter sleeps on. */
2081 struct obd_request_slot_waiter {
2082 struct list_head orsw_entry;
2083 wait_queue_head_t orsw_waitq;
/* Wait condition for obd_get_request_slot(): a slot counts as available
 * once \a orsw has been unlinked from the waiters list (its entry is empty).
 * The check is made under cl_loi_list_lock. */
2087 static bool obd_request_slot_avail(struct client_obd *cli,
2088 struct obd_request_slot_waiter *orsw)
2092 spin_lock(&cli->cl_loi_list_lock);
2093 avail = !!list_empty(&orsw->orsw_entry);
2094 spin_unlock(&cli->cl_loi_list_lock);
2100 * For network flow control, the RPC sponsor needs to acquire a credit
2101 * before sending the RPC. The credits count for a connection is defined
2102 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
2103 * the subsequent RPC sponsors need to wait until others released their
2104 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
/* Fast path: when cl_rpcs_in_flight is below cl_max_rpcs_in_flight the
 * counter is bumped under cl_loi_list_lock and no wait is needed.
 * Otherwise an on-stack waiter is queued at the tail of cl_flight_waiters
 * and the thread sleeps in l_wait_event() until the waiter is unlinked or
 * the wait is interrupted. After the wait, cl_loi_list_lock is retaken to
 * settle the waiter's state before the stack frame goes away. */
2106 int obd_get_request_slot(struct client_obd *cli)
2108 struct obd_request_slot_waiter orsw;
2109 struct l_wait_info lwi;
2112 spin_lock(&cli->cl_loi_list_lock);
2113 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2114 cli->cl_rpcs_in_flight++;
2115 spin_unlock(&cli->cl_loi_list_lock);
2119 init_waitqueue_head(&orsw.orsw_waitq);
2120 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2121 orsw.orsw_signaled = false;
2122 spin_unlock(&cli->cl_loi_list_lock);
2124 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2125 rc = l_wait_event(orsw.orsw_waitq,
2126 obd_request_slot_avail(cli, &orsw) ||
2130 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2131 * freed but other (such as obd_put_request_slot) is using it. */
2132 spin_lock(&cli->cl_loi_list_lock);
2134 if (!orsw.orsw_signaled) {
2135 if (list_empty(&orsw.orsw_entry))
2136 cli->cl_rpcs_in_flight--;
2138 list_del(&orsw.orsw_entry);
2142 if (orsw.orsw_signaled) {
2143 LASSERT(list_empty(&orsw.orsw_entry));
2147 spin_unlock(&cli->cl_loi_list_lock);
2151 EXPORT_SYMBOL(obd_get_request_slot);
/* Release a request slot on \a cli: cl_rpcs_in_flight is decremented under
 * cl_loi_list_lock. If a waiter is queued and the count is below the
 * maximum, the slot is handed straight to the first waiter: it is unlinked,
 * the count is bumped on its behalf and it is woken up. */
2153 void obd_put_request_slot(struct client_obd *cli)
2155 struct obd_request_slot_waiter *orsw;
2157 spin_lock(&cli->cl_loi_list_lock);
2158 cli->cl_rpcs_in_flight--;
2160 /* If there is free slot, wakeup the first waiter. */
2161 if (!list_empty(&cli->cl_flight_waiters) &&
2162 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2163 orsw = list_entry(cli->cl_flight_waiters.next,
2164 struct obd_request_slot_waiter, orsw_entry);
2165 list_del_init(&orsw->orsw_entry);
2166 cli->cl_rpcs_in_flight++;
2167 wake_up(&orsw->orsw_waitq);
2169 spin_unlock(&cli->cl_loi_list_lock);
2171 EXPORT_SYMBOL(obd_put_request_slot);
/* Return the current cl_max_rpcs_in_flight of \a cli (read without taking
 * any lock). */
2173 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2175 return cli->cl_max_rpcs_in_flight;
2177 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2179 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2181 struct obd_request_slot_waiter *orsw;
2188 if (max > OBD_MAX_RIF_MAX || max < 1)
2191 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2192 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2193 /* adjust max_mod_rpcs_in_flight to ensure it is always
2194 * strictly lower that max_rpcs_in_flight */
2196 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2197 "because it must be higher than "
2198 "max_mod_rpcs_in_flight value",
2199 cli->cl_import->imp_obd->obd_name);
2202 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2203 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2209 spin_lock(&cli->cl_loi_list_lock);
2210 old = cli->cl_max_rpcs_in_flight;
2211 cli->cl_max_rpcs_in_flight = max;
2212 client_adjust_max_dirty(cli);
2216 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2217 for (i = 0; i < diff; i++) {
2218 if (list_empty(&cli->cl_flight_waiters))
2221 orsw = list_entry(cli->cl_flight_waiters.next,
2222 struct obd_request_slot_waiter, orsw_entry);
2223 list_del_init(&orsw->orsw_entry);
2224 cli->cl_rpcs_in_flight++;
2225 wake_up(&orsw->orsw_waitq);
2227 spin_unlock(&cli->cl_loi_list_lock);
2231 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Return the current cl_max_mod_rpcs_in_flight of \a cli (read without
 * taking any lock). */
2233 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2235 return cli->cl_max_mod_rpcs_in_flight;
2237 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/* Set the maximum number of modify RPCs in flight for \a cli.
 *
 * \a max must lie in [1, OBD_MAX_RIF_MAX], be strictly lower than
 * cl_max_rpcs_in_flight, and not exceed "maxmodrpcs" (taken from the
 * import's connect data when OBD_CONNECT_MULTIMODRPCS is set). The new
 * value is stored under cl_mod_rpcs_lock, and cl_mod_rpcs_waitq is woken
 * when the limit was raised. */
2239 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2241 struct obd_connect_data *ocd;
2245 if (max > OBD_MAX_RIF_MAX || max < 1)
2248 /* cannot exceed or equal max_rpcs_in_flight */
2249 if (max >= cli->cl_max_rpcs_in_flight) {
2250 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2251 "higher or equal to max_rpcs_in_flight value (%u)\n",
2252 cli->cl_import->imp_obd->obd_name,
2253 max, cli->cl_max_rpcs_in_flight);
2257 /* cannot exceed max modify RPCs in flight supported by the server */
2258 ocd = &cli->cl_import->imp_connect_data;
2259 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2260 maxmodrpcs = ocd->ocd_maxmodrpcs;
2263 if (max > maxmodrpcs) {
2264 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2265 "higher than max_mod_rpcs_per_client value (%hu) "
2266 "returned by the server at connection\n",
2267 cli->cl_import->imp_obd->obd_name,
2272 spin_lock(&cli->cl_mod_rpcs_lock);
2274 prev = cli->cl_max_mod_rpcs_in_flight;
2275 cli->cl_max_mod_rpcs_in_flight = max;
2277 /* wakeup waiters if limit has been increased */
2278 if (cli->cl_max_mod_rpcs_in_flight > prev)
2279 wake_up(&cli->cl_mod_rpcs_waitq);
2281 spin_unlock(&cli->cl_mod_rpcs_lock);
2285 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/* Integer percentage of \a a relative to \a b; 0 when \a b is 0.
 * Arguments are parenthesised so that expressions can be passed safely;
 * note that \a b is evaluated twice. */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
/* Print the modify-RPC statistics of \a cli to \a seq: a snapshot timestamp,
 * the current cl_mod_rpcs_in_flight, then one row per cl_mod_rpcs_hist
 * bucket with its count, percentage and cumulative percentage (via pct()).
 * Everything is printed with cl_mod_rpcs_lock held.
 *
 * NOTE(review): the timestamp format pads the nanoseconds with spaces
 * ("%9lu") and prints the s64 seconds with "%llu"; "%lld.%09lu" looks like
 * the intended form -- confirm before changing, tools may parse this. */
2289 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2290 struct seq_file *seq)
2292 unsigned long mod_tot = 0, mod_cum;
2293 struct timespec64 now;
2296 ktime_get_real_ts64(&now);
2298 spin_lock(&cli->cl_mod_rpcs_lock);
2300 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2301 (s64)now.tv_sec, now.tv_nsec);
2302 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2303 cli->cl_mod_rpcs_in_flight);
2305 seq_printf(seq, "\n\t\t\tmodify\n");
2306 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2308 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2311 for (i = 0; i < OBD_HIST_MAX; i++) {
2312 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2314 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2315 i, mod, pct(mod, mod_tot),
2316 pct(mod_cum, mod_tot));
2317 if (mod_cum == mod_tot)
2321 spin_unlock(&cli->cl_mod_rpcs_lock);
2325 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2329 /* The number of modify RPCs sent in parallel is limited
2330 * because the server has a finite number of slots per client to
2331 * store request result and ensure reply reconstruction when needed.
2332 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2333 * that takes into account server limit and cl_max_rpcs_in_flight
2335 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2336 * one close request is allowed above the maximum.
/* Slot availability test. The counters are read without locking here;
 * callers in this file hold cl_mod_rpcs_lock around the call. */
2338 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2343 /* A slot is available if
2344 * - number of modify RPCs in flight is less than the max
2345 * - it's a close RPC and no other close request is in flight
2347 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2348 (close_req && cli->cl_close_rpcs_in_flight == 0);
/* Locked wrapper around obd_mod_rpc_slot_avail_locked(): takes
 * cl_mod_rpcs_lock for the duration of the test. Used as the wait condition
 * in obd_get_mod_rpc_slot(). */
2353 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2358 spin_lock(&cli->cl_mod_rpcs_lock);
2359 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2360 spin_unlock(&cli->cl_mod_rpcs_lock);
/* Tests whether intent \a it is one of the operations that do not take a
 * modify RPC slot: IT_GETATTR, IT_LOOKUP, IT_READDIR, or IT_LAYOUT without
 * MDS_FMODE_WRITE in it_flags. Both obd_get_mod_rpc_slot() and
 * obd_put_mod_rpc_slot() consult it before touching any counter. */
2364 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2367 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2368 it->it_op == IT_READDIR ||
2369 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2374 /* Get a modify RPC slot from the obd client @cli according
2375 * to the kind of operation @opc that is going to be sent
2376 * and the intent @it of the operation if it applies.
2377 * If the maximum number of modify RPCs in flight is reached
2378 * the thread is put to sleep.
2379 * Returns the tag to be set in the request message. Tag 0
2380 * is reserved for non-modifying requests.
/* Locking and accounting: the availability test, the in-flight counters,
 * the histogram tally and the tag bitmap are all updated under
 * cl_mod_rpcs_lock. When no slot is free the lock is dropped and the thread
 * sleeps on cl_mod_rpcs_waitq until obd_mod_rpc_slot_avail() holds.
 * NOTE(review): the tag bit is set inside LASSERT(), so this relies on
 * LASSERT() always evaluating its argument -- confirm for all builds. */
2382 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2383 struct lookup_intent *it)
2385 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2386 bool close_req = false;
2389 /* read-only metadata RPCs don't consume a slot on MDT
2390 * for reply reconstruction
2392 if (obd_skip_mod_rpc_slot(it))
2395 if (opc == MDS_CLOSE)
2399 spin_lock(&cli->cl_mod_rpcs_lock);
2400 max = cli->cl_max_mod_rpcs_in_flight;
2401 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2402 /* there is a slot available */
2403 cli->cl_mod_rpcs_in_flight++;
2405 cli->cl_close_rpcs_in_flight++;
2406 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2407 cli->cl_mod_rpcs_in_flight);
2408 /* find a free tag */
2409 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2411 LASSERT(i < OBD_MAX_RIF_MAX);
2412 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2413 spin_unlock(&cli->cl_mod_rpcs_lock);
2414 /* tag 0 is reserved for non-modify RPCs */
2417 spin_unlock(&cli->cl_mod_rpcs_lock);
2419 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2420 "opc %u, max %hu\n",
2421 cli->cl_import->imp_obd->obd_name, opc, max);
2423 l_wait_event(cli->cl_mod_rpcs_waitq,
2424 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2427 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2429 /* Put a modify RPC slot from the obd client @cli according
2430 * to the kind of operation @opc that has been sent and the
2431 * intent @it of the operation if it applies.
2433 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2434 struct lookup_intent *it, __u16 tag)
2436 bool close_req = false;
2438 if (obd_skip_mod_rpc_slot(it))
2441 if (opc == MDS_CLOSE)
2444 spin_lock(&cli->cl_mod_rpcs_lock);
2445 cli->cl_mod_rpcs_in_flight--;
2447 cli->cl_close_rpcs_in_flight--;
2448 /* release the tag in the bitmap */
2449 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2450 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2451 spin_unlock(&cli->cl_mod_rpcs_lock);
2452 wake_up(&cli->cl_mod_rpcs_waitq);
2454 EXPORT_SYMBOL(obd_put_mod_rpc_slot);