4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* List of registered obd types. class_search_type() walks it and
 * class_register_type() adds to it, both while holding obd_types_lock. */
49 static DEFINE_SPINLOCK(obd_types_lock);
50 static LIST_HEAD(obd_types);
/* Reader/writer lock taken around the scans and updates of obd_devs[]. */
51 DEFINE_RWLOCK(obd_dev_lock);
/* Device table; a registered device is stored at index obd->obd_minor. */
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
54 static struct kmem_cache *obd_device_cachep;
55 struct kmem_cache *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 static struct kmem_cache *import_cachep;
/* NOTE(review): not referenced in the visible part of the file; presumably
 * the workqueue that runs exp_zombie_work / imp_zombie_work - confirm. */
59 static struct workqueue_struct *zombie_wq;
/* Forward declarations; the definitions are not in the visible part. */
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping; class_unlink_export() increments
 * obd_stale_export_num for every export it unlinks. */
66 static LIST_HEAD(obd_stale_exports);
67 static DEFINE_SPINLOCK(obd_stale_export_lock);
68 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Function pointer called by class_export_destroy() and
 * class_import_destroy() to drop a ptlrpc connection. It is only declared
 * here; where it gets assigned is not visible in this file section. */
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
71 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
74 * support functions: we could use inter-module communication, but this
75 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep with GFP_NOFS and stamp it
 * with OBD_DEVICE_MAGIC.
 *
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing; class_newdev() treats a NULL result as
 * -ENOMEM.
 */
77 static struct obd_device *obd_device_alloc(void)
79 struct obd_device *obd;
81 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
83 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts the magic first, complains if an ldlm namespace is still attached
 * to the device, then finalises the lu_ref tracker and frees the slab object.
 */
88 static void obd_device_free(struct obd_device *obd)
91 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
92 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A namespace that survived until here was never cleaned up. */
93 if (obd->obd_namespace != NULL) {
94 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
95 obd, obd->obd_namespace, obd->obd_force);
98 lu_ref_fini(&obd->obd_reference);
99 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Find a registered obd type by exact name.
 *
 * Walks obd_types under obd_types_lock and compares each typ_name with
 * strcmp(). The lock is released both on the match path and after the
 * loop ends without a match.
 */
102 struct obd_type *class_search_type(const char *name)
104 struct list_head *tmp;
105 struct obd_type *type;
107 spin_lock(&obd_types_lock);
108 list_for_each(tmp, &obd_types) {
109 type = list_entry(tmp, struct obd_type, typ_chain);
110 if (strcmp(type->typ_name, name) == 0) {
111 spin_unlock(&obd_types_lock);
115 spin_unlock(&obd_types_lock);
118 EXPORT_SYMBOL(class_search_type);
/*
 * Look up an obd type by name and pin the module that provides it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the function can request_module() a
 * module and search again; some type names are mapped to a different
 * module name first. On the visible tail the type's obd_type_lock is held
 * around try_module_get() on the owner of typ_dt_ops.
 */
120 struct obd_type *class_get_type(const char *name)
122 struct obd_type *type = class_search_type(name);
124 #ifdef HAVE_MODULE_LOADING_SUPPORT
126 const char *modname = name;
/* "obdfilter" is remapped; the replacement name is not visible here. */
128 if (strcmp(modname, "obdfilter") == 0)
/* The LWP type is served by the OSP module. */
131 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
132 modname = LUSTRE_OSP_NAME;
/* Any name that starts with LUSTRE_MDS_NAME loads the MDT module. */
134 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
135 modname = LUSTRE_MDT_NAME;
/* request_module() returns 0 on success; search again under the name the
 * caller asked for, not the module name. */
137 if (!request_module("%s", modname)) {
138 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
139 type = class_search_type(name);
141 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
147 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not checked. */
149 try_module_get(type->typ_dt_ops->o_owner);
150 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): under the type's obd_type_lock, drop the
 * module reference on the owner of typ_dt_ops.
 */
155 void class_put_type(struct obd_type *type)
158 spin_lock(&type->obd_type_lock);
160 module_put(type->typ_dt_ops->o_owner);
161 spin_unlock(&type->obd_type_lock);
164 static void class_sysfs_release(struct kobject *kobj)
166 OBD_FREE(kobj, sizeof(*kobj));
169 static struct kobj_type class_ktype = {
170 .sysfs_ops = &lustre_sysfs_ops,
171 .release = class_sysfs_release,
/*
 * Create the kobject named \a name inside lustre_kset.
 *
 * With HAVE_SERVER_SUPPORT an already existing kobject of that name is
 * looked up first. Otherwise a kobject is allocated, initialised with
 * class_ktype and added below lustre_kset->kobj.
 *
 * Returns ERR_PTR(-ENOMEM) when the allocation fails; the handling of a
 * kobject_add() failure is not visible in this listing.
 */
174 struct kobject *class_setup_tunables(const char *name)
176 struct kobject *kobj;
179 #ifdef HAVE_SERVER_SUPPORT
180 kobj = kset_find_obj(lustre_kset, name);
184 OBD_ALLOC(kobj, sizeof(*kobj));
186 return ERR_PTR(-ENOMEM);
188 kobj->kset = lustre_kset;
189 kobject_init(kobj, &class_ktype);
190 rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
197 EXPORT_SYMBOL(class_setup_tunables);
/* Upper bound on the length of a type name, enforced by the LASSERT in
 * class_register_type(). */
199 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type.
 *
 * Allocates an obd_type, stores private copies of the method tables and of
 * the name, creates the procfs, debugfs and sysfs entries, initialises the
 * lu_device_type and finally links the type into obd_types. The tail of the
 * function releases whatever was set up so far.
 *
 * \param[in] dt_ops		data method table, copied into the type
 * \param[in] md_ops		metadata method table, optional
 * \param[in] enable_proc	not used on any visible line
 * \param[in] vars		not used on any visible line
 * \param[in] name		type name, must be shorter than CLASS_MAX_NAME
 * \param[in] ldt		lu_device_type handed to lu_device_type_init()
 */
201 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
202 bool enable_proc, struct lprocfs_vars *vars,
203 const char *name, struct lu_device_type *ldt)
205 struct obd_type *type;
206 #ifdef HAVE_SERVER_SUPPORT
208 #endif /* HAVE_SERVER_SUPPORT */
213 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* A type name can be registered only once. */
215 if (class_search_type(name)) {
216 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
221 OBD_ALLOC(type, sizeof(*type));
/* All three allocations are attempted first and checked together below. */
225 OBD_ALLOC_PTR(type->typ_dt_ops);
226 OBD_ALLOC_PTR(type->typ_md_ops);
227 OBD_ALLOC(type->typ_name, strlen(name) + 1);
229 if (type->typ_dt_ops == NULL ||
230 type->typ_md_ops == NULL ||
231 type->typ_name == NULL)
/* The type keeps copies, so the caller's tables need not outlive it. */
234 *(type->typ_dt_ops) = *dt_ops;
235 /* md_ops is optional */
237 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized strlen(name) + 1 above, so strcpy() fits. */
238 strcpy(type->typ_name, name);
239 spin_lock_init(&type->obd_type_lock);
241 #ifdef CONFIG_PROC_FS
243 type->typ_procroot = lprocfs_register(type->typ_name,
246 if (IS_ERR(type->typ_procroot)) {
247 rc = PTR_ERR(type->typ_procroot);
248 type->typ_procroot = NULL;
253 #ifdef HAVE_SERVER_SUPPORT
/* If debugfs already has an entry of this name, remember that in
 * typ_sym_filter; the cleanup code below then only clears the pointer
 * instead of removing the entry. */
255 dname.len = strlen(dname.name);
256 dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
258 type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
259 if (type->typ_debugfs_entry) {
260 dput(type->typ_debugfs_entry);
261 type->typ_sym_filter = true;
264 #endif /* HAVE_SERVER_SUPPORT */
266 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
269 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
270 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
272 type->typ_debugfs_entry = NULL;
275 #ifdef HAVE_SERVER_SUPPORT
278 type->typ_kobj = class_setup_tunables(type->typ_name);
279 if (IS_ERR(type->typ_kobj))
280 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
284 rc = lu_device_type_init(ldt);
286 kobject_put(type->typ_kobj);
/* Publish the fully initialised type. */
291 spin_lock(&obd_types_lock);
292 list_add(&type->typ_chain, &obd_types);
293 spin_unlock(&obd_types_lock);
/* Cleanup of a partially set up type starts here. */
298 #ifdef HAVE_SERVER_SUPPORT
299 if (type->typ_sym_filter)
300 type->typ_debugfs_entry = NULL;
302 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
303 ldebugfs_remove(&type->typ_debugfs_entry);
304 if (type->typ_name != NULL) {
305 #ifdef CONFIG_PROC_FS
306 if (type->typ_procroot != NULL)
307 remove_proc_subtree(type->typ_name, proc_lustre_root);
309 OBD_FREE(type->typ_name, strlen(name) + 1);
311 if (type->typ_md_ops != NULL)
312 OBD_FREE_PTR(type->typ_md_ops);
313 if (type->typ_dt_ops != NULL)
314 OBD_FREE_PTR(type->typ_dt_ops);
315 OBD_FREE(type, sizeof(*type));
318 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * A type that still has a non-zero typ_refcnt is not removed: its method
 * tables are freed and the rest is left in place. Otherwise the sysfs,
 * procfs and debugfs entries are removed, the lu_device_type is finalised,
 * the type is unlinked from obd_types and all of its memory is freed.
 */
320 int class_unregister_type(const char *name)
322 struct obd_type *type = class_search_type(name);
326 CERROR("unknown obd type\n");
330 if (type->typ_refcnt) {
331 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
332 /* This is a bad situation, let's make the best of it */
333 /* Remove ops, but leave the name for debugging */
334 OBD_FREE_PTR(type->typ_dt_ops);
335 OBD_FREE_PTR(type->typ_md_ops);
339 kobject_put(type->typ_kobj);
341 /* we do not use type->typ_procroot as for compatibility purposes
342 * other modules can share names (i.e. lod can use lov entry). so
343 * we can't reference pointer as it can get invalided when another
344 * module removes the entry */
345 #ifdef CONFIG_PROC_FS
346 if (type->typ_procroot != NULL)
347 remove_proc_subtree(type->typ_name, proc_lustre_root);
348 if (type->typ_procsym != NULL)
349 lprocfs_remove(&type->typ_procsym);
351 #ifdef HAVE_SERVER_SUPPORT
/* typ_sym_filter marks a debugfs entry that was found rather than
 * created at registration time, so the pointer is only cleared. */
352 if (type->typ_sym_filter)
353 type->typ_debugfs_entry = NULL;
355 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
356 ldebugfs_remove(&type->typ_debugfs_entry);
359 lu_device_type_fini(type->typ_lu);
361 spin_lock(&obd_types_lock);
362 list_del(&type->typ_chain);
363 spin_unlock(&obd_types_lock);
/* \a name compared equal to typ_name in class_search_type(), so its
 * length matches the size that was allocated for typ_name. */
364 OBD_FREE(type->typ_name, strlen(name) + 1);
365 if (type->typ_dt_ops != NULL)
366 OBD_FREE_PTR(type->typ_dt_ops);
367 if (type->typ_md_ops != NULL)
368 OBD_FREE_PTR(type->typ_md_ops);
369 OBD_FREE(type, sizeof(*type));
371 } /* class_unregister_type */
372 EXPORT_SYMBOL(class_unregister_type);
375 * Create a new obd device.
377 * Allocate the new obd_device and initialize it.
379 * \param[in] type_name obd device type string.
380 * \param[in] name obd device name.
381 * \param[in] uuid obd device UUID
383 * \retval newdev pointer to created obd_device
384 * \retval ERR_PTR(errno) on error
386 struct obd_device *class_newdev(const char *type_name, const char *name,
389 struct obd_device *newdev;
390 struct obd_type *type = NULL;
393 if (strlen(name) >= MAX_OBD_NAME) {
394 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
395 RETURN(ERR_PTR(-EINVAL));
398 type = class_get_type(type_name);
400 CERROR("OBD: unknown type: %s\n", type_name);
401 RETURN(ERR_PTR(-ENODEV));
404 newdev = obd_device_alloc();
405 if (newdev == NULL) {
406 class_put_type(type);
407 RETURN(ERR_PTR(-ENOMEM));
409 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
410 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
411 newdev->obd_type = type;
412 newdev->obd_minor = -1;
414 rwlock_init(&newdev->obd_pool_lock);
415 newdev->obd_pool_limit = 0;
416 newdev->obd_pool_slv = 0;
418 INIT_LIST_HEAD(&newdev->obd_exports);
419 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
420 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
421 INIT_LIST_HEAD(&newdev->obd_exports_timed);
422 INIT_LIST_HEAD(&newdev->obd_nid_stats);
423 spin_lock_init(&newdev->obd_nid_lock);
424 spin_lock_init(&newdev->obd_dev_lock);
425 mutex_init(&newdev->obd_dev_mutex);
426 spin_lock_init(&newdev->obd_osfs_lock);
427 /* newdev->obd_osfs_age must be set to a value in the distant
428 * past to guarantee a fresh statfs is fetched on mount. */
429 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
431 /* XXX belongs in setup not attach */
432 init_rwsem(&newdev->obd_observer_link_sem);
434 spin_lock_init(&newdev->obd_recovery_task_lock);
435 init_waitqueue_head(&newdev->obd_next_transno_waitq);
436 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
437 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
438 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
439 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
440 INIT_LIST_HEAD(&newdev->obd_evict_list);
441 INIT_LIST_HEAD(&newdev->obd_lwp_list);
443 llog_group_init(&newdev->obd_olg);
444 /* Detach drops this */
445 atomic_set(&newdev->obd_refcount, 1);
446 lu_ref_init(&newdev->obd_reference);
447 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
449 newdev->obd_conn_inprogress = 0;
451 strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
453 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
454 newdev->obd_name, newdev);
462 * \param[in] obd obd_device to be freed
/*
 * Free an obd device once nothing references it any more.
 *
 * Asserts that the magic is intact, that the device is either unregistered
 * (obd_minor == -1) or still the occupant of its obd_devs[] slot, and that
 * obd_refcount is zero. The device is then freed with obd_device_free() and
 * the reference on its type is dropped. The type pointer is saved up front
 * because the device memory is gone by the time class_put_type() runs.
 */
466 void class_free_dev(struct obd_device *obd)
468 struct obd_type *obd_type = obd->obd_type;
470 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
471 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
472 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
473 "obd %p != obd_devs[%d] %p\n",
474 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
475 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
476 "obd_refcount should be 0, not %d\n",
477 atomic_read(&obd->obd_refcount));
478 LASSERT(obd_type != NULL);
480 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
481 obd->obd_name, obd->obd_type->typ_name);
483 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
484 obd->obd_name, obd->obd_uuid.uuid);
/* obd_cleanup() is only called for a device that is being stopped. */
485 if (obd->obd_stopping) {
488 /* If we're not stopping, we were never set up */
489 err = obd_cleanup(obd);
491 CERROR("Cleanup %s returned %d\n",
495 obd_device_free(obd);
497 class_put_type(obd_type);
501 * Unregister obd device.
503 * Free slot in obd_devs[] used by \a obd.
505 * \param[in] obd obd_device to be unregistered
/*
 * Take \a obd out of obd_devs[].
 *
 * Under the obd_dev_lock write lock, a device with a non-negative obd_minor
 * must be the occupant of that slot (asserted); the slot is then cleared.
 * A device whose obd_minor is negative is left untouched.
 */
509 void class_unregister_device(struct obd_device *obd)
511 write_lock(&obd_dev_lock);
512 if (obd->obd_minor >= 0) {
513 LASSERT(obd_devs[obd->obd_minor] == obd);
514 obd_devs[obd->obd_minor] = NULL;
517 write_unlock(&obd_dev_lock);
521 * Register obd device.
523 * Find free slot in obd_devs[], fills it with \a new_obd.
525 * \param[in] new_obd obd_device to be registered
528 * \retval -EEXIST device with this name is registered
529 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Put \a new_obd into a free slot of obd_devs[].
 *
 * The table is scanned under the obd_dev_lock write lock. A registered
 * device with the same obd_name is a conflict; on that path the lock is
 * dropped and obd_zombie_barrier() is called, and the "already exists"
 * error is logged. The "retried" flag suggests the scan is repeated once
 * after the barrier - TODO confirm, the branch structure is not fully
 * visible in this listing. The first free slot seen is remembered and, if
 * the scan succeeds, the device is stored there and obd_minor set to its
 * index.
 */
531 int class_register_device(struct obd_device *new_obd)
535 int new_obd_minor = 0;
536 bool minor_assign = false;
537 bool retried = false;
540 write_lock(&obd_dev_lock);
541 for (i = 0; i < class_devno_max(); i++) {
542 struct obd_device *obd = class_num2obd(i);
545 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
548 write_unlock(&obd_dev_lock);
550 /* the obd_device could be waited to be
551 * destroyed by the "obd_zombie_impexp_thread".
553 obd_zombie_barrier();
558 CERROR("%s: already exists, won't add\n",
560 /* in case we found a free slot before duplicate */
561 minor_assign = false;
565 if (!minor_assign && obd == NULL) {
/* Claim the chosen slot: record the minor on the device and publish the
 * device in obd_devs[]. The slot must still be empty. */
572 new_obd->obd_minor = new_obd_minor;
573 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
574 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
575 obd_devs[new_obd_minor] = new_obd;
/* No free slot was found in the whole table. */
579 CERROR("%s: all %u/%u devices used, increase "
580 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
581 i, class_devno_max(), ret);
584 write_unlock(&obd_dev_lock);
/*
 * Scan obd_devs[] for a device whose obd_name equals \a name.
 *
 * Takes no lock itself; the visible callers hold obd_dev_lock for reading.
 * A name match is only acted on when the device has obd_attached set. The
 * value returned for a hit or a miss is not visible in this listing.
 */
589 static int class_name2dev_nolock(const char *name)
596 for (i = 0; i < class_devno_max(); i++) {
597 struct obd_device *obd = class_num2obd(i);
599 if (obd && strcmp(name, obd->obd_name) == 0) {
600 /* Make sure we finished attaching before we give
601 out any references */
602 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
603 if (obd->obd_attached) {
/*
 * Locked wrapper around class_name2dev_nolock(): the table scan runs with
 * obd_dev_lock held for reading.
 */
613 int class_name2dev(const char *name)
620 read_lock(&obd_dev_lock);
621 i = class_name2dev_nolock(name);
622 read_unlock(&obd_dev_lock);
626 EXPORT_SYMBOL(class_name2dev);
628 struct obd_device *class_name2obd(const char *name)
630 int dev = class_name2dev(name);
632 if (dev < 0 || dev > class_devno_max())
634 return class_num2obd(dev);
636 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] for a device whose obd_uuid equals \a uuid.
 *
 * Takes no lock itself; the visible callers hold obd_dev_lock for reading.
 * The value returned for a hit or a miss is not visible in this listing.
 */
638 int class_uuid2dev_nolock(struct obd_uuid *uuid)
642 for (i = 0; i < class_devno_max(); i++) {
643 struct obd_device *obd = class_num2obd(i);
645 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
646 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/*
 * Locked wrapper around class_uuid2dev_nolock(): the table scan runs with
 * obd_dev_lock held for reading.
 */
654 int class_uuid2dev(struct obd_uuid *uuid)
658 read_lock(&obd_dev_lock);
659 i = class_uuid2dev_nolock(uuid);
660 read_unlock(&obd_dev_lock);
664 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up a device by UUID: resolve the index with class_uuid2dev() and
 * fetch the device with class_num2obd().
 *
 * NOTE(review): any check of a failed lookup between the two visible lines
 * is not shown in this listing.
 */
666 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
668 int dev = class_uuid2dev(uuid);
671 return class_num2obd(dev);
673 EXPORT_SYMBOL(class_uuid2obd);
676 * Get obd device from ::obd_devs[]
678 * \param num [in] array index
680 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
681 * otherwise return the obd device there.
683 struct obd_device *class_num2obd(int num)
685 struct obd_device *obd = NULL;
687 if (num < class_devno_max()) {
692 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
693 "%p obd_magic %08x != %08x\n",
694 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
695 LASSERTF(obd->obd_minor == num,
696 "%p obd_minor %0d != %0d\n",
697 obd, obd->obd_minor, num);
704 * Find obd in obd_devs[] by name or uuid.
706 * Increment obd's refcount if found.
708 * \param[in] str obd name or uuid
710 * \retval NULL if not found
711 * \retval target pointer to found obd_device
/*
 * Resolve \a str to a device, treating it first as a UUID and then as a
 * device name, and take a reference on the device that is found.
 *
 * Both lookups and the class_incref() happen under one hold of the
 * obd_dev_lock read lock.
 */
713 struct obd_device *class_dev_by_str(const char *str)
715 struct obd_device *target = NULL;
716 struct obd_uuid tgtuuid;
719 obd_str2uuid(&tgtuuid, str);
721 read_lock(&obd_dev_lock);
722 rc = class_uuid2dev_nolock(&tgtuuid);
724 rc = class_name2dev_nolock(str);
727 target = class_num2obd(rc);
/* The reference is tagged "find" with the current task as its owner. */
730 class_incref(target, "find", current);
731 read_unlock(&obd_dev_lock);
735 EXPORT_SYMBOL(class_dev_by_str);
738 * Get obd devices count. Device in any
740 * \retval obd device count
742 int get_devices_count(void)
744 int index, max_index = class_devno_max(), dev_count = 0;
746 read_lock(&obd_dev_lock);
747 for (index = 0; index <= max_index; index++) {
748 struct obd_device *obd = class_num2obd(index);
752 read_unlock(&obd_dev_lock);
756 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in obd_devs[].
 *
 * The scan runs under the obd_dev_lock read lock. A status string is picked
 * by testing obd_stopping, obd_set_up and obd_attached in that order; the
 * strings themselves are not visible in this listing.
 */
758 void class_obd_list(void)
763 read_lock(&obd_dev_lock);
764 for (i = 0; i < class_devno_max(); i++) {
765 struct obd_device *obd = class_num2obd(i);
769 if (obd->obd_stopping)
771 else if (obd->obd_set_up)
773 else if (obd->obd_attached)
/* Columns: index, status, type name, device name, uuid, refcount. */
777 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
778 i, status, obd->obd_type->typ_name,
779 obd->obd_name, obd->obd_uuid.uuid,
780 atomic_read(&obd->obd_refcount));
782 read_unlock(&obd_dev_lock);
786 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
787 specified, then only the client with that uuid is returned,
788 otherwise any client connected to the tgt is returned. */
/* The scan runs under the obd_dev_lock read lock, which is released before
 * returning on a match as well as after an unsuccessful scan. No reference
 * is taken on the device on any visible line. */
789 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
790 const char * typ_name,
791 struct obd_uuid *grp_uuid)
795 read_lock(&obd_dev_lock);
796 for (i = 0; i < class_devno_max(); i++) {
797 struct obd_device *obd = class_num2obd(i);
/* Prefix comparison: only the first strlen(typ_name) characters of the
 * device's type name are compared. */
801 if ((strncmp(obd->obd_type->typ_name, typ_name,
802 strlen(typ_name)) == 0)) {
803 if (obd_uuid_equals(tgt_uuid,
804 &obd->u.cli.cl_target_uuid) &&
805 ((grp_uuid)? obd_uuid_equals(grp_uuid,
806 &obd->obd_uuid) : 1)) {
807 read_unlock(&obd_dev_lock);
812 read_unlock(&obd_dev_lock);
816 EXPORT_SYMBOL(class_find_client_obd);
818 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
819 searching at *next, and if a device is found, the next index to look
820 at is saved in *next. If next is NULL, then the first matching device
821 will always be returned. */
/* A *next value is only accepted as the start index when it lies in
 * 0 .. class_devno_max() - 1. The scan runs under the obd_dev_lock read
 * lock, released on both the match and the no-match path. */
822 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
828 else if (*next >= 0 && *next < class_devno_max())
833 read_lock(&obd_dev_lock);
834 for (; i < class_devno_max(); i++) {
835 struct obd_device *obd = class_num2obd(i);
839 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
842 read_unlock(&obd_dev_lock);
846 read_unlock(&obd_dev_lock);
850 EXPORT_SYMBOL(class_devices_in_group);
853 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
854 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to every matching
 * device of the filesystem \a fsname.
 *
 * A device matches when it is set up and not stopping, its type is one of
 * mdc, osc, osp, lwp, mdt or ost, and the first \a namelen characters of
 * its name equal \a fsname.
 *
 * \param[in] fsname	filesystem name, compared as a prefix of obd_name
 * \param[in] namelen	number of characters of \a fsname to compare, > 0
 */
856 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
858 struct obd_device *obd;
862 LASSERT(namelen > 0);
864 read_lock(&obd_dev_lock);
865 for (i = 0; i < class_devno_max(); i++) {
866 obd = class_num2obd(i);
868 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
871 /* only notify mdc, osc, osp, lwp, mdt, ost
872 * because only these have a -sptlrpc llog */
873 type = obd->obd_type->typ_name;
874 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
875 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
876 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
877 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
878 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
879 strcmp(type, LUSTRE_OST_NAME) != 0)
882 if (strncmp(obd->obd_name, fsname, namelen))
/* Pin the device and drop obd_dev_lock around the notification; the lock
 * is taken again before the scan continues. */
885 class_incref(obd, __FUNCTION__, obd);
886 read_unlock(&obd_dev_lock);
887 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
888 sizeof(KEY_SPTLRPC_CONF),
889 KEY_SPTLRPC_CONF, 0, NULL, NULL);
891 class_decref(obd, __FUNCTION__, obd);
892 read_lock(&obd_dev_lock);
894 read_unlock(&obd_dev_lock);
897 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches(). The visible lines
 * reset obd_device_cachep and import_cachep to NULL after destroying them;
 * the matching lines for obdo_cachep are not visible. obd_init_caches()
 * also calls this on its error path.
 */
899 void obd_cleanup_caches(void)
902 if (obd_device_cachep) {
903 kmem_cache_destroy(obd_device_cachep);
904 obd_device_cachep = NULL;
907 kmem_cache_destroy(obdo_cachep);
911 kmem_cache_destroy(import_cachep);
912 import_cachep = NULL;
/*
 * Create the three slab caches of this file: "ll_obd_dev_cache",
 * "ll_obdo_cache" and "ll_import_cache".
 *
 * Each cache pointer is asserted to be NULL before it is created. A failed
 * creation jumps to "out" with rc = -ENOMEM, and obd_cleanup_caches() is
 * called on that path.
 */
918 int obd_init_caches(void)
923 LASSERT(obd_device_cachep == NULL);
924 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
925 sizeof(struct obd_device),
927 if (!obd_device_cachep)
928 GOTO(out, rc = -ENOMEM);
930 LASSERT(obdo_cachep == NULL);
931 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
934 GOTO(out, rc = -ENOMEM);
936 LASSERT(import_cachep == NULL);
937 import_cachep = kmem_cache_create("ll_import_cache",
938 sizeof(struct obd_import),
941 GOTO(out, rc = -ENOMEM);
945 obd_cleanup_caches();
949 /* map connection to client */
/* Two special cases are logged before the lookup: a "null handle", and a
 * cookie of -1, which asks for a new connection. Any other cookie is
 * resolved with class_handle2object(). class_conn2obd() drops a reference
 * on the export returned here with class_export_put(). */
950 struct obd_export *class_conn2export(struct lustre_handle *conn)
952 struct obd_export *export;
956 CDEBUG(D_CACHE, "looking for null handle\n");
960 if (conn->cookie == -1) { /* this means assign a new connection */
961 CDEBUG(D_CACHE, "want a new connection\n");
965 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
966 export = class_handle2object(conn->cookie, NULL);
969 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): the body of this function is not visible in this listing;
 * presumably it returns exp->exp_obd - confirm.
 */
971 struct obd_device *class_exp2obd(struct obd_export *exp)
977 EXPORT_SYMBOL(class_exp2obd);
/*
 * Map a connection handle to the obd device of its export.
 *
 * The export obtained from class_conn2export() is only needed to read
 * exp_obd; it is released with class_export_put() straight away.
 */
979 struct obd_device *class_conn2obd(struct lustre_handle *conn)
981 struct obd_export *export;
982 export = class_conn2export(conn);
984 struct obd_device *obd = export->exp_obd;
985 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the device that \a exp
 * belongs to.
 */
991 struct obd_import *class_exp2cliimp(struct obd_export *exp)
993 struct obd_device *obd = exp->exp_obd;
996 return obd->u.cli.cl_import;
998 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device that the
 * connection handle \a conn resolves to through class_conn2obd().
 */
1000 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
1002 struct obd_device *obd = class_conn2obd(conn);
1005 return obd->u.cli.cl_import;
1008 /* Export management functions */
/*
 * Final teardown of an export whose refcount has reached zero.
 *
 * Drops the ptlrpc connection if there is one, asserts that the reply and
 * request lists are empty, calls obd_destroy_export(), drops the device
 * reference that a non-self export holds, and frees the export via RCU.
 */
1009 static void class_export_destroy(struct obd_export *exp)
1011 struct obd_device *obd = exp->exp_obd;
1014 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
1015 LASSERT(obd != NULL);
1017 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
1018 exp->exp_client_uuid.uuid, obd->obd_name);
1020 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
1021 if (exp->exp_connection)
1022 ptlrpc_put_connection_superhack(exp->exp_connection);
1024 LASSERT(list_empty(&exp->exp_outstanding_replies));
1025 LASSERT(list_empty(&exp->exp_uncommitted_replies));
1026 LASSERT(list_empty(&exp->exp_req_replay_queue));
1027 LASSERT(list_empty(&exp->exp_hp_rpcs));
1028 obd_destroy_export(exp);
1029 /* self export doesn't hold a reference to an obd, although it
1030 * exists until freeing of the obd */
1031 if (exp != obd->obd_self_export)
1032 class_decref(obd, "export", exp);
1034 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/*
 * hop_addref callback of export_handle_ops: takes a reference on the export
 * behind the opaque handle pointer.
 */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
/* Handle operations passed to class_handle_hash() for every new export.
 * NOTE(review): only the hop_addref initializer is visible in this
 * listing. */
1043 static struct portals_handle_ops export_handle_ops = {
1044 .hop_addref = export_handle_addref,
1048 struct obd_export *class_export_get(struct obd_export *exp)
1050 atomic_inc(&exp->exp_refcount);
1051 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1052 atomic_read(&exp->exp_refcount));
1055 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.
 *
 * When the last reference goes away, the device's self export is destroyed
 * right here and the device itself is freed with class_free_dev(). Any
 * other export is handed to obd_zombie_export_add() instead, and must still
 * be linked on exp_obd_chain at that point (asserted).
 */
1057 void class_export_put(struct obd_export *exp)
1059 LASSERT(exp != NULL);
1060 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1061 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1062 atomic_read(&exp->exp_refcount) - 1);
1064 if (atomic_dec_and_test(&exp->exp_refcount)) {
1065 struct obd_device *obd = exp->exp_obd;
1067 CDEBUG(D_IOCTL, "final put %p/%s\n",
1068 exp, exp->exp_client_uuid.uuid);
1070 /* release nid stat reference */
1071 lprocfs_exp_cleanup(exp);
1073 if (exp == obd->obd_self_export) {
1074 /* self export should be destroyed without
1075 * zombie thread as it doesn't hold a
1076 * reference to obd and doesn't hold any
1078 class_export_destroy(exp);
1079 /* self export is destroyed, no class
1080 * references exist and it is safe to free
1082 class_free_dev(obd);
1084 LASSERT(!list_empty(&exp->exp_obd_chain));
1085 obd_zombie_export_add(exp);
1090 EXPORT_SYMBOL(class_export_put);
1092 static void obd_zombie_exp_cull(struct work_struct *ws)
1094 struct obd_export *export;
1096 export = container_of(ws, struct obd_export, exp_zombie_work);
1097 class_export_destroy(export);
1100 /* Creates a new export, adds it to the hash table, and returns a
1101 * pointer to it. The refcount is 2: one for the hash reference, and
1102 * one for the pointer returned by this function. */
/* \a is_self is passed as true by class_new_export_self() and as false by
 * class_new_export(). Failures visible here: ERR_PTR(-ENOMEM) when the
 * allocation fails, -ENODEV or -ESHUTDOWN when the device is stopping,
 * -ENODEV on the path after cfs_hash_getref(obd->obd_uuid_hash), and
 * -EALREADY when an export with the same client uuid is already hashed. */
1103 struct obd_export *__class_new_export(struct obd_device *obd,
1104 struct obd_uuid *cluuid, bool is_self)
1106 struct obd_export *export;
1107 struct cfs_hash *hash = NULL;
1111 OBD_ALLOC_PTR(export);
1113 return ERR_PTR(-ENOMEM);
1115 export->exp_conn_cnt = 0;
1116 export->exp_lock_hash = NULL;
1117 export->exp_flock_hash = NULL;
1118 /* 2 = class_handle_hash + last */
1119 atomic_set(&export->exp_refcount, 2);
1120 atomic_set(&export->exp_rpc_count, 0);
1121 atomic_set(&export->exp_cb_count, 0);
1122 atomic_set(&export->exp_locks_count, 0);
1123 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1124 INIT_LIST_HEAD(&export->exp_locks_list);
1125 spin_lock_init(&export->exp_locks_list_guard);
1127 atomic_set(&export->exp_replay_count, 0);
1128 export->exp_obd = obd;
1129 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1130 spin_lock_init(&export->exp_uncommitted_replies_lock);
1131 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1132 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1133 INIT_LIST_HEAD(&export->exp_handle.h_link);
1134 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1135 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1136 class_handle_hash(&export->exp_handle, &export_handle_ops);
1137 export->exp_last_request_time = ktime_get_real_seconds();
1138 spin_lock_init(&export->exp_lock);
1139 spin_lock_init(&export->exp_rpc_lock);
1140 INIT_HLIST_NODE(&export->exp_uuid_hash);
1141 INIT_HLIST_NODE(&export->exp_nid_hash);
1142 INIT_HLIST_NODE(&export->exp_gen_hash);
1143 spin_lock_init(&export->exp_bl_list_lock);
1144 INIT_LIST_HEAD(&export->exp_bl_list);
1145 INIT_LIST_HEAD(&export->exp_stale_list);
1146 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1148 export->exp_sp_peer = LUSTRE_SP_ANY;
1149 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1150 export->exp_client_uuid = *cluuid;
1151 obd_init_export(export);
/* An export whose client uuid equals the device's own uuid is not put
 * into the uuid hash. */
1153 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1154 spin_lock(&obd->obd_dev_lock);
1155 /* shouldn't happen, but might race */
1156 if (obd->obd_stopping)
1157 GOTO(exit_unlock, rc = -ENODEV);
1159 hash = cfs_hash_getref(obd->obd_uuid_hash);
1161 GOTO(exit_unlock, rc = -ENODEV);
1162 spin_unlock(&obd->obd_dev_lock);
1164 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1166 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1167 obd->obd_name, cluuid->uuid, rc);
1168 GOTO(exit_err, rc = -EALREADY);
1172 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* obd_stopping is tested a second time because obd_dev_lock was released
 * while the uuid was being hashed. */
1173 spin_lock(&obd->obd_dev_lock);
1174 if (obd->obd_stopping) {
1176 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1177 GOTO(exit_unlock, rc = -ESHUTDOWN);
/* Two alternatives follow: either the export takes a device reference and
 * is linked on the device's export lists, or its list heads are merely
 * initialised. The condition that selects between them is not visible in
 * this listing - presumably is_self, confirm. */
1181 class_incref(obd, "export", export);
1182 list_add_tail(&export->exp_obd_chain_timed,
1183 &obd->obd_exports_timed);
1184 list_add(&export->exp_obd_chain, &obd->obd_exports);
1185 obd->obd_num_exports++;
1187 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1188 INIT_LIST_HEAD(&export->exp_obd_chain);
1190 spin_unlock(&obd->obd_dev_lock);
1192 cfs_hash_putref(hash);
/* Error exits: release the lock, drop the hash reference, unhash the
 * handle and free the partly built export. */
1196 spin_unlock(&obd->obd_dev_lock);
1199 cfs_hash_putref(hash);
1200 class_handle_unhash(&export->exp_handle);
1201 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1202 obd_destroy_export(export);
1203 OBD_FREE_PTR(export);
1207 struct obd_export *class_new_export(struct obd_device *obd,
1208 struct obd_uuid *uuid)
1210 return __class_new_export(obd, uuid, false);
1212 EXPORT_SYMBOL(class_new_export);
1214 struct obd_export *class_new_export_self(struct obd_device *obd,
1215 struct obd_uuid *uuid)
1217 return __class_new_export(obd, uuid, true);
/*
 * Detach an export from its device.
 *
 * The handle is unhashed first. For the device's self export the function
 * then only drops a reference. Any other export is removed from the uuid
 * hash (and, with server support, from the generation hash), moved to
 * obd_unlinked_exports, taken off the timed list and un-counted, all under
 * obd_dev_lock. Finally it is accounted as stale and handed to
 * obd_stale_export_put().
 */
1220 void class_unlink_export(struct obd_export *exp)
1222 class_handle_unhash(&exp->exp_handle);
1224 if (exp->exp_obd->obd_self_export == exp) {
1225 class_export_put(exp);
1229 spin_lock(&exp->exp_obd->obd_dev_lock);
1230 /* delete an uuid-export hashitem from hashtables */
1231 if (!hlist_unhashed(&exp->exp_uuid_hash))
1232 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1233 &exp->exp_client_uuid,
1234 &exp->exp_uuid_hash);
1236 #ifdef HAVE_SERVER_SUPPORT
1237 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1238 struct tg_export_data *ted = &exp->exp_target_data;
1239 struct cfs_hash *hash;
1241 /* Because obd_gen_hash will not be released until
1242 * class_cleanup(), so hash should never be NULL here */
1243 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1244 LASSERT(hash != NULL);
1245 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1246 &exp->exp_gen_hash);
1247 cfs_hash_putref(hash);
1249 #endif /* HAVE_SERVER_SUPPORT */
1251 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1252 list_del_init(&exp->exp_obd_chain_timed);
1253 exp->exp_obd->obd_num_exports--;
1254 spin_unlock(&exp->exp_obd->obd_dev_lock);
1255 atomic_inc(&obd_stale_export_num);
1257 /* A reference is kept by obd_stale_exports list */
1258 obd_stale_export_put(exp);
1260 EXPORT_SYMBOL(class_unlink_export);
1262 /* Import management functions */
/*
 * Final teardown of an import whose refcount has reached zero.
 *
 * Drops the import's current connection and every connection queued on
 * imp_conn_list (freeing each list entry), asserts that no security
 * context is attached any more, drops the device reference taken in
 * class_new_import() and frees the import via RCU.
 */
1263 static void class_import_destroy(struct obd_import *imp)
1267 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1268 imp->imp_obd->obd_name);
1270 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1272 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Pop the entries one by one until the list is empty. */
1274 while (!list_empty(&imp->imp_conn_list)) {
1275 struct obd_import_conn *imp_conn;
1277 imp_conn = list_entry(imp->imp_conn_list.next,
1278 struct obd_import_conn, oic_item);
1279 list_del_init(&imp_conn->oic_item);
1280 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1281 OBD_FREE(imp_conn, sizeof(*imp_conn));
1284 LASSERT(imp->imp_sec == NULL);
1285 class_decref(imp->imp_obd, "import", imp);
1286 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/*
 * hop_addref callback of import_handle_ops: takes a reference on the import
 * behind the opaque handle pointer.
 */
static void import_handle_addref(void *import)
{
	struct obd_import *imp = import;

	class_import_get(imp);
}
/* Handle operations passed to class_handle_hash() for every new import.
 * NOTE(review): only the hop_addref initializer is visible in this
 * listing. */
1295 static struct portals_handle_ops import_handle_ops = {
1296 .hop_addref = import_handle_addref,
1300 struct obd_import *class_import_get(struct obd_import *import)
1302 atomic_inc(&import->imp_refcount);
1303 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1304 atomic_read(&import->imp_refcount),
1305 import->imp_obd->obd_name);
1308 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp. The import is not destroyed here: on the
 * last put it is handed to obd_zombie_import_add().
 */
1310 void class_import_put(struct obd_import *imp)
1314 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* The count is logged before the decrement, hence the "- 1". */
1316 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1317 atomic_read(&imp->imp_refcount) - 1,
1318 imp->imp_obd->obd_name);
1320 if (atomic_dec_and_test(&imp->imp_refcount)) {
1321 CDEBUG(D_INFO, "final put import %p\n", imp);
1322 obd_zombie_import_add(imp);
1325 /* catch possible import put race */
1326 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1329 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0, and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.
 */
1331 static void init_imp_at(struct imp_at *at) {
1333 at_init(&at->iat_net_latency, 0, 0);
1334 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1335 /* max service estimates are tracked on the server side, so
1336 don't use the AT history here, just use the last reported
1337 val. (But keep hist for proc histogram, worst_ever) */
1338 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1343 static void obd_zombie_imp_cull(struct work_struct *ws)
1345 struct obd_import *import;
1347 import = container_of(ws, struct obd_import, imp_zombie_work);
1348 class_import_destroy(import);
/*
 * Allocate and initialise a new import for \a obd.
 *
 * The import takes a reference on the device ("import" tag), starts in
 * state LUSTRE_IMP_NEW with a refcount of 2, and is entered into the handle
 * hash with import_handle_ops.
 * NOTE(review): the two initial references presumably mirror the export
 * case (handle hash + caller) - confirm.
 */
1351 struct obd_import *class_new_import(struct obd_device *obd)
1353 struct obd_import *imp;
1354 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1356 OBD_ALLOC(imp, sizeof(*imp));
1360 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1361 INIT_LIST_HEAD(&imp->imp_replay_list);
1362 INIT_LIST_HEAD(&imp->imp_sending_list);
1363 INIT_LIST_HEAD(&imp->imp_delayed_list);
1364 INIT_LIST_HEAD(&imp->imp_committed_list);
1365 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1366 imp->imp_known_replied_xid = 0;
/* The replay cursor starts out pointing at the (empty) committed list
 * head. */
1367 imp->imp_replay_cursor = &imp->imp_committed_list;
1368 spin_lock_init(&imp->imp_lock);
1369 imp->imp_last_success_conn = 0;
1370 imp->imp_state = LUSTRE_IMP_NEW;
1371 imp->imp_obd = class_incref(obd, "import", imp);
1372 mutex_init(&imp->imp_sec_mutex);
1373 init_waitqueue_head(&imp->imp_recovery_waitq);
1374 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
/* imp_sec_refpid is the pid of the reaper of the caller's pid namespace
 * when there is one; the other visible assignment sets it to 1. */
1376 if (curr_pid_ns->child_reaper)
1377 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1379 imp->imp_sec_refpid = 1;
1381 atomic_set(&imp->imp_refcount, 2);
1382 atomic_set(&imp->imp_unregistering, 0);
1383 atomic_set(&imp->imp_inflight, 0);
1384 atomic_set(&imp->imp_replay_inflight, 0);
1385 atomic_set(&imp->imp_inval_count, 0);
1386 INIT_LIST_HEAD(&imp->imp_conn_list);
1387 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1388 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1389 init_imp_at(&imp->imp_at);
1391 /* the default magic is V2, will be used in connect RPC, and
1392 * then adjusted according to the flags in request/reply. */
1393 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1397 EXPORT_SYMBOL(class_new_import);
/* Release @import from the caller's side: remove it from the handle hash,
 * bump imp_generation under imp_lock, then drop one reference with
 * class_import_put().  @import must be neither NULL nor LP_POISON. */
1399 void class_destroy_import(struct obd_import *import)
1401 LASSERT(import != NULL);
1402 LASSERT(import != LP_POISON);
1404 class_handle_unhash(&import->imp_handle);
1406 spin_lock(&import->imp_lock);
1407 import->imp_generation++;
1408 spin_unlock(&import->imp_lock);
1409 class_import_put(import);
1411 EXPORT_SYMBOL(class_destroy_import);
1413 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Record one more reference from @lock to @exp, under
 * exp->exp_locks_list_guard.  The first reference links the lock onto
 * exp->exp_locks_list and sets l_exp_refs_target; a console warning is
 * printed if the lock already targets a different export. */
1415 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1417 spin_lock(&exp->exp_locks_list_guard);
1419 LASSERT(lock->l_exp_refs_nr >= 0);
1421 if (lock->l_exp_refs_target != NULL &&
1422 lock->l_exp_refs_target != exp) {
1423 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1424 exp, lock, lock->l_exp_refs_target);
/* post-increment: the branch is taken only for the 0 -> 1 transition */
1426 if ((lock->l_exp_refs_nr ++) == 0) {
1427 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1428 lock->l_exp_refs_target = exp;
1430 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1431 lock, exp, lock->l_exp_refs_nr);
1432 spin_unlock(&exp->exp_locks_list_guard);
1434 EXPORT_SYMBOL(__class_export_add_lock_ref);
/* Drop one reference from @lock to @exp, under exp->exp_locks_list_guard.
 * When the count reaches zero the lock is unlinked from the export's lock
 * list and l_exp_refs_target is cleared; a console warning is printed if
 * the lock's recorded target is not @exp. */
1436 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1438 spin_lock(&exp->exp_locks_list_guard);
1439 LASSERT(lock->l_exp_refs_nr > 0);
1440 if (lock->l_exp_refs_target != exp) {
1441 LCONSOLE_WARN("lock %p, "
1442 "mismatching export pointers: %p, %p\n",
1443 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the branch is taken only for the 1 -> 0 transition */
1445 if (-- lock->l_exp_refs_nr == 0) {
1446 list_del_init(&lock->l_exp_refs_link);
1447 lock->l_exp_refs_target = NULL;
1449 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1450 lock, exp, lock->l_exp_refs_nr);
1451 spin_unlock(&exp->exp_locks_list_guard);
1453 EXPORT_SYMBOL(__class_export_del_lock_ref);
1456 /* A connection defines an export context in which preallocation can
1457 be managed. This releases the export pointer reference, and returns
1458 the export handle, so the export refcount is 1 when this function returns. */
/* Create a new export for client @cluuid on @obd and store its handle
 * cookie in @conn.  The reference obtained from class_new_export() is
 * dropped again before returning, so the caller only holds the cookie.
 * All three arguments must be non-NULL.
 * NOTE(review): the check guarding the PTR_ERR() return and the success
 * return are on lines not visible in this excerpt. */
1460 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1461 struct obd_uuid *cluuid)
1463 struct obd_export *export;
1464 LASSERT(conn != NULL);
1465 LASSERT(obd != NULL);
1466 LASSERT(cluuid != NULL);
1469 export = class_new_export(obd, cluuid);
1471 RETURN(PTR_ERR(export));
1473 conn->cookie = export->exp_handle.h_cookie;
1474 class_export_put(export);
1476 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1477 cluuid->uuid, conn->cookie);
1480 EXPORT_SYMBOL(class_connect);
1482 /* if export is involved in recovery then clean up related things */
/* Two phases, each under its own lock:
 *  - under obd_recovery_task_lock, while the obd is recovering: clear
 *    exp_in_recovery and drop obd_connected_clients, and count the export
 *    as stale unless it is a lightweight connection;
 *  - under exp_lock: clear the request-replay and lock-replay flags and
 *    decrement the matching per-obd client counters. */
1483 static void class_export_recovery_cleanup(struct obd_export *exp)
1485 struct obd_device *obd = exp->exp_obd;
1487 spin_lock(&obd->obd_recovery_task_lock);
1488 if (obd->obd_recovering) {
1489 if (exp->exp_in_recovery) {
1490 spin_lock(&exp->exp_lock);
1491 exp->exp_in_recovery = 0;
1492 spin_unlock(&exp->exp_lock);
1493 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1494 atomic_dec(&obd->obd_connected_clients);
1497 /* if called during recovery then should update
1498 * obd_stale_clients counter,
1499 * lightweight exports are not counted */
1500 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1501 exp->exp_obd->obd_stale_clients++;
1503 spin_unlock(&obd->obd_recovery_task_lock);
1505 spin_lock(&exp->exp_lock);
1506 /** Cleanup req replay fields */
1507 if (exp->exp_req_replay_needed) {
1508 exp->exp_req_replay_needed = 0;
1510 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1511 atomic_dec(&obd->obd_req_replay_clients);
1514 /** Cleanup lock replay data */
1515 if (exp->exp_lock_replay_needed) {
1516 exp->exp_lock_replay_needed = 0;
1518 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1519 atomic_dec(&obd->obd_lock_replay_clients);
1521 spin_unlock(&exp->exp_lock);
1524 /* This function removes 1-3 references from the export:
1525 * 1 - for the export pointer passed in
1526 * and, if a disconnect is really needed,
1527 * 2 - for removal from the hash
1528 * 3 - in class_unlink_export
1529 * The export pointer passed to this function may be destroyed. */
/* Disconnect @export.
 *
 * Under exp_lock the export is marked exp_disconnected and, if it is still
 * hashed by NID, removed from obd_nid_hash.  Only the first caller to mark
 * it continues to class_export_recovery_cleanup() and
 * class_unlink_export(); later callers jump to no_disconn.
 * NOTE(review): the no_disconn label and the return statements are on
 * lines not visible here; the label presumably precedes the final
 * class_export_put(). */
1530 int class_disconnect(struct obd_export *export)
1532 int already_disconnected;
1535 if (export == NULL) {
1536 CWARN("attempting to free NULL export %p\n", export);
1540 spin_lock(&export->exp_lock);
1541 already_disconnected = export->exp_disconnected;
1542 export->exp_disconnected = 1;
1543 /* We hold references of export for uuid hash
1544 * and nid_hash and export link at least. So
1545 * it is safe to call cfs_hash_del in there. */
1546 if (!hlist_unhashed(&export->exp_nid_hash))
1547 cfs_hash_del(export->exp_obd->obd_nid_hash,
1548 &export->exp_connection->c_peer.nid,
1549 &export->exp_nid_hash);
1550 spin_unlock(&export->exp_lock);
1552 /* class_cleanup(), abort_recovery(), and class_fail_export()
1553 * all end up in here, and if any of them race we shouldn't
1554 * call extra class_export_puts(). */
1555 if (already_disconnected) {
1556 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1557 GOTO(no_disconn, already_disconnected);
1560 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1561 export->exp_handle.h_cookie);
1563 class_export_recovery_cleanup(export);
1564 class_unlink_export(export);
1566 class_export_put(export);
1569 EXPORT_SYMBOL(class_disconnect);
1571 /* Return non-zero for a fully connected export */
/* "Fully connected" is evaluated under exp_lock as: exp_conn_cnt > 0 and
 * the export has not been marked exp_failed.
 * NOTE(review): the declaration of 'connected' and the return are on lines
 * not visible in this excerpt. */
1572 int class_connected_export(struct obd_export *exp)
1577 spin_lock(&exp->exp_lock);
1578 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1579 spin_unlock(&exp->exp_lock);
1583 EXPORT_SYMBOL(class_connected_export);
/* Disconnect every export on @list, storing @flags in exp_flags of each
 * one first.  An export whose client uuid equals its obd's uuid is only
 * unlinked from the list and not disconnected.  The loop runs until @list
 * is empty.
 * NOTE(review): several short lines of this function (for example the
 * statement that ends the self-export branch) are not visible here. */
1585 static void class_disconnect_export_list(struct list_head *list,
1586 enum obd_option flags)
1589 struct obd_export *exp;
1592 /* It's possible that an export may disconnect itself, but
1593 * nothing else will be added to this list. */
1594 while (!list_empty(list)) {
1595 exp = list_entry(list->next, struct obd_export,
1597 /* need for safe call CDEBUG after obd_disconnect */
1598 class_export_get(exp);
1600 spin_lock(&exp->exp_lock);
1601 exp->exp_flags = flags;
1602 spin_unlock(&exp->exp_lock);
1604 if (obd_uuid_equals(&exp->exp_client_uuid,
1605 &exp->exp_obd->obd_uuid)) {
1607 "exp %p export uuid == obd uuid, don't discon\n",
1609 /* Need to delete this now so we don't end up pointing
1610 * to work_list later when this export is cleaned up. */
1611 list_del_init(&exp->exp_obd_chain);
1612 class_export_put(exp);
/* second reference: the one obd_disconnect() below releases */
1616 class_export_get(exp);
1617 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1618 "last request at %lld\n",
1619 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1620 exp, exp->exp_last_request_time);
1621 /* release one export reference anyway */
1622 rc = obd_disconnect(exp);
1624 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1625 obd_export_nid2str(exp), exp, rc);
/* drop the reference taken at the top of the loop for the CDEBUG above */
1626 class_export_put(exp);
/* Disconnect all exports of @obd.  Both obd_exports and
 * obd_delayed_exports are spliced onto a private work list under
 * obd_dev_lock, and the work list is then processed without the lock by
 * class_disconnect_export_list() using the flags from
 * exp_flags_from_obd(). */
1631 void class_disconnect_exports(struct obd_device *obd)
1633 struct list_head work_list;
1636 /* Move all of the exports from obd_exports to a work list, en masse. */
1637 INIT_LIST_HEAD(&work_list);
1638 spin_lock(&obd->obd_dev_lock);
1639 list_splice_init(&obd->obd_exports, &work_list);
1640 list_splice_init(&obd->obd_delayed_exports, &work_list);
1641 spin_unlock(&obd->obd_dev_lock);
1643 if (!list_empty(&work_list)) {
1644 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1645 "disconnecting them\n", obd->obd_minor, obd);
1646 class_disconnect_export_list(&work_list,
1647 exp_flags_from_obd(obd));
1649 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1650 obd->obd_minor, obd);
1653 EXPORT_SYMBOL(class_disconnect_exports);
1655 /* Remove exports that have not completed recovery. */
/* Evict the exports of @obd that @test_export does not accept.
 *
 * obd_exports is walked under obd_dev_lock.  Per the inline comments, the
 * self-export and exports without a last_rcvd slot (ted_lr_idx == -1) are
 * left alone, as are exports that are already failed or for which
 * @test_export() returns non-zero.  The remaining exports are marked
 * exp_failed, moved to a private work list, and disconnected afterwards
 * with OBD_OPT_ABORT_RECOV added to the obd's export flags.
 * NOTE(review): the statements that skip an export and that maintain the
 * 'evicted' counter are on lines not visible in this excerpt. */
1657 void class_disconnect_stale_exports(struct obd_device *obd,
1658 int (*test_export)(struct obd_export *))
1660 struct list_head work_list;
1661 struct obd_export *exp, *n;
1665 INIT_LIST_HEAD(&work_list);
1666 spin_lock(&obd->obd_dev_lock);
1667 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1669 /* don't count self-export as client */
1670 if (obd_uuid_equals(&exp->exp_client_uuid,
1671 &exp->exp_obd->obd_uuid))
1674 /* don't evict clients which have no slot in last_rcvd
1675 * (e.g. lightweight connection) */
1676 if (exp->exp_target_data.ted_lr_idx == -1)
1679 spin_lock(&exp->exp_lock);
1680 if (exp->exp_failed || test_export(exp)) {
1681 spin_unlock(&exp->exp_lock);
1684 exp->exp_failed = 1;
1685 spin_unlock(&exp->exp_lock);
1687 list_move(&exp->exp_obd_chain, &work_list);
1689 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1690 obd->obd_name, exp->exp_client_uuid.uuid,
1691 obd_export_nid2str(exp));
1692 print_export_data(exp, "EVICTING", 0, D_HA);
1694 spin_unlock(&obd->obd_dev_lock);
1697 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1698 obd->obd_name, evicted);
1700 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1701 OBD_OPT_ABORT_RECOV);
1704 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Mark @exp as failed and disconnect it.
 *
 * exp_failed is set under exp_lock; if it was already set, the "skipping"
 * message is logged (the early return is presumably on an elided line).
 * Otherwise the debug log is optionally dumped (obd_dump_on_timeout), two
 * export references are taken (one consumed by obd_disconnect(), one kept
 * so @exp stays valid for the trailing messages) and obd_disconnect() is
 * called. */
1706 void class_fail_export(struct obd_export *exp)
1708 int rc, already_failed;
1710 spin_lock(&exp->exp_lock);
1711 already_failed = exp->exp_failed;
1712 exp->exp_failed = 1;
1713 spin_unlock(&exp->exp_lock);
1715 if (already_failed) {
1716 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1717 exp, exp->exp_client_uuid.uuid);
1721 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1722 exp, exp->exp_client_uuid.uuid);
1724 if (obd_dump_on_timeout)
1725 libcfs_debug_dumplog();
1727 /* need for safe call CDEBUG after obd_disconnect */
1728 class_export_get(exp);
1730 /* Most callers into obd_disconnect are removing their own reference
1731 * (request, for example) in addition to the one from the hash table.
1732 * We don't have such a reference here, so make one. */
1733 class_export_get(exp);
1734 rc = obd_disconnect(exp);
1736 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1738 CDEBUG(D_HA, "disconnected export %p/%s\n",
1739 exp, exp->exp_client_uuid.uuid);
1740 class_export_put(exp);
1742 EXPORT_SYMBOL(class_fail_export);
/* Evict the export(s) of @obd connected from the NID given as string @nid.
 *
 * Returns exports_evicted.  If the obd is already stopping, nothing is
 * done and the initial value 0 is returned.  A reference on obd_nid_hash
 * is held while it is searched; each export found is failed with
 * class_fail_export() and the lookup reference is dropped.
 * NOTE(review): the loop construct around the lookup and the increment of
 * exports_evicted are on lines not visible in this excerpt. */
1744 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1746 struct cfs_hash *nid_hash;
1747 struct obd_export *doomed_exp = NULL;
1748 int exports_evicted = 0;
1750 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1752 spin_lock(&obd->obd_dev_lock);
1753 /* umount has run already, so evict thread should leave
1754 * its task to umount thread now */
1755 if (obd->obd_stopping) {
1756 spin_unlock(&obd->obd_dev_lock);
1757 return exports_evicted;
1759 nid_hash = obd->obd_nid_hash;
1760 cfs_hash_getref(nid_hash);
1761 spin_unlock(&obd->obd_dev_lock);
1764 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1765 if (doomed_exp == NULL)
1768 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1769 "nid %s found, wanted nid %s, requested nid %s\n",
1770 obd_export_nid2str(doomed_exp),
1771 libcfs_nid2str(nid_key), nid);
1772 LASSERTF(doomed_exp != obd->obd_self_export,
1773 "self-export is hashed by NID?\n");
1775 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1776 "request\n", obd->obd_name,
1777 obd_uuid2str(&doomed_exp->exp_client_uuid),
1778 obd_export_nid2str(doomed_exp));
1779 class_fail_export(doomed_exp);
1780 class_export_put(doomed_exp);
1783 cfs_hash_putref(nid_hash);
1785 if (!exports_evicted)
1786 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1787 obd->obd_name, nid);
1788 return exports_evicted;
1790 EXPORT_SYMBOL(obd_export_evict_by_nid);
/* Evict the export of @obd whose client uuid matches the string @uuid.
 *
 * Returns exports_evicted.  Nothing is done if the obd is stopping or if
 * @uuid names the obd itself.  A reference on obd_uuid_hash is held while
 * it is searched; a matching export is failed with class_fail_export() and
 * the lookup reference is dropped.
 * NOTE(review): the increment of exports_evicted is on a line not visible
 * here.  The CWARN text below spells "adminstrative"; it is a runtime
 * string and is deliberately left untouched by this documentation pass. */
1792 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1794 struct cfs_hash *uuid_hash;
1795 struct obd_export *doomed_exp = NULL;
1796 struct obd_uuid doomed_uuid;
1797 int exports_evicted = 0;
1799 spin_lock(&obd->obd_dev_lock);
1800 if (obd->obd_stopping) {
1801 spin_unlock(&obd->obd_dev_lock);
1802 return exports_evicted;
1804 uuid_hash = obd->obd_uuid_hash;
1805 cfs_hash_getref(uuid_hash);
1806 spin_unlock(&obd->obd_dev_lock);
1808 obd_str2uuid(&doomed_uuid, uuid);
1809 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1810 CERROR("%s: can't evict myself\n", obd->obd_name);
1811 cfs_hash_putref(uuid_hash);
1812 return exports_evicted;
1815 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1817 if (doomed_exp == NULL) {
1818 CERROR("%s: can't disconnect %s: no exports found\n",
1819 obd->obd_name, uuid);
1821 CWARN("%s: evicting %s at adminstrative request\n",
1822 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1823 class_fail_export(doomed_exp);
1824 class_export_put(doomed_exp);
1827 cfs_hash_putref(uuid_hash);
1829 return exports_evicted;
1832 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional hook, NULL by default; print_export_data() calls it for an
 * export when lock dumping is requested and the hook is set. */
1833 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1834 EXPORT_SYMBOL(class_export_dump_hook);
/* Log one line describing @exp at @debug_level, prefixed with @status:
 * obd name, client uuid and NID, the refcount and the rpc/callback/lock
 * counters, the disconnected/delayed/failed flags, outstanding reply
 * information, exp_last_committed and whether the export is on a stale
 * list.  When @locks is non-zero and class_export_dump_hook is set
 * (LUSTRE_TRACKS_LOCK_EXP_REFS builds), the hook is called as well.
 * NOTE(review): the declaration of 'nreplies' and the body of the loop
 * over exp_outstanding_replies are on lines not visible here. */
1837 static void print_export_data(struct obd_export *exp, const char *status,
1838 int locks, int debug_level)
1840 struct ptlrpc_reply_state *rs;
1841 struct ptlrpc_reply_state *first_reply = NULL;
1844 spin_lock(&exp->exp_lock);
1845 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1851 spin_unlock(&exp->exp_lock);
1853 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1854 "%p %s %llu stale:%d\n",
1855 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1856 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1857 atomic_read(&exp->exp_rpc_count),
1858 atomic_read(&exp->exp_cb_count),
1859 atomic_read(&exp->exp_locks_count),
1860 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1861 nreplies, first_reply, nreplies > 3 ? "..." : "",
1862 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1863 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1864 if (locks && class_export_dump_hook != NULL)
1865 class_export_dump_hook(exp);
1869 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1871 struct obd_export *exp;
1873 spin_lock(&obd->obd_dev_lock);
1874 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1875 print_export_data(exp, "ACTIVE", locks, debug_level);
1876 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1877 print_export_data(exp, "UNLINKED", locks, debug_level);
1878 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1879 print_export_data(exp, "DELAYED", locks, debug_level);
1880 spin_unlock(&obd->obd_dev_lock);
/* Wait until @obd has no unlinked exports left.  obd_exports must already
 * be empty.  The list is polled with obd_dev_lock held; between polls the
 * lock is dropped and the task sleeps uninterruptibly for 'waited'
 * seconds.  Whenever 'waited' is above 5 and a power of two, a console
 * warning is printed and the exports are dumped.
 * NOTE(review): the declaration and update of 'waited' are on lines not
 * visible in this excerpt. */
1883 void obd_exports_barrier(struct obd_device *obd)
1886 LASSERT(list_empty(&obd->obd_exports));
1887 spin_lock(&obd->obd_dev_lock);
1888 while (!list_empty(&obd->obd_unlinked_exports)) {
1889 spin_unlock(&obd->obd_dev_lock);
1890 set_current_state(TASK_UNINTERRUPTIBLE);
1891 schedule_timeout(cfs_time_seconds(waited));
1892 if (waited > 5 && is_power_of_2(waited)) {
1893 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1894 "more than %d seconds. "
1895 "The obd refcount = %d. Is it stuck?\n",
1896 obd->obd_name, waited,
1897 atomic_read(&obd->obd_refcount));
1898 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1901 spin_lock(&obd->obd_dev_lock);
1903 spin_unlock(&obd->obd_dev_lock);
1905 EXPORT_SYMBOL(obd_exports_barrier);
1908 /* Add the export to the obd_zombie work queue. */
/* Decrement obd_stale_export_num, unlink @exp from its obd's export chain
 * under obd_dev_lock (it must currently be linked), and queue
 * exp_zombie_work on zombie_wq. */
1910 static void obd_zombie_export_add(struct obd_export *exp) {
1911 atomic_dec(&obd_stale_export_num);
1912 spin_lock(&exp->exp_obd->obd_dev_lock);
1913 LASSERT(!list_empty(&exp->exp_obd_chain));
1914 list_del_init(&exp->exp_obd_chain);
1915 spin_unlock(&exp->exp_obd->obd_dev_lock);
1917 queue_work(zombie_wq, &exp->exp_zombie_work);
1921 /* Add the import to the obd_zombie work queue. */
/* Queue imp_zombie_work on zombie_wq; the work handler set up in
 * class_new_import() is obd_zombie_imp_cull().  The import must no longer
 * have a security context (imp_sec == NULL). */
1923 static void obd_zombie_import_add(struct obd_import *imp) {
1924 LASSERT(imp->imp_sec == NULL);
1926 queue_work(zombie_wq, &imp->imp_zombie_work);
1930 /* Wait until the obd_zombie import/export queues become empty. */
/* Flush zombie_wq, the workqueue that the zombie import/export work items
 * are queued on. */
1932 void obd_zombie_barrier(void)
1934 flush_workqueue(zombie_wq);
1936 EXPORT_SYMBOL(obd_zombie_barrier);
/* Detach the first export from the global obd_stale_exports list, under
 * obd_stale_export_lock, and hand it to the caller; 'exp' stays NULL when
 * the list is empty.
 * NOTE(review): the guard around the CDEBUG and the return statement are
 * on lines not visible in this excerpt. */
1939 struct obd_export *obd_stale_export_get(void)
1941 struct obd_export *exp = NULL;
1944 spin_lock(&obd_stale_export_lock);
1945 if (!list_empty(&obd_stale_exports)) {
1946 exp = list_entry(obd_stale_exports.next,
1947 struct obd_export, exp_stale_list);
1948 list_del_init(&exp->exp_stale_list);
1950 spin_unlock(&obd_stale_export_lock);
1953 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1954 atomic_read(&obd_stale_export_num));
1958 EXPORT_SYMBOL(obd_stale_export_get);
/* Give back an export obtained from obd_stale_export_get(); it must not be
 * on the stale list.  If the export still has a lock hash with entries,
 * it is re-queued on obd_stale_exports: at the tail when exp_bl_list is
 * empty, at the head otherwise.  Lock order is exp_bl_list_lock (bh
 * disabled) and then obd_stale_export_lock.
 * NOTE(review): the else branches are on lines not visible here, so it is
 * not certain from this excerpt on which path class_export_put() runs. */
1960 void obd_stale_export_put(struct obd_export *exp)
1964 LASSERT(list_empty(&exp->exp_stale_list));
1965 if (exp->exp_lock_hash &&
1966 atomic_read(&exp->exp_lock_hash->hs_count)) {
1967 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1968 atomic_read(&obd_stale_export_num));
1970 spin_lock_bh(&exp->exp_bl_list_lock);
1971 spin_lock(&obd_stale_export_lock);
1972 /* Add to the tail if there is no blocked locks,
1973 * to the head otherwise. */
1974 if (list_empty(&exp->exp_bl_list))
1975 list_add_tail(&exp->exp_stale_list,
1976 &obd_stale_exports);
1978 list_add(&exp->exp_stale_list,
1979 &obd_stale_exports);
1981 spin_unlock(&obd_stale_export_lock);
1982 spin_unlock_bh(&exp->exp_bl_list_lock);
1984 class_export_put(exp);
1988 EXPORT_SYMBOL(obd_stale_export_put);
1991 /* Adjust the position of the export in the stale list,
1992 * i.e. move it to the head of the list if needed. */
1994 void obd_stale_export_adjust(struct obd_export *exp)
1996 LASSERT(exp != NULL);
1997 spin_lock_bh(&exp->exp_bl_list_lock);
1998 spin_lock(&obd_stale_export_lock);
2000 if (!list_empty(&exp->exp_stale_list) &&
2001 !list_empty(&exp->exp_bl_list))
2002 list_move(&exp->exp_stale_list, &obd_stale_exports);
2004 spin_unlock(&obd_stale_export_lock);
2005 spin_unlock_bh(&exp->exp_bl_list_lock);
2007 EXPORT_SYMBOL(obd_stale_export_adjust);
2010 /* Start the workqueue that destroys zombie imports/exports. */
/* Create zombie_wq, the workqueue named "obd_zombid".
 * NOTE(review): the allocation-failure check and the return value are on
 * lines not visible in this excerpt. */
2012 int obd_zombie_impexp_init(void)
2014 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
2022 /* Stop the workqueue that destroys zombie imports/exports. */
/* Destroy zombie_wq and then assert that no export is left on the global
 * obd_stale_exports list. */
2024 void obd_zombie_impexp_stop(void)
2026 destroy_workqueue(zombie_wq);
2027 LASSERT(list_empty(&obd_stale_exports));
2030 /***** Kernel-userspace comm helpers *******/
2032 /* Get length of entire message, including header */
/* i.e. sizeof(struct kuc_hdr) plus @payload_len, returned as int. */
2033 int kuc_len(int payload_len)
2035 return sizeof(struct kuc_hdr) + payload_len;
2037 EXPORT_SYMBOL(kuc_len);
2039 /* Get a pointer to kuc header, given a ptr to the payload
2040 * @param p Pointer to payload area
2041 * @returns Pointer to kuc header */
/* The header is located immediately before the payload, i.e. one
 * struct kuc_hdr below @p; its kuc_magic field is asserted to be
 * KUC_MAGIC.
 * NOTE(review): the return statement is on a line not visible here. */
2043 struct kuc_hdr * kuc_ptr(void *p)
2045 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2046 LASSERT(lh->kuc_magic == KUC_MAGIC);
2049 EXPORT_SYMBOL(kuc_ptr);
2051 /* Alloc space for a message, and fill in header
2052 * @return Pointer to payload area */
/* The header fields kuc_magic, kuc_transport (@transport), kuc_msgtype
 * (@type) and kuc_msglen (header plus @payload_len, from kuc_len()) are
 * filled in, and the address just past the header is returned.
 * ERR_PTR(-ENOMEM) is returned on the failure path.
 * NOTE(review): the declaration of 'lh', the allocation call and its
 * failure check are on lines not visible in this excerpt. */
2054 void *kuc_alloc(int payload_len, int transport, int type)
2057 int len = kuc_len(payload_len);
2061 return ERR_PTR(-ENOMEM);
2063 lh->kuc_magic = KUC_MAGIC;
2064 lh->kuc_transport = transport;
2065 lh->kuc_msgtype = type;
2066 lh->kuc_msglen = len;
2068 return (void *)(lh + 1);
2070 EXPORT_SYMBOL(kuc_alloc);
2072 /* Takes pointer to payload area */
/* The header is recovered with kuc_ptr() (which asserts the magic) and the
 * whole message, kuc_len(@payload_len) bytes, is freed.  @payload_len
 * must therefore be the value the message was allocated with. */
2073 void kuc_free(void *p, int payload_len)
2075 struct kuc_hdr *lh = kuc_ptr(p);
2076 OBD_FREE(lh, kuc_len(payload_len));
2078 EXPORT_SYMBOL(kuc_free);
/* One waiter for an RPC slot; obd_get_request_slot() keeps an instance on
 * its stack and links it on cli->cl_flight_waiters.
 * NOTE(review): obd_get_request_slot() also uses an orsw_signaled member,
 * whose declaration is on a line not visible in this excerpt. */
2080 struct obd_request_slot_waiter {
/* link on client_obd::cl_flight_waiters */
2081 struct list_head orsw_entry;
/* woken when a slot is handed to this waiter */
2082 wait_queue_head_t orsw_waitq;
/* Wait condition for obd_get_request_slot(): true once @orsw is no longer
 * linked on a list, which is what the wakers do (list_del_init()) when
 * they hand over a slot.  Checked under cl_loi_list_lock.
 * NOTE(review): the declaration of 'avail' and the return are on lines not
 * visible in this excerpt. */
2086 static bool obd_request_slot_avail(struct client_obd *cli,
2087 struct obd_request_slot_waiter *orsw)
2091 spin_lock(&cli->cl_loi_list_lock);
2092 avail = !!list_empty(&orsw->orsw_entry);
2093 spin_unlock(&cli->cl_loi_list_lock);
2099 /* For network flow control, the RPC sponsor needs to acquire a credit
2100 * before sending the RPC. The credits count for a connection is defined
2101 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
2102 * the subsequent RPC sponsors need to wait until others release their
2103 * credits, or the administrator increases the "cl_max_rpcs_in_flight". */
/* Acquire one RPC slot on @cli.
 *
 * Fast path: if cl_rpcs_in_flight is below cl_max_rpcs_in_flight the
 * counter is incremented under cl_loi_list_lock.  Otherwise an on-stack
 * waiter is appended to cl_flight_waiters and the caller sleeps in
 * l_wait_event() until obd_request_slot_avail() holds (a second wake
 * condition is on an elided line).  After the wait the lock is retaken to
 * reconcile the state: a slot that had already been handed over is given
 * back (cl_rpcs_in_flight--), and a still-queued waiter is unlinked.
 * NOTE(review): the conditions selecting between those branches and the
 * return statements are on lines not visible in this excerpt. */
2105 int obd_get_request_slot(struct client_obd *cli)
2107 struct obd_request_slot_waiter orsw;
2108 struct l_wait_info lwi;
2111 spin_lock(&cli->cl_loi_list_lock);
2112 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2113 cli->cl_rpcs_in_flight++;
2114 spin_unlock(&cli->cl_loi_list_lock);
2118 init_waitqueue_head(&orsw.orsw_waitq);
2119 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2120 orsw.orsw_signaled = false;
2121 spin_unlock(&cli->cl_loi_list_lock);
2123 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2124 rc = l_wait_event(orsw.orsw_waitq,
2125 obd_request_slot_avail(cli, &orsw) ||
2129 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2130 * freed but other (such as obd_put_request_slot) is using it. */
2131 spin_lock(&cli->cl_loi_list_lock);
2133 if (!orsw.orsw_signaled) {
2134 if (list_empty(&orsw.orsw_entry))
2135 cli->cl_rpcs_in_flight--;
2137 list_del(&orsw.orsw_entry);
2141 if (orsw.orsw_signaled) {
2142 LASSERT(list_empty(&orsw.orsw_entry));
2146 spin_unlock(&cli->cl_loi_list_lock);
2150 EXPORT_SYMBOL(obd_get_request_slot);
2152 void obd_put_request_slot(struct client_obd *cli)
2154 struct obd_request_slot_waiter *orsw;
2156 spin_lock(&cli->cl_loi_list_lock);
2157 cli->cl_rpcs_in_flight--;
2159 /* If there is free slot, wakeup the first waiter. */
2160 if (!list_empty(&cli->cl_flight_waiters) &&
2161 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2162 orsw = list_entry(cli->cl_flight_waiters.next,
2163 struct obd_request_slot_waiter, orsw_entry);
2164 list_del_init(&orsw->orsw_entry);
2165 cli->cl_rpcs_in_flight++;
2166 wake_up(&orsw->orsw_waitq);
2168 spin_unlock(&cli->cl_loi_list_lock);
2170 EXPORT_SYMBOL(obd_put_request_slot);
/* Return the current value of cli->cl_max_rpcs_in_flight; the field is
 * read without taking a lock. */
2172 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2174 return cli->cl_max_rpcs_in_flight;
2176 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/* Set cl_max_rpcs_in_flight of @cli to @max.
 *
 * @max must lie in [1, OBD_MAX_RIF_MAX].  For an MDC device
 * max_mod_rpcs_in_flight is kept strictly lower: if @max does not exceed
 * the current modify limit, that limit is first lowered to @max - 1.
 * The new value is stored under cl_loi_list_lock, client_adjust_max_dirty()
 * is called, and up to 'diff' queued waiters are each granted a slot and
 * woken.
 * NOTE(review): local declarations, the computation of 'diff', the error
 * returns and the check preceding the CERROR are on lines not visible
 * here.  The CERROR format string below has no trailing "\n", unlike the
 * other messages in this file. */
2178 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2180 struct obd_request_slot_waiter *orsw;
2187 if (max > OBD_MAX_RIF_MAX || max < 1)
2190 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2191 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2192 /* adjust max_mod_rpcs_in_flight to ensure it is always
2193 * strictly lower that max_rpcs_in_flight */
2195 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2196 "because it must be higher than "
2197 "max_mod_rpcs_in_flight value",
2198 cli->cl_import->imp_obd->obd_name);
2201 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2202 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2208 spin_lock(&cli->cl_loi_list_lock);
2209 old = cli->cl_max_rpcs_in_flight;
2210 cli->cl_max_rpcs_in_flight = max;
2211 client_adjust_max_dirty(cli);
2215 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2216 for (i = 0; i < diff; i++) {
2217 if (list_empty(&cli->cl_flight_waiters))
2220 orsw = list_entry(cli->cl_flight_waiters.next,
2221 struct obd_request_slot_waiter, orsw_entry);
2222 list_del_init(&orsw->orsw_entry);
2223 cli->cl_rpcs_in_flight++;
2224 wake_up(&orsw->orsw_waitq);
2226 spin_unlock(&cli->cl_loi_list_lock);
2230 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Return the current value of cli->cl_max_mod_rpcs_in_flight; the field
 * is read without taking a lock. */
2232 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2234 return cli->cl_max_mod_rpcs_in_flight;
2236 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/* Set cl_max_mod_rpcs_in_flight of @cli to @max.
 *
 * @max must lie in [1, OBD_MAX_RIF_MAX], be strictly below
 * cl_max_rpcs_in_flight, and not exceed 'maxmodrpcs', which is taken from
 * the server's connect data (ocd_maxmodrpcs) when
 * OBD_CONNECT_MULTIMODRPCS is set.  The value is stored under
 * cl_mod_rpcs_lock and waiters on cl_mod_rpcs_waitq are woken if the
 * limit grew.
 * NOTE(review): local declarations, the error returns and the value used
 * for 'maxmodrpcs' without OBD_CONNECT_MULTIMODRPCS are on lines not
 * visible in this excerpt. */
2238 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2240 struct obd_connect_data *ocd;
2244 if (max > OBD_MAX_RIF_MAX || max < 1)
2247 /* cannot exceed or equal max_rpcs_in_flight */
2248 if (max >= cli->cl_max_rpcs_in_flight) {
2249 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2250 "higher or equal to max_rpcs_in_flight value (%u)\n",
2251 cli->cl_import->imp_obd->obd_name,
2252 max, cli->cl_max_rpcs_in_flight);
2256 /* cannot exceed max modify RPCs in flight supported by the server */
2257 ocd = &cli->cl_import->imp_connect_data;
2258 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2259 maxmodrpcs = ocd->ocd_maxmodrpcs;
2262 if (max > maxmodrpcs) {
2263 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2264 "higher than max_mod_rpcs_per_client value (%hu) "
2265 "returned by the server at connection\n",
2266 cli->cl_import->imp_obd->obd_name,
2271 spin_lock(&cli->cl_mod_rpcs_lock);
2273 prev = cli->cl_max_mod_rpcs_in_flight;
2274 cli->cl_max_mod_rpcs_in_flight = max;
2276 /* wakeup waiters if limit has been increased */
2277 if (cli->cl_max_mod_rpcs_in_flight > prev)
2278 wake_up(&cli->cl_mod_rpcs_waitq);
2280 spin_unlock(&cli->cl_mod_rpcs_lock);
2284 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/*
 * Integer percentage of @a relative to @b; evaluates to 0 when @b is 0.
 * Both arguments and the whole expansion are parenthesized so that
 * compound expressions such as pct(x + y, total) expand correctly.
 * @b is still evaluated twice, so do not pass arguments with side effects.
 */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
/* Write the modify-RPC statistics of @cli to @seq: a snapshot timestamp,
 * the current cl_mod_rpcs_in_flight, and one row per bucket of
 * cl_mod_rpcs_hist with the bucket count, its percentage of the total and
 * the cumulative percentage.  Output stops once the cumulative count
 * equals the total.  cl_mod_rpcs_lock is held while printing.
 * NOTE(review): the declaration of 'i', the updates of 'mod_cum' and the
 * return are on lines not visible here.  In the snapshot line, "%9lu"
 * pads the nanoseconds with spaces rather than zeros, and "%llu" is
 * given an s64 argument. */
2288 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2289 struct seq_file *seq)
2291 unsigned long mod_tot = 0, mod_cum;
2292 struct timespec64 now;
2295 ktime_get_real_ts64(&now);
2297 spin_lock(&cli->cl_mod_rpcs_lock);
2299 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2300 (s64)now.tv_sec, now.tv_nsec);
2301 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2302 cli->cl_mod_rpcs_in_flight);
2304 seq_printf(seq, "\n\t\t\tmodify\n");
2305 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2307 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2310 for (i = 0; i < OBD_HIST_MAX; i++) {
2311 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2313 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2314 i, mod, pct(mod, mod_tot),
2315 pct(mod_cum, mod_tot));
2316 if (mod_cum == mod_tot)
2320 spin_unlock(&cli->cl_mod_rpcs_lock);
2324 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2328 /* The number of modify RPCs sent in parallel is limited
2329 * because the server has a finite number of slots per client to
2330 * store request result and ensure reply reconstruction when needed.
2331 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2332 * that takes into account server limit and cl_max_rpcs_in_flight.
2334 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2335 * one close request is allowed above the maximum. */
/* Slot-availability test for modify RPCs; the "_locked" suffix and the
 * callers in this file indicate that cl_mod_rpcs_lock is held by the
 * caller.
 * NOTE(review): the second parameter (used below as 'close_req'), the
 * declaration of 'avail' and the return are on lines not visible here. */
2337 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2342 /* A slot is available if
2343 * - number of modify RPCs in flight is less than the max
2344 * - it's a close RPC and no other close request is in flight
2346 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2347 (close_req && cli->cl_close_rpcs_in_flight == 0);
/* Locking wrapper: evaluates obd_mod_rpc_slot_avail_locked() with
 * cl_mod_rpcs_lock held.  It is the wait condition used by
 * obd_get_mod_rpc_slot().
 * NOTE(review): the second parameter, the declaration of 'avail' and the
 * return are on lines not visible in this excerpt. */
2352 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2357 spin_lock(&cli->cl_mod_rpcs_lock);
2358 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2359 spin_unlock(&cli->cl_mod_rpcs_lock);
/* Tell whether the intent @it describes an operation that needs no modify
 * RPC slot: IT_GETATTR, IT_LOOKUP, IT_READDIR, or IT_LAYOUT without
 * MDS_FMODE_WRITE in it_flags.
 * NOTE(review): the first part of the condition and the return statements
 * are on lines not visible in this excerpt. */
2363 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2366 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2367 it->it_op == IT_READDIR ||
2368 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2373 /* Get a modify RPC slot from the obd client @cli according
2374 * to the kind of operation @opc that is going to be sent
2375 * and the intent @it of the operation if it applies.
2376 * If the maximum number of modify RPCs in flight is reached
2377 * the thread is put to sleep.
2378 * Returns the tag to be set in the request message. Tag 0
2379 * is reserved for non-modifying requests. */
/* Flow, as far as visible: intents accepted by obd_skip_mod_rpc_slot()
 * take no slot; MDS_CLOSE requests are treated specially through
 * 'close_req'.  Under cl_mod_rpcs_lock, when a slot is available the
 * in-flight counters are bumped, the histogram is tallied and the first
 * zero bit of cl_mod_tag_bitmap is claimed as the tag; otherwise the lock
 * is dropped and the caller sleeps on cl_mod_rpcs_waitq until
 * obd_mod_rpc_slot_avail() holds.
 * NOTE(review): the loop construct, the return statements and the
 * assignment of 'close_req' are on lines not visible here.  The second
 * LASSERT below has a side effect (test_and_set_bit), so it must not be
 * compiled out. */
2381 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2382 struct lookup_intent *it)
2384 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2385 bool close_req = false;
2388 /* read-only metadata RPCs don't consume a slot on MDT
2389 * for reply reconstruction
2391 if (obd_skip_mod_rpc_slot(it))
2394 if (opc == MDS_CLOSE)
2398 spin_lock(&cli->cl_mod_rpcs_lock);
2399 max = cli->cl_max_mod_rpcs_in_flight;
2400 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2401 /* there is a slot available */
2402 cli->cl_mod_rpcs_in_flight++;
2404 cli->cl_close_rpcs_in_flight++;
2405 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2406 cli->cl_mod_rpcs_in_flight);
2407 /* find a free tag */
2408 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2410 LASSERT(i < OBD_MAX_RIF_MAX);
2411 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2412 spin_unlock(&cli->cl_mod_rpcs_lock);
2413 /* tag 0 is reserved for non-modify RPCs */
2416 spin_unlock(&cli->cl_mod_rpcs_lock);
2418 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2419 "opc %u, max %hu\n",
2420 cli->cl_import->imp_obd->obd_name, opc, max);
2422 l_wait_event(cli->cl_mod_rpcs_waitq,
2423 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2426 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2428 /* Put a modify RPC slot from the obd client @cli according
2429 * to the kind of operation @opc that has been sent and the
2430 * intent @it of the operation if it applies. */
/* Counterpart of obd_get_mod_rpc_slot(): intents accepted by
 * obd_skip_mod_rpc_slot() hold no slot.  Otherwise, under
 * cl_mod_rpcs_lock, the in-flight counters are decremented and bit
 * (@tag - 1) of cl_mod_tag_bitmap is cleared; waiters on
 * cl_mod_rpcs_waitq are woken after the lock is dropped.
 * NOTE(review): the early return, the assignment of 'close_req' and the
 * condition guarding the close counter are on lines not visible here.
 * The last LASSERT has a side effect (test_and_clear_bit), so it must not
 * be compiled out. */
2432 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2433 struct lookup_intent *it, __u16 tag)
2435 bool close_req = false;
2437 if (obd_skip_mod_rpc_slot(it))
2440 if (opc == MDS_CLOSE)
2443 spin_lock(&cli->cl_mod_rpcs_lock);
2444 cli->cl_mod_rpcs_in_flight--;
2446 cli->cl_close_rpcs_in_flight--;
2447 /* release the tag in the bitmap */
2448 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2449 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2450 spin_unlock(&cli->cl_mod_rpcs_lock);
2451 wake_up(&cli->cl_mod_rpcs_waitq);
2453 EXPORT_SYMBOL(obd_put_mod_rpc_slot);