4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
/*
 * File-scope state for the generic OBD device/type machinery:
 * the registered obd_type list, the global obd_devs[] minor table,
 * slab caches, and the zombie/stale export-import reaper lists.
 */
48 static DEFINE_SPINLOCK(obd_types_lock);
49 static LIST_HEAD(obd_types);
50 DEFINE_RWLOCK(obd_dev_lock);
51 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
53 static struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 static struct kmem_cache *import_cachep;
/* exports/imports queued on these lists are torn down asynchronously
 * by the obd_zombie_impexp thread */
58 static LIST_HEAD(obd_zombie_imports);
59 static LIST_HEAD(obd_zombie_exports);
60 static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp,
66 const char *status, int locks, int debug_level);
/* exports unlinked from their obd but not yet reaped */
68 static LIST_HEAD(obd_stale_exports);
69 static DEFINE_SPINLOCK(obd_stale_export_lock);
70 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* set by ptlrpc at module load time; lets obdclass drop connections
 * without a hard link-time dependency on the ptlrpc module */
72 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
73 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
76 * support functions: we could use inter-module communication, but this
77 * is more portable to other OS's
79 static struct obd_device *obd_device_alloc(void)
81 struct obd_device *obd;
83 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
85 obd->obd_magic = OBD_DEVICE_MAGIC;
90 static void obd_device_free(struct obd_device *obd)
93 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
94 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
95 if (obd->obd_namespace != NULL) {
96 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
97 obd, obd->obd_namespace, obd->obd_force);
100 lu_ref_fini(&obd->obd_reference);
101 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
104 struct obd_type *class_search_type(const char *name)
106 struct list_head *tmp;
107 struct obd_type *type;
109 spin_lock(&obd_types_lock);
110 list_for_each(tmp, &obd_types) {
111 type = list_entry(tmp, struct obd_type, typ_chain);
112 if (strcmp(type->typ_name, name) == 0) {
113 spin_unlock(&obd_types_lock);
117 spin_unlock(&obd_types_lock);
120 EXPORT_SYMBOL(class_search_type);
/*
 * Find the obd_type for @name, auto-loading the implementing kernel
 * module if the type is not yet registered, and pin both the type and
 * its module on success.
 */
122 struct obd_type *class_get_type(const char *name)
124 struct obd_type *type = class_search_type(name);
126 #ifdef HAVE_MODULE_LOADING_SUPPORT
/* map obd type names onto the module that actually provides them */
128 const char *modname = name;
130 if (strcmp(modname, "obdfilter") == 0)
133 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
134 modname = LUSTRE_OSP_NAME;
/* any mds* type is served by the mdt module */
136 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
137 modname = LUSTRE_MDT_NAME;
139 if (!request_module("%s", modname)) {
140 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
/* retry the lookup now that the module had a chance to register */
141 type = class_search_type(name);
143 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* pin the type and the implementing module under the type lock */
149 spin_lock(&type->obd_type_lock);
151 try_module_get(type->typ_dt_ops->o_owner);
152 spin_unlock(&type->obd_type_lock);
/*
 * Drop the reference taken by class_get_type(): release the module
 * reference under the type lock.
 */
157 void class_put_type(struct obd_type *type)
160 spin_lock(&type->obd_type_lock);
162 module_put(type->typ_dt_ops->o_owner);
163 spin_unlock(&type->obd_type_lock);
166 static void class_sysfs_release(struct kobject *kobj)
168 OBD_FREE(kobj, sizeof(*kobj));
171 static struct kobj_type class_ktype = {
172 .sysfs_ops = &lustre_sysfs_ops,
173 .release = class_sysfs_release,
/*
 * Create (or, with server support, find an already-existing) sysfs
 * kobject under the lustre kset used to expose per-type tunables.
 *
 * \retval kobject on success, ERR_PTR(-errno) on failure
 */
176 struct kobject *class_setup_tunables(const char *name)
178 struct kobject *kobj;
181 #ifdef HAVE_SERVER_SUPPORT
/* reuse an existing entry if a sibling module already created it */
182 kobj = kset_find_obj(lustre_kset, name);
186 OBD_ALLOC(kobj, sizeof(*kobj));
188 return ERR_PTR(-ENOMEM);
190 kobj->kset = lustre_kset;
/* class_ktype's release callback frees the kobject when its last
 * reference is put */
191 kobject_init(kobj, &class_ktype);
192 rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
199 EXPORT_SYMBOL(class_setup_tunables);
201 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type with the class layer.
 *
 * Allocates the obd_type, copies the operation tables and name, sets
 * up its procfs/debugfs/sysfs presence and (optionally) a lu_device
 * type, then links it onto the global obd_types list.  On any failure
 * all partially-created state is unwound in the failure path below.
 */
203 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
204 bool enable_proc, struct lprocfs_vars *vars,
205 const char *name, struct lu_device_type *ldt)
207 struct obd_type *type;
208 #ifdef HAVE_SERVER_SUPPORT
210 #endif /* HAVE_SERVER_SUPPORT */
215 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* duplicate registrations are rejected, not refreshed */
217 if (class_search_type(name)) {
218 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
223 OBD_ALLOC(type, sizeof(*type));
227 OBD_ALLOC_PTR(type->typ_dt_ops);
228 OBD_ALLOC_PTR(type->typ_md_ops);
229 OBD_ALLOC(type->typ_name, strlen(name) + 1);
231 if (type->typ_dt_ops == NULL ||
232 type->typ_md_ops == NULL ||
233 type->typ_name == NULL)
/* copy the caller's ops tables so the caller's structs need not
 * outlive registration */
236 *(type->typ_dt_ops) = *dt_ops;
237 /* md_ops is optional */
239 *(type->typ_md_ops) = *md_ops;
240 strcpy(type->typ_name, name);
241 spin_lock_init(&type->obd_type_lock);
243 #ifdef CONFIG_PROC_FS
245 type->typ_procroot = lprocfs_register(type->typ_name,
248 if (IS_ERR(type->typ_procroot)) {
249 rc = PTR_ERR(type->typ_procroot);
250 type->typ_procroot = NULL;
255 #ifdef HAVE_SERVER_SUPPORT
/* if a debugfs entry with this name already exists (shared with a
 * sibling module), remember that and do not create/destroy it here */
257 dname.len = strlen(dname.name);
258 dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
260 type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
261 if (type->typ_debugfs_entry) {
262 dput(type->typ_debugfs_entry);
263 type->typ_sym_filter = true;
266 #endif /* HAVE_SERVER_SUPPORT */
268 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
271 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
272 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
274 type->typ_debugfs_entry = NULL;
277 #ifdef HAVE_SERVER_SUPPORT
280 type->typ_kobj = class_setup_tunables(type->typ_name);
281 if (IS_ERR(type->typ_kobj))
282 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
286 rc = lu_device_type_init(ldt);
288 kobject_put(type->typ_kobj);
/* publish the fully-initialized type */
293 spin_lock(&obd_types_lock);
294 list_add(&type->typ_chain, &obd_types);
295 spin_unlock(&obd_types_lock);
/* failure path: unwind in reverse order of construction */
300 #ifdef HAVE_SERVER_SUPPORT
/* a borrowed (symlinked) debugfs entry must not be removed here */
301 if (type->typ_sym_filter)
302 type->typ_debugfs_entry = NULL;
304 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
305 ldebugfs_remove(&type->typ_debugfs_entry);
306 if (type->typ_name != NULL) {
307 #ifdef CONFIG_PROC_FS
308 if (type->typ_procroot != NULL)
309 remove_proc_subtree(type->typ_name, proc_lustre_root);
311 OBD_FREE(type->typ_name, strlen(name) + 1);
313 if (type->typ_md_ops != NULL)
314 OBD_FREE_PTR(type->typ_md_ops);
315 if (type->typ_dt_ops != NULL)
316 OBD_FREE_PTR(type->typ_dt_ops);
317 OBD_FREE(type, sizeof(*type));
320 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister an obd type previously added via class_register_type().
 *
 * If the type still has users (typ_refcnt != 0), its ops tables are
 * freed but the type stays on the list with its name intact so the
 * leak is at least debuggable.  Otherwise all of the type's proc,
 * debugfs, sysfs and lu_device state is torn down and the type freed.
 */
322 int class_unregister_type(const char *name)
324 struct obd_type *type = class_search_type(name);
328 CERROR("unknown obd type\n");
332 if (type->typ_refcnt) {
333 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
334 /* This is a bad situation, let's make the best of it */
335 /* Remove ops, but leave the name for debugging */
336 OBD_FREE_PTR(type->typ_dt_ops);
337 OBD_FREE_PTR(type->typ_md_ops);
341 kobject_put(type->typ_kobj);
343 /* we do not use type->typ_procroot as for compatibility purposes
344 * other modules can share names (i.e. lod can use lov entry). so
345 * we can't reference pointer as it can get invalided when another
346 * module removes the entry */
347 #ifdef CONFIG_PROC_FS
348 if (type->typ_procroot != NULL)
349 remove_proc_subtree(type->typ_name, proc_lustre_root);
350 if (type->typ_procsym != NULL)
351 lprocfs_remove(&type->typ_procsym);
353 #ifdef HAVE_SERVER_SUPPORT
/* a borrowed (symlinked) debugfs entry is owned elsewhere */
354 if (type->typ_sym_filter)
355 type->typ_debugfs_entry = NULL;
357 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
358 ldebugfs_remove(&type->typ_debugfs_entry);
361 lu_device_type_fini(type->typ_lu);
/* unlink from the global list before freeing */
363 spin_lock(&obd_types_lock);
364 list_del(&type->typ_chain);
365 spin_unlock(&obd_types_lock);
366 OBD_FREE(type->typ_name, strlen(name) + 1);
367 if (type->typ_dt_ops != NULL)
368 OBD_FREE_PTR(type->typ_dt_ops);
369 if (type->typ_md_ops != NULL)
370 OBD_FREE_PTR(type->typ_md_ops);
371 OBD_FREE(type, sizeof(*type));
373 } /* class_unregister_type */
374 EXPORT_SYMBOL(class_unregister_type);
377 * Create a new obd device.
379 * Allocate the new obd_device and initialize it.
381 * \param[in] type_name obd device type string.
382 * \param[in] name obd device name.
383 * \param[in] uuid obd device UUID
385 * \retval newdev pointer to created obd_device
386 * \retval ERR_PTR(errno) on error
/*
 * Allocate and initialize a new obd_device of type @type_name.
 *
 * Takes a type reference (dropped again on failure), allocates the
 * device, and initializes its locks, lists and bookkeeping fields.
 * The device is NOT yet registered in obd_devs[] (obd_minor == -1);
 * that happens in class_register_device().
 */
388 struct obd_device *class_newdev(const char *type_name, const char *name,
391 struct obd_device *newdev;
392 struct obd_type *type = NULL;
395 if (strlen(name) >= MAX_OBD_NAME) {
396 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
397 RETURN(ERR_PTR(-EINVAL));
400 type = class_get_type(type_name);
402 CERROR("OBD: unknown type: %s\n", type_name);
403 RETURN(ERR_PTR(-ENODEV));
406 newdev = obd_device_alloc();
407 if (newdev == NULL) {
408 class_put_type(type);
409 RETURN(ERR_PTR(-ENOMEM));
411 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* obd_name length was validated above; the slab object is zeroed so
 * the copy is NUL-terminated */
412 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
413 newdev->obd_type = type;
/* -1 marks "not yet registered in obd_devs[]" */
414 newdev->obd_minor = -1;
416 rwlock_init(&newdev->obd_pool_lock);
417 newdev->obd_pool_limit = 0;
418 newdev->obd_pool_slv = 0;
420 INIT_LIST_HEAD(&newdev->obd_exports);
421 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
422 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
423 INIT_LIST_HEAD(&newdev->obd_exports_timed);
424 INIT_LIST_HEAD(&newdev->obd_nid_stats);
425 spin_lock_init(&newdev->obd_nid_lock);
426 spin_lock_init(&newdev->obd_dev_lock);
427 mutex_init(&newdev->obd_dev_mutex);
428 spin_lock_init(&newdev->obd_osfs_lock);
429 /* newdev->obd_osfs_age must be set to a value in the distant
430 * past to guarantee a fresh statfs is fetched on mount. */
431 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
433 /* XXX belongs in setup not attach */
434 init_rwsem(&newdev->obd_observer_link_sem);
436 init_timer(&newdev->obd_recovery_timer);
437 spin_lock_init(&newdev->obd_recovery_task_lock);
438 init_waitqueue_head(&newdev->obd_next_transno_waitq);
439 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
440 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
441 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
442 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
443 INIT_LIST_HEAD(&newdev->obd_evict_list);
444 INIT_LIST_HEAD(&newdev->obd_lwp_list);
446 llog_group_init(&newdev->obd_olg);
447 /* Detach drops this */
448 atomic_set(&newdev->obd_refcount, 1);
449 lu_ref_init(&newdev->obd_reference);
450 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
452 newdev->obd_conn_inprogress = 0;
/* NOTE(review): strncpy bounded by strlen(uuid) does not bound the
 * copy by the destination size and would not NUL-terminate on its
 * own; this relies on the caller validating uuid length and on the
 * slab object being zeroed -- verify. */
454 strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
456 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
457 newdev->obd_name, newdev);
465 * \param[in] obd obd_device to be freed
/*
 * Final destruction of an obd_device: called once the refcount has
 * reached zero.  Runs obd_cleanup() for devices that were set up,
 * frees the device memory and drops the type reference taken in
 * class_newdev().
 */
469 void class_free_dev(struct obd_device *obd)
471 struct obd_type *obd_type = obd->obd_type;
473 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
474 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* either never registered (-1) or still consistent with obd_devs[] */
475 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
476 "obd %p != obd_devs[%d] %p\n",
477 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
478 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
479 "obd_refcount should be 0, not %d\n",
480 atomic_read(&obd->obd_refcount));
481 LASSERT(obd_type != NULL);
483 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
484 obd->obd_name, obd->obd_type->typ_name);
486 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
487 obd->obd_name, obd->obd_uuid.uuid);
488 if (obd->obd_stopping) {
491 /* If we're not stopping, we were never set up */
492 err = obd_cleanup(obd);
494 CERROR("Cleanup %s returned %d\n",
498 obd_device_free(obd);
/* balances the class_get_type() in class_newdev() */
500 class_put_type(obd_type);
504 * Unregister obd device.
506 * Free slot in obd_dev[] used by \a obd.
508 * \param[in] new_obd obd_device to be unregistered
/*
 * Remove @obd from the global obd_devs[] minor table.  A device that
 * was never registered (obd_minor < 0) is left untouched.
 */
512 void class_unregister_device(struct obd_device *obd)
514 write_lock(&obd_dev_lock);
515 if (obd->obd_minor >= 0) {
516 LASSERT(obd_devs[obd->obd_minor] == obd);
517 obd_devs[obd->obd_minor] = NULL;
520 write_unlock(&obd_dev_lock);
524 * Register obd device.
526 * Find free slot in obd_devs[], fills it with \a new_obd.
528 * \param[in] new_obd obd_device to be registered
531 * \retval -EEXIST device with this name is registered
532 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Register @new_obd in obd_devs[]: scan for a duplicate name and,
 * in the same pass, remember the first free slot.
 *
 * If a same-named device is found it may be a zombie awaiting
 * destruction, so the scan drops the lock, waits on the zombie
 * barrier and retries once before failing with -EEXIST.
 *
 * \retval 0 on success
 * \retval -EEXIST device with this name is already registered
 * \retval -EOVERFLOW obd_devs[] is full
 */
534 int class_register_device(struct obd_device *new_obd)
538 int new_obd_minor = 0;
539 bool minor_assign = false;
540 bool retried = false;
543 write_lock(&obd_dev_lock);
544 for (i = 0; i < class_devno_max(); i++) {
545 struct obd_device *obd = class_num2obd(i);
548 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
551 write_unlock(&obd_dev_lock);
553 /* the obd_device could be waited to be
554 * destroyed by the "obd_zombie_impexp_thread".
556 obd_zombie_barrier();
561 CERROR("%s: already exists, won't add\n",
563 /* in case we found a free slot before duplicate */
564 minor_assign = false;
/* remember the first free slot seen during the scan */
568 if (!minor_assign && obd == NULL) {
575 new_obd->obd_minor = new_obd_minor;
576 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
577 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
578 obd_devs[new_obd_minor] = new_obd;
582 CERROR("%s: all %u/%u devices used, increase "
583 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
584 i, class_devno_max(), ret);
587 write_unlock(&obd_dev_lock);
/*
 * Find the minor number of the attached device named @name.
 * Caller must hold obd_dev_lock (read or write).
 */
592 static int class_name2dev_nolock(const char *name)
599 for (i = 0; i < class_devno_max(); i++) {
600 struct obd_device *obd = class_num2obd(i);
602 if (obd && strcmp(name, obd->obd_name) == 0) {
603 /* Make sure we finished attaching before we give
604 out any references */
605 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/* only attached devices are visible through this lookup */
606 if (obd->obd_attached) {
616 int class_name2dev(const char *name)
623 read_lock(&obd_dev_lock);
624 i = class_name2dev_nolock(name);
625 read_unlock(&obd_dev_lock);
629 EXPORT_SYMBOL(class_name2dev);
631 struct obd_device *class_name2obd(const char *name)
633 int dev = class_name2dev(name);
635 if (dev < 0 || dev > class_devno_max())
637 return class_num2obd(dev);
639 EXPORT_SYMBOL(class_name2obd);
/*
 * Find the minor number of the device carrying @uuid.
 * Caller must hold obd_dev_lock (read or write).
 */
641 int class_uuid2dev_nolock(struct obd_uuid *uuid)
645 for (i = 0; i < class_devno_max(); i++) {
646 struct obd_device *obd = class_num2obd(i);
648 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
649 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
657 int class_uuid2dev(struct obd_uuid *uuid)
661 read_lock(&obd_dev_lock);
662 i = class_uuid2dev_nolock(uuid);
663 read_unlock(&obd_dev_lock);
667 EXPORT_SYMBOL(class_uuid2dev);
669 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
671 int dev = class_uuid2dev(uuid);
674 return class_num2obd(dev);
676 EXPORT_SYMBOL(class_uuid2obd);
679 * Get obd device from ::obd_devs[]
681 * \param num [in] array index
683 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
684 * otherwise return the obd device there.
/*
 * Fetch the obd_device at minor @num from obd_devs[].
 * Out-of-range indices yield NULL; a non-NULL slot is sanity-checked
 * for magic and minor consistency.
 */
686 struct obd_device *class_num2obd(int num)
688 struct obd_device *obd = NULL;
690 if (num < class_devno_max()) {
695 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
696 "%p obd_magic %08x != %08x\n",
697 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
698 LASSERTF(obd->obd_minor == num,
699 "%p obd_minor %0d != %0d\n",
700 obd, obd->obd_minor, num);
707 * Find obd in obd_dev[] by name or uuid.
709 * Increment obd's refcount if found.
711 * \param[in] str obd name or uuid
713 * \retval NULL if not found
714 * \retval target pointer to found obd_device
/*
 * Look up an obd_device by UUID first, then by name, holding
 * obd_dev_lock across the whole lookup so the incref is atomic with
 * the find.
 *
 * \retval obd_device with an extra reference taken, or NULL
 */
716 struct obd_device *class_dev_by_str(const char *str)
718 struct obd_device *target = NULL;
719 struct obd_uuid tgtuuid;
722 obd_str2uuid(&tgtuuid, str);
724 read_lock(&obd_dev_lock);
/* try UUID match first, fall back to a name match */
725 rc = class_uuid2dev_nolock(&tgtuuid);
727 rc = class_name2dev_nolock(str);
730 target = class_num2obd(rc);
/* take the reference while still under the lock */
733 class_incref(target, "find", current);
734 read_unlock(&obd_dev_lock);
738 EXPORT_SYMBOL(class_dev_by_str);
741 * Get obd devices count. Device in any
743 * \retval obd device count
745 int get_devices_count(void)
747 int index, max_index = class_devno_max(), dev_count = 0;
749 read_lock(&obd_dev_lock);
750 for (index = 0; index <= max_index; index++) {
751 struct obd_device *obd = class_num2obd(index);
755 read_unlock(&obd_dev_lock);
759 EXPORT_SYMBOL(get_devices_count);
/*
 * Dump every registered obd device to the console: minor, lifecycle
 * status, type, name, UUID and refcount.
 */
761 void class_obd_list(void)
766 read_lock(&obd_dev_lock);
767 for (i = 0; i < class_devno_max(); i++) {
768 struct obd_device *obd = class_num2obd(i);
/* derive a human-readable lifecycle status from the obd flags */
772 if (obd->obd_stopping)
774 else if (obd->obd_set_up)
776 else if (obd->obd_attached)
780 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
781 i, status, obd->obd_type->typ_name,
782 obd->obd_name, obd->obd_uuid.uuid,
783 atomic_read(&obd->obd_refcount));
785 read_unlock(&obd_dev_lock);
789 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
790 specified, then only the client with that uuid is returned,
791 otherwise any client connected to the tgt is returned. */
/*
 * Find a client obd of type @typ_name connected to target @tgt_uuid.
 * If @grp_uuid is given, only the client with that own UUID matches;
 * otherwise the first client connected to the target is returned.
 */
792 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
793 const char * typ_name,
794 struct obd_uuid *grp_uuid)
798 read_lock(&obd_dev_lock);
799 for (i = 0; i < class_devno_max(); i++) {
800 struct obd_device *obd = class_num2obd(i);
/* prefix match on the type name */
804 if ((strncmp(obd->obd_type->typ_name, typ_name,
805 strlen(typ_name)) == 0)) {
806 if (obd_uuid_equals(tgt_uuid,
807 &obd->u.cli.cl_target_uuid) &&
808 ((grp_uuid)? obd_uuid_equals(grp_uuid,
809 &obd->obd_uuid) : 1)) {
810 read_unlock(&obd_dev_lock);
815 read_unlock(&obd_dev_lock);
819 EXPORT_SYMBOL(class_find_client_obd);
821 /* Iterate the obd_device list looking devices have grp_uuid. Start
822 searching at *next, and if a device is found, the next index to look
823 at is saved in *next. If next is NULL, then the first matching device
824 will always be returned. */
/*
 * Iterate over obd devices whose UUID matches @grp_uuid, starting at
 * *next.  On a match, *next is updated to the index after the match
 * so the caller can resume; with next == NULL the first match is
 * always returned.
 */
825 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
831 else if (*next >= 0 && *next < class_devno_max())
836 read_lock(&obd_dev_lock);
837 for (; i < class_devno_max(); i++) {
838 struct obd_device *obd = class_num2obd(i);
842 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
845 read_unlock(&obd_dev_lock);
849 read_unlock(&obd_dev_lock);
853 EXPORT_SYMBOL(class_devices_in_group);
856 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
857 * adjust sptlrpc settings accordingly.
/*
 * Notify every relevant obd on filesystem @fsname that the sptlrpc
 * configuration log changed so it can re-read its security settings.
 *
 * The device list lock is dropped around the (potentially blocking)
 * obd_set_info_async() call, with a temporary obd reference held to
 * keep the device alive meanwhile.
 */
859 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
861 struct obd_device *obd;
865 LASSERT(namelen > 0);
867 read_lock(&obd_dev_lock);
868 for (i = 0; i < class_devno_max(); i++) {
869 obd = class_num2obd(i);
871 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
874 /* only notify mdc, osc, osp, lwp, mdt, ost
875 * because only these have a -sptlrpc llog */
876 type = obd->obd_type->typ_name;
877 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
878 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
879 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
880 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
881 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
882 strcmp(type, LUSTRE_OST_NAME) != 0)
/* match devices belonging to this filesystem by name prefix */
885 if (strncmp(obd->obd_name, fsname, namelen))
/* pin the obd and drop the lock across the blocking call */
888 class_incref(obd, __FUNCTION__, obd);
889 read_unlock(&obd_dev_lock);
890 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
891 sizeof(KEY_SPTLRPC_CONF),
892 KEY_SPTLRPC_CONF, 0, NULL, NULL);
894 class_decref(obd, __FUNCTION__, obd);
895 read_lock(&obd_dev_lock);
897 read_unlock(&obd_dev_lock);
900 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches().  Each pointer
 * is reset to NULL so a partial-failure path in obd_init_caches() can
 * call this safely.
 */
902 void obd_cleanup_caches(void)
905 if (obd_device_cachep) {
906 kmem_cache_destroy(obd_device_cachep);
907 obd_device_cachep = NULL;
910 kmem_cache_destroy(obdo_cachep);
914 kmem_cache_destroy(import_cachep);
915 import_cachep = NULL;
/*
 * Create the slab caches for obd_device, obdo and obd_import objects.
 * On any allocation failure all caches created so far are destroyed
 * via obd_cleanup_caches().
 *
 * \retval 0 on success, -ENOMEM on failure
 */
921 int obd_init_caches(void)
926 LASSERT(obd_device_cachep == NULL);
927 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
928 sizeof(struct obd_device),
930 if (!obd_device_cachep)
931 GOTO(out, rc = -ENOMEM);
933 LASSERT(obdo_cachep == NULL);
934 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
937 GOTO(out, rc = -ENOMEM);
939 LASSERT(import_cachep == NULL);
940 import_cachep = kmem_cache_create("ll_import_cache",
941 sizeof(struct obd_import),
944 GOTO(out, rc = -ENOMEM);
/* error path: tear down whatever was created */
948 obd_cleanup_caches();
952 /* map connection to client */
/*
 * Map a connection handle to its export (with a reference taken by
 * the handle lookup).  A NULL handle or the magic cookie -1 ("assign
 * a new connection") yields no export.
 */
953 struct obd_export *class_conn2export(struct lustre_handle *conn)
955 struct obd_export *export;
959 CDEBUG(D_CACHE, "looking for null handle\n");
963 if (conn->cookie == -1) { /* this means assign a new connection */
964 CDEBUG(D_CACHE, "want a new connection\n");
968 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
969 export = class_handle2object(conn->cookie, NULL);
972 EXPORT_SYMBOL(class_conn2export);
974 struct obd_device *class_exp2obd(struct obd_export *exp)
980 EXPORT_SYMBOL(class_exp2obd);
982 struct obd_device *class_conn2obd(struct lustre_handle *conn)
984 struct obd_export *export;
985 export = class_conn2export(conn);
987 struct obd_device *obd = export->exp_obd;
988 class_export_put(export);
994 struct obd_import *class_exp2cliimp(struct obd_export *exp)
996 struct obd_device *obd = exp->exp_obd;
999 return obd->u.cli.cl_import;
1001 EXPORT_SYMBOL(class_exp2cliimp);
1003 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
1005 struct obd_device *obd = class_conn2obd(conn);
1008 return obd->u.cli.cl_import;
1011 /* Export management functions */
/*
 * Final teardown of an export once its refcount is zero: drop the
 * ptlrpc connection, verify all request queues are empty, run the
 * obd's per-export destroy hook, release the obd reference (except
 * for the self export) and free the export via RCU.
 */
1012 static void class_export_destroy(struct obd_export *exp)
1014 struct obd_device *obd = exp->exp_obd;
1017 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
1018 LASSERT(obd != NULL);
1020 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
1021 exp->exp_client_uuid.uuid, obd->obd_name);
1023 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
1024 if (exp->exp_connection)
1025 ptlrpc_put_connection_superhack(exp->exp_connection);
1027 LASSERT(list_empty(&exp->exp_outstanding_replies));
1028 LASSERT(list_empty(&exp->exp_uncommitted_replies));
1029 LASSERT(list_empty(&exp->exp_req_replay_queue));
1030 LASSERT(list_empty(&exp->exp_hp_rpcs));
1031 obd_destroy_export(exp);
1032 /* self export doesn't hold a reference to an obd, although it
1033 * exists until freeing of the obd */
1034 if (exp != obd->obd_self_export)
1035 class_decref(obd, "export", exp);
/* RCU free: lookups racing via exp_handle remain safe */
1037 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
1041 static void export_handle_addref(void *export)
1043 class_export_get(export);
1046 static struct portals_handle_ops export_handle_ops = {
1047 .hop_addref = export_handle_addref,
1051 struct obd_export *class_export_get(struct obd_export *exp)
1053 atomic_inc(&exp->exp_refcount);
1054 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1055 atomic_read(&exp->exp_refcount));
1058 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on @exp.  On the final put, the self export is
 * destroyed synchronously (it pins no obd) and the obd itself is then
 * freed; any other export is handed to the zombie thread for
 * asynchronous destruction.
 */
1060 void class_export_put(struct obd_export *exp)
1062 LASSERT(exp != NULL);
1063 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1064 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1065 atomic_read(&exp->exp_refcount) - 1);
1067 if (atomic_dec_and_test(&exp->exp_refcount)) {
1068 struct obd_device *obd = exp->exp_obd;
1070 CDEBUG(D_IOCTL, "final put %p/%s\n",
1071 exp, exp->exp_client_uuid.uuid);
1073 /* release nid stat reference */
1074 lprocfs_exp_cleanup(exp);
1076 if (exp == obd->obd_self_export) {
1077 /* self export should be destroyed without
1078 * zombie thread as it doesn't hold a
1079 * reference to obd and doesn't hold any
1081 class_export_destroy(exp);
1082 /* self export is destroyed, no class
1083 * references exist and it is safe to free
1085 class_free_dev(obd);
1087 LASSERT(!list_empty(&exp->exp_obd_chain));
1088 obd_zombie_export_add(exp);
1093 EXPORT_SYMBOL(class_export_put);
1094 /* Creates a new export, adds it to the hash table, and returns a
1095 * pointer to it. The refcount is 2: one for the hash reference, and
1096 * one for the pointer returned by this function. */
/*
 * \param[in] obd      device the export belongs to
 * \param[in] cluuid   client UUID for the export
 * \param[in] is_self  true when creating the obd's self export, which
 *                     is not hashed by UUID and does not pin the obd
 * \retval new export or ERR_PTR(-errno)
 */
1097 struct obd_export *__class_new_export(struct obd_device *obd,
1098 struct obd_uuid *cluuid, bool is_self)
1100 struct obd_export *export;
1101 struct cfs_hash *hash = NULL;
1105 OBD_ALLOC_PTR(export);
1107 return ERR_PTR(-ENOMEM);
1109 export->exp_conn_cnt = 0;
1110 export->exp_lock_hash = NULL;
1111 export->exp_flock_hash = NULL;
1112 /* 2 = class_handle_hash + last */
1113 atomic_set(&export->exp_refcount, 2);
1114 atomic_set(&export->exp_rpc_count, 0);
1115 atomic_set(&export->exp_cb_count, 0);
1116 atomic_set(&export->exp_locks_count, 0);
1117 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1118 INIT_LIST_HEAD(&export->exp_locks_list);
1119 spin_lock_init(&export->exp_locks_list_guard);
1121 atomic_set(&export->exp_replay_count, 0);
1122 export->exp_obd = obd;
1123 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1124 spin_lock_init(&export->exp_uncommitted_replies_lock);
1125 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1126 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1127 INIT_LIST_HEAD(&export->exp_handle.h_link);
1128 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1129 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* make the export reachable via its cookie handle */
1130 class_handle_hash(&export->exp_handle, &export_handle_ops);
1131 export->exp_last_request_time = ktime_get_real_seconds();
1132 spin_lock_init(&export->exp_lock);
1133 spin_lock_init(&export->exp_rpc_lock);
1134 INIT_HLIST_NODE(&export->exp_uuid_hash);
1135 INIT_HLIST_NODE(&export->exp_nid_hash);
1136 INIT_HLIST_NODE(&export->exp_gen_hash);
1137 spin_lock_init(&export->exp_bl_list_lock);
1138 INIT_LIST_HEAD(&export->exp_bl_list);
1139 INIT_LIST_HEAD(&export->exp_stale_list);
1141 export->exp_sp_peer = LUSTRE_SP_ANY;
1142 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1143 export->exp_client_uuid = *cluuid;
1144 obd_init_export(export);
/* only real client exports (UUID differs from the obd's own) go
 * into the per-obd UUID hash */
1146 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1147 spin_lock(&obd->obd_dev_lock);
1148 /* shouldn't happen, but might race */
1149 if (obd->obd_stopping)
1150 GOTO(exit_unlock, rc = -ENODEV);
1152 hash = cfs_hash_getref(obd->obd_uuid_hash);
1154 GOTO(exit_unlock, rc = -ENODEV);
1155 spin_unlock(&obd->obd_dev_lock);
1157 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1159 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1160 obd->obd_name, cluuid->uuid, rc);
1161 GOTO(exit_err, rc = -EALREADY);
1165 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* recheck obd_stopping under the lock before publishing */
1166 spin_lock(&obd->obd_dev_lock);
1167 if (obd->obd_stopping) {
1169 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1170 GOTO(exit_unlock, rc = -ESHUTDOWN);
/* non-self exports pin the obd and join its export lists */
1174 class_incref(obd, "export", export);
1175 list_add_tail(&export->exp_obd_chain_timed,
1176 &obd->obd_exports_timed);
1177 list_add(&export->exp_obd_chain, &obd->obd_exports);
1178 obd->obd_num_exports++;
1180 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1181 INIT_LIST_HEAD(&export->exp_obd_chain);
1183 spin_unlock(&obd->obd_dev_lock);
1185 cfs_hash_putref(hash);
/* error paths: unlock, drop hash ref, unhash and free the export */
1189 spin_unlock(&obd->obd_dev_lock);
1192 cfs_hash_putref(hash);
1193 class_handle_unhash(&export->exp_handle);
1194 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1195 obd_destroy_export(export);
1196 OBD_FREE_PTR(export);
1200 struct obd_export *class_new_export(struct obd_device *obd,
1201 struct obd_uuid *uuid)
1203 return __class_new_export(obd, uuid, false);
1205 EXPORT_SYMBOL(class_new_export);
1207 struct obd_export *class_new_export_self(struct obd_device *obd,
1208 struct obd_uuid *uuid)
1210 return __class_new_export(obd, uuid, true);
/*
 * Detach @exp from its obd: remove its cookie handle and hash
 * entries, move it onto the obd's unlinked list, and hand the
 * remaining reference to the stale-export machinery.  The self
 * export short-circuits to a plain put.
 */
1213 void class_unlink_export(struct obd_export *exp)
1215 class_handle_unhash(&exp->exp_handle);
1217 if (exp->exp_obd->obd_self_export == exp) {
1218 class_export_put(exp);
1222 spin_lock(&exp->exp_obd->obd_dev_lock);
1223 /* delete an uuid-export hashitem from hashtables */
1224 if (!hlist_unhashed(&exp->exp_uuid_hash))
1225 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1226 &exp->exp_client_uuid,
1227 &exp->exp_uuid_hash);
1229 #ifdef HAVE_SERVER_SUPPORT
1230 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1231 struct tg_export_data *ted = &exp->exp_target_data;
1232 struct cfs_hash *hash;
1234 /* Because obd_gen_hash will not be released until
1235 * class_cleanup(), so hash should never be NULL here */
1236 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1237 LASSERT(hash != NULL);
1238 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1239 &exp->exp_gen_hash);
1240 cfs_hash_putref(hash);
1242 #endif /* HAVE_SERVER_SUPPORT */
1244 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1245 list_del_init(&exp->exp_obd_chain_timed);
1246 exp->exp_obd->obd_num_exports--;
1247 spin_unlock(&exp->exp_obd->obd_dev_lock);
1248 atomic_inc(&obd_stale_export_num);
1250 /* A reference is kept by obd_stale_exports list */
1251 obd_stale_export_put(exp);
1253 EXPORT_SYMBOL(class_unlink_export);
1255 /* Import management functions */
/*
 * Final teardown of an import once its refcount is zero: release the
 * primary connection, free every alternate connection on
 * imp_conn_list, drop the obd reference and free the import via RCU.
 */
1256 static void class_import_destroy(struct obd_import *imp)
1260 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1261 imp->imp_obd->obd_name);
1263 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1265 ptlrpc_put_connection_superhack(imp->imp_connection);
/* free the list of alternate connection targets */
1267 while (!list_empty(&imp->imp_conn_list)) {
1268 struct obd_import_conn *imp_conn;
1270 imp_conn = list_entry(imp->imp_conn_list.next,
1271 struct obd_import_conn, oic_item);
1272 list_del_init(&imp_conn->oic_item);
1273 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1274 OBD_FREE(imp_conn, sizeof(*imp_conn));
1277 LASSERT(imp->imp_sec == NULL);
1278 class_decref(imp->imp_obd, "import", imp);
/* RCU free: handle-based lookups racing with destroy remain safe */
1279 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1283 static void import_handle_addref(void *import)
1285 class_import_get(import);
1288 static struct portals_handle_ops import_handle_ops = {
1289 .hop_addref = import_handle_addref,
1293 struct obd_import *class_import_get(struct obd_import *import)
1295 atomic_inc(&import->imp_refcount);
1296 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1297 atomic_read(&import->imp_refcount),
1298 import->imp_obd->obd_name);
1301 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on @imp.  On the final put the import is queued
 * for asynchronous destruction by the zombie thread rather than
 * destroyed inline.
 */
1303 void class_import_put(struct obd_import *imp)
1307 LASSERT(list_empty(&imp->imp_zombie_chain));
1308 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1310 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1311 atomic_read(&imp->imp_refcount) - 1,
1312 imp->imp_obd->obd_name);
1314 if (atomic_dec_and_test(&imp->imp_refcount)) {
1315 CDEBUG(D_INFO, "final put import %p\n", imp);
1316 obd_zombie_import_add(imp);
1319 /* catch possible import put race */
1320 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1323 EXPORT_SYMBOL(class_import_put);
1325 static void init_imp_at(struct imp_at *at) {
1327 at_init(&at->iat_net_latency, 0, 0);
1328 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1329 /* max service estimates are tracked on the server side, so
1330 don't use the AT history here, just use the last reported
1331 val. (But keep hist for proc histogram, worst_ever) */
1332 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize a new import for @obd: lists, locks,
 * refcounts (2: one for the handle hash, one for the caller), the
 * cookie handle and adaptive-timeout state.  Takes an obd reference
 * that is dropped in class_import_destroy().
 */
1337 struct obd_import *class_new_import(struct obd_device *obd)
1339 struct obd_import *imp;
1340 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1342 OBD_ALLOC(imp, sizeof(*imp));
1346 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1347 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1348 INIT_LIST_HEAD(&imp->imp_replay_list);
1349 INIT_LIST_HEAD(&imp->imp_sending_list);
1350 INIT_LIST_HEAD(&imp->imp_delayed_list);
1351 INIT_LIST_HEAD(&imp->imp_committed_list);
1352 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1353 imp->imp_known_replied_xid = 0;
1354 imp->imp_replay_cursor = &imp->imp_committed_list;
1355 spin_lock_init(&imp->imp_lock);
1356 imp->imp_last_success_conn = 0;
1357 imp->imp_state = LUSTRE_IMP_NEW;
1358 imp->imp_obd = class_incref(obd, "import", imp);
1359 mutex_init(&imp->imp_sec_mutex);
1360 init_waitqueue_head(&imp->imp_recovery_waitq);
/* NOTE(review): curr_pid_ns is dereferenced without a NULL check
 * here; presumably ll_task_pid_ns(current) cannot return NULL --
 * verify against the libcfs wrapper. */
1362 if (curr_pid_ns->child_reaper)
1363 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1365 imp->imp_sec_refpid = 1;
1367 atomic_set(&imp->imp_refcount, 2);
1368 atomic_set(&imp->imp_unregistering, 0);
1369 atomic_set(&imp->imp_inflight, 0);
1370 atomic_set(&imp->imp_replay_inflight, 0);
1371 atomic_set(&imp->imp_inval_count, 0);
1372 INIT_LIST_HEAD(&imp->imp_conn_list);
1373 INIT_LIST_HEAD(&imp->imp_handle.h_link);
/* make the import reachable via its cookie handle */
1374 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1375 init_imp_at(&imp->imp_at);
1377 /* the default magic is V2, will be used in connect RPC, and
1378 * then adjusted according to the flags in request/reply. */
1379 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1383 EXPORT_SYMBOL(class_new_import);
/* Tear down @import: unhash its handle, bump the generation so stale
 * users can be detected, and drop one reference. */
1385 void class_destroy_import(struct obd_import *import)
1387 LASSERT(import != NULL);
1388 LASSERT(import != LP_POISON);
1390 class_handle_unhash(&import->imp_handle);
1392 spin_lock(&import->imp_lock);
1393 import->imp_generation++;
1394 spin_unlock(&import->imp_lock);
1395 class_import_put(import);
1397 EXPORT_SYMBOL(class_destroy_import);
1399 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Debug accounting (LUSTRE_TRACKS_LOCK_EXP_REFS): record that @lock holds
 * a reference on @exp.  The first reference links the lock onto the
 * export's lock list and records the export as the lock's target. */
1401 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1403 spin_lock(&exp->exp_locks_list_guard);
1405 LASSERT(lock->l_exp_refs_nr >= 0);
/* warn if the lock is already accounted against a different export */
1407 if (lock->l_exp_refs_target != NULL &&
1408 lock->l_exp_refs_target != exp) {
1409 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1410 exp, lock, lock->l_exp_refs_target);
1412 if ((lock->l_exp_refs_nr ++) == 0) {
1413 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1414 lock->l_exp_refs_target = exp;
1416 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1417 lock, exp, lock->l_exp_refs_nr);
1418 spin_unlock(&exp->exp_locks_list_guard);
1420 EXPORT_SYMBOL(__class_export_add_lock_ref);
/* Debug accounting counterpart of __class_export_add_lock_ref(): drop one
 * lock reference on @exp; the last reference unlinks the lock and clears
 * its recorded target export. */
1422 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1424 spin_lock(&exp->exp_locks_list_guard);
1425 LASSERT(lock->l_exp_refs_nr > 0);
1426 if (lock->l_exp_refs_target != exp) {
1427 LCONSOLE_WARN("lock %p, "
1428 "mismatching export pointers: %p, %p\n",
1429 lock, lock->l_exp_refs_target, exp);
1431 if (-- lock->l_exp_refs_nr == 0) {
1432 list_del_init(&lock->l_exp_refs_link);
1433 lock->l_exp_refs_target = NULL;
1435 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1436 lock, exp, lock->l_exp_refs_nr);
1437 spin_unlock(&exp->exp_locks_list_guard);
1439 EXPORT_SYMBOL(__class_export_del_lock_ref);
1442 /* A connection defines an export context in which preallocation can
1443 be managed. This releases the export pointer reference, and returns
1444 the export handle, so the export refcount is 1 when this function returns.
/* Create a new export for client @cluuid on @obd and return its handle
 * cookie in @conn.  The export's own pointer reference is dropped before
 * returning; callers look the export up by cookie afterwards. */
1446 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1447 struct obd_uuid *cluuid)
1449 struct obd_export *export;
1450 LASSERT(conn != NULL);
1451 LASSERT(obd != NULL);
1452 LASSERT(cluuid != NULL);
1455 export = class_new_export(obd, cluuid);
1457 RETURN(PTR_ERR(export));
1459 conn->cookie = export->exp_handle.h_cookie;
1460 class_export_put(export);
1462 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1463 cluuid->uuid, conn->cookie);
1466 EXPORT_SYMBOL(class_connect);
1468 /* if export is involved in recovery then clean up related things */
/* Undo recovery bookkeeping for @exp: clear its in-recovery flag, adjust
 * the obd's connected/stale client counters, and release any pending
 * req/lock replay accounting. */
1469 static void class_export_recovery_cleanup(struct obd_export *exp)
1471 struct obd_device *obd = exp->exp_obd;
1473 spin_lock(&obd->obd_recovery_task_lock);
1474 if (obd->obd_recovering) {
1475 if (exp->exp_in_recovery) {
1476 spin_lock(&exp->exp_lock);
1477 exp->exp_in_recovery = 0;
1478 spin_unlock(&exp->exp_lock);
1479 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1480 atomic_dec(&obd->obd_connected_clients);
1483 /* if called during recovery then should update
1484 * obd_stale_clients counter,
1485 * lightweight exports are not counted */
1486 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1487 exp->exp_obd->obd_stale_clients++;
1489 spin_unlock(&obd->obd_recovery_task_lock);
1491 spin_lock(&exp->exp_lock);
1492 /** Cleanup req replay fields */
1493 if (exp->exp_req_replay_needed) {
1494 exp->exp_req_replay_needed = 0;
1496 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1497 atomic_dec(&obd->obd_req_replay_clients);
1500 /** Cleanup lock replay data */
1501 if (exp->exp_lock_replay_needed) {
1502 exp->exp_lock_replay_needed = 0;
1504 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1505 atomic_dec(&obd->obd_lock_replay_clients);
1507 spin_unlock(&exp->exp_lock);
1510 /* This function removes 1-3 references from the export:
1511 * 1 - for export pointer passed
1512 * and if disconnect really need
1513 * 2 - removing from hash
1514 * 3 - in client_unlink_export
1515 * The export pointer passed to this function can be destroyed */
/* Disconnect @export: mark it disconnected, remove it from the nid hash,
 * run recovery cleanup and unlink it.  Idempotent — a racing second call
 * only drops the passed-in reference. */
1516 int class_disconnect(struct obd_export *export)
1518 int already_disconnected;
1521 if (export == NULL) {
1522 CWARN("attempting to free NULL export %p\n", export);
/* mark disconnected under exp_lock; remember if someone beat us to it */
1526 spin_lock(&export->exp_lock);
1527 already_disconnected = export->exp_disconnected;
1528 export->exp_disconnected = 1;
1529 /* We hold references of export for uuid hash
1530 * and nid_hash and export link at least. So
1531 * it is safe to call cfs_hash_del in there. */
1532 if (!hlist_unhashed(&export->exp_nid_hash))
1533 cfs_hash_del(export->exp_obd->obd_nid_hash,
1534 &export->exp_connection->c_peer.nid,
1535 &export->exp_nid_hash);
1536 spin_unlock(&export->exp_lock);
1538 /* class_cleanup(), abort_recovery(), and class_fail_export()
1539 * all end up in here, and if any of them race we shouldn't
1540 * call extra class_export_puts(). */
1541 if (already_disconnected) {
1542 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1543 GOTO(no_disconn, already_disconnected);
1546 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1547 export->exp_handle.h_cookie);
1549 class_export_recovery_cleanup(export);
1550 class_unlink_export(export);
1552 class_export_put(export);
1555 EXPORT_SYMBOL(class_disconnect);
1557 /* Return non-zero for a fully connected export */
/* Return non-zero iff @exp has at least one connection and is not failed;
 * checked under exp_lock for a consistent snapshot. */
1558 int class_connected_export(struct obd_export *exp)
1563 spin_lock(&exp->exp_lock);
1564 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1565 spin_unlock(&exp->exp_lock);
1569 EXPORT_SYMBOL(class_connected_export);
/* Disconnect every export on @list, applying @flags to each.  The obd's
 * self-export (uuid equal to the obd uuid) is skipped and unlinked rather
 * than disconnected. */
1571 static void class_disconnect_export_list(struct list_head *list,
1572 enum obd_option flags)
1575 struct obd_export *exp;
1578 /* It's possible that an export may disconnect itself, but
1579 * nothing else will be added to this list. */
1580 while (!list_empty(list)) {
1581 exp = list_entry(list->next, struct obd_export,
1583 /* need for safe call CDEBUG after obd_disconnect */
1584 class_export_get(exp);
1586 spin_lock(&exp->exp_lock);
1587 exp->exp_flags = flags;
1588 spin_unlock(&exp->exp_lock);
1590 if (obd_uuid_equals(&exp->exp_client_uuid,
1591 &exp->exp_obd->obd_uuid)) {
1593 "exp %p export uuid == obd uuid, don't discon\n",
1595 /* Need to delete this now so we don't end up pointing
1596 * to work_list later when this export is cleaned up. */
1597 list_del_init(&exp->exp_obd_chain);
1598 class_export_put(exp);
/* extra reference consumed by obd_disconnect() below */
1602 class_export_get(exp);
1603 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1604 "last request at %lld\n",
1605 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1606 exp, exp->exp_last_request_time);
1607 /* release one export reference anyway */
1608 rc = obd_disconnect(exp);
1610 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1611 obd_export_nid2str(exp), exp, rc);
1612 class_export_put(exp);
/* Disconnect all exports (active and delayed) of @obd.  They are spliced
 * onto a private work list under obd_dev_lock first so the walk happens
 * without holding the lock. */
1617 void class_disconnect_exports(struct obd_device *obd)
1619 struct list_head work_list;
1622 /* Move all of the exports from obd_exports to a work list, en masse. */
1623 INIT_LIST_HEAD(&work_list);
1624 spin_lock(&obd->obd_dev_lock);
1625 list_splice_init(&obd->obd_exports, &work_list);
1626 list_splice_init(&obd->obd_delayed_exports, &work_list);
1627 spin_unlock(&obd->obd_dev_lock);
1629 if (!list_empty(&work_list)) {
1630 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1631 "disconnecting them\n", obd->obd_minor, obd);
1632 class_disconnect_export_list(&work_list,
1633 exp_flags_from_obd(obd));
1635 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1636 obd->obd_minor, obd);
1639 EXPORT_SYMBOL(class_disconnect_exports);
1641 /* Remove exports that have not completed recovery.
/* Evict exports of @obd for which @test_export returns true (i.e. clients
 * that have not completed recovery).  Matching exports are marked failed,
 * moved to a work list, and disconnected with OBD_OPT_ABORT_RECOV. */
1643 void class_disconnect_stale_exports(struct obd_device *obd,
1644 int (*test_export)(struct obd_export *))
1646 struct list_head work_list;
1647 struct obd_export *exp, *n;
1651 INIT_LIST_HEAD(&work_list);
1652 spin_lock(&obd->obd_dev_lock);
1653 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1655 /* don't count self-export as client */
1656 if (obd_uuid_equals(&exp->exp_client_uuid,
1657 &exp->exp_obd->obd_uuid))
1660 /* don't evict clients which have no slot in last_rcvd
1661 * (e.g. lightweight connection) */
1662 if (exp->exp_target_data.ted_lr_idx == -1)
/* skip exports already failed or not selected by the predicate */
1665 spin_lock(&exp->exp_lock);
1666 if (exp->exp_failed || test_export(exp)) {
1667 spin_unlock(&exp->exp_lock);
1670 exp->exp_failed = 1;
1671 spin_unlock(&exp->exp_lock);
1673 list_move(&exp->exp_obd_chain, &work_list);
1675 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1676 obd->obd_name, exp->exp_client_uuid.uuid,
1677 obd_export_nid2str(exp));
1678 print_export_data(exp, "EVICTING", 0, D_HA);
1680 spin_unlock(&obd->obd_dev_lock);
1683 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1684 obd->obd_name, evicted);
1686 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1687 OBD_OPT_ABORT_RECOV);
1690 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Mark @exp failed and disconnect it.  A second call on an already-failed
 * export is a no-op.  Two references are taken: one consumed by
 * obd_disconnect(), one to keep @exp valid for the trailing CDEBUG. */
1692 void class_fail_export(struct obd_export *exp)
1694 int rc, already_failed;
1696 spin_lock(&exp->exp_lock);
1697 already_failed = exp->exp_failed;
1698 exp->exp_failed = 1;
1699 spin_unlock(&exp->exp_lock);
1701 if (already_failed) {
1702 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1703 exp, exp->exp_client_uuid.uuid);
1707 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1708 exp, exp->exp_client_uuid.uuid);
1710 if (obd_dump_on_timeout)
1711 libcfs_debug_dumplog();
1713 /* need for safe call CDEBUG after obd_disconnect */
1714 class_export_get(exp);
1716 /* Most callers into obd_disconnect are removing their own reference
1717 * (request, for example) in addition to the one from the hash table.
1718 * We don't have such a reference here, so make one. */
1719 class_export_get(exp);
1720 rc = obd_disconnect(exp);
1722 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1724 CDEBUG(D_HA, "disconnected export %p/%s\n",
1725 exp, exp->exp_client_uuid.uuid);
1726 class_export_put(exp);
1728 EXPORT_SYMBOL(class_fail_export);
/* Administratively evict all exports of @obd whose peer NID matches @nid.
 * Looks the exports up in obd_nid_hash and fails each via
 * class_fail_export().  Returns the number of exports evicted. */
1730 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1732 struct cfs_hash *nid_hash;
1733 struct obd_export *doomed_exp = NULL;
1734 int exports_evicted = 0;
1736 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1738 spin_lock(&obd->obd_dev_lock);
1739 /* umount has run already, so evict thread should leave
1740 * its task to umount thread now */
1741 if (obd->obd_stopping) {
1742 spin_unlock(&obd->obd_dev_lock);
1743 return exports_evicted;
/* pin the hash so it survives after dropping obd_dev_lock */
1745 nid_hash = obd->obd_nid_hash;
1746 cfs_hash_getref(nid_hash);
1747 spin_unlock(&obd->obd_dev_lock);
1750 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1751 if (doomed_exp == NULL)
1754 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1755 "nid %s found, wanted nid %s, requested nid %s\n",
1756 obd_export_nid2str(doomed_exp),
1757 libcfs_nid2str(nid_key), nid);
1758 LASSERTF(doomed_exp != obd->obd_self_export,
1759 "self-export is hashed by NID?\n");
1761 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1762 "request\n", obd->obd_name,
1763 obd_uuid2str(&doomed_exp->exp_client_uuid),
1764 obd_export_nid2str(doomed_exp));
1765 class_fail_export(doomed_exp);
1766 class_export_put(doomed_exp);
1769 cfs_hash_putref(nid_hash);
1771 if (!exports_evicted)
1772 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1773 obd->obd_name, nid);
1774 return exports_evicted;
1776 EXPORT_SYMBOL(obd_export_evict_by_nid);
/* Administratively evict the export of @obd identified by client @uuid.
 * Refuses to evict the obd's own uuid.  Returns the number of exports
 * evicted (0 or 1). */
1778 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1780 struct cfs_hash *uuid_hash;
1781 struct obd_export *doomed_exp = NULL;
1782 struct obd_uuid doomed_uuid;
1783 int exports_evicted = 0;
1785 spin_lock(&obd->obd_dev_lock);
/* umount in progress: leave eviction to the umount path */
1786 if (obd->obd_stopping) {
1787 spin_unlock(&obd->obd_dev_lock);
1788 return exports_evicted;
1790 uuid_hash = obd->obd_uuid_hash;
1791 cfs_hash_getref(uuid_hash);
1792 spin_unlock(&obd->obd_dev_lock);
1794 obd_str2uuid(&doomed_uuid, uuid);
1795 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1796 CERROR("%s: can't evict myself\n", obd->obd_name);
1797 cfs_hash_putref(uuid_hash);
1798 return exports_evicted;
1801 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1803 if (doomed_exp == NULL) {
1804 CERROR("%s: can't disconnect %s: no exports found\n",
1805 obd->obd_name, uuid);
/* NOTE(review): "adminstrative" typo in the message below is user-visible
 * text; fixing it would change runtime output, so it is left as-is here. */
1807 CWARN("%s: evicting %s at adminstrative request\n",
1808 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1809 class_fail_export(doomed_exp);
1810 class_export_put(doomed_exp);
1813 cfs_hash_putref(uuid_hash);
1815 return exports_evicted;
1818 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional hook called by print_export_data() to dump per-export lock
 * state when lock/export reference tracking is compiled in. */
1819 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1820 EXPORT_SYMBOL(class_export_dump_hook);
/* Log a one-line summary of @exp (refcounts, flags, outstanding replies)
 * at @debug_level, tagged with @status ("ACTIVE", "ZOMBIE", ...).  If
 * @locks is set and a dump hook is registered, also dump lock state. */
1823 static void print_export_data(struct obd_export *exp, const char *status,
1824 int locks, int debug_level)
1826 struct ptlrpc_reply_state *rs;
1827 struct ptlrpc_reply_state *first_reply = NULL;
/* count outstanding replies under exp_lock */
1830 spin_lock(&exp->exp_lock);
1831 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1837 spin_unlock(&exp->exp_lock);
1839 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1840 "%p %s %llu stale:%d\n",
1841 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1842 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1843 atomic_read(&exp->exp_rpc_count),
1844 atomic_read(&exp->exp_cb_count),
1845 atomic_read(&exp->exp_locks_count),
1846 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1847 nreplies, first_reply, nreplies > 3 ? "..." : "",
1848 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1849 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1850 if (locks && class_export_dump_hook != NULL)
1851 class_export_dump_hook(exp);
/* Dump every export of @obd (active, unlinked, delayed) plus the global
 * zombie list, via print_export_data(). */
1855 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1857 struct obd_export *exp;
1859 spin_lock(&obd->obd_dev_lock);
1860 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1861 print_export_data(exp, "ACTIVE", locks, debug_level);
1862 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1863 print_export_data(exp, "UNLINKED", locks, debug_level);
1864 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1865 print_export_data(exp, "DELAYED", locks, debug_level);
1866 spin_unlock(&obd->obd_dev_lock);
1867 spin_lock(&obd_zombie_impexp_lock);
1868 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1869 print_export_data(exp, "ZOMBIE", locks, debug_level);
1870 spin_unlock(&obd_zombie_impexp_lock);
/* Block until @obd has no unlinked exports left, sleeping in increasing
 * intervals and warning (with a full export dump) if the wait exceeds
 * ~5 seconds.  Requires obd_exports to already be empty. */
1873 void obd_exports_barrier(struct obd_device *obd)
1876 LASSERT(list_empty(&obd->obd_exports));
1877 spin_lock(&obd->obd_dev_lock);
1878 while (!list_empty(&obd->obd_unlinked_exports)) {
1879 spin_unlock(&obd->obd_dev_lock);
1880 set_current_state(TASK_UNINTERRUPTIBLE);
1881 schedule_timeout(cfs_time_seconds(waited));
/* warn at power-of-two intervals to avoid log flooding */
1882 if (waited > 5 && is_power_of_2(waited)) {
1883 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1884 "more than %d seconds. "
1885 "The obd refcount = %d. Is it stuck?\n",
1886 obd->obd_name, waited,
1887 atomic_read(&obd->obd_refcount));
1888 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1891 spin_lock(&obd->obd_dev_lock);
1893 spin_unlock(&obd->obd_dev_lock);
1895 EXPORT_SYMBOL(obd_exports_barrier);
1897 /* Total amount of zombies to be destroyed; protected by obd_zombie_impexp_lock */
1898 static int zombies_count = 0;
1901 * kill zombie imports and exports
/* Destroy queued zombie imports and exports, one of each per iteration,
 * until both queues are empty.  Destruction happens outside
 * obd_zombie_impexp_lock; the lock is retaken only to update counters. */
1903 void obd_zombie_impexp_cull(void)
1905 struct obd_import *import;
1906 struct obd_export *export;
1910 spin_lock(&obd_zombie_impexp_lock);
1913 if (!list_empty(&obd_zombie_imports)) {
1914 import = list_entry(obd_zombie_imports.next,
1917 list_del_init(&import->imp_zombie_chain);
1921 if (!list_empty(&obd_zombie_exports)) {
1922 export = list_entry(obd_zombie_exports.next,
1925 list_del_init(&export->exp_obd_chain);
1928 spin_unlock(&obd_zombie_impexp_lock);
1930 if (import != NULL) {
1931 class_import_destroy(import);
1932 spin_lock(&obd_zombie_impexp_lock);
1934 spin_unlock(&obd_zombie_impexp_lock);
1937 if (export != NULL) {
1938 class_export_destroy(export);
1939 spin_lock(&obd_zombie_impexp_lock);
1941 spin_unlock(&obd_zombie_impexp_lock);
/* loop while either queue produced work this pass */
1945 } while (import != NULL || export != NULL);
/* State for the zombie import/export destruction thread: start/stop
 * completions, control flags, its wait queue, and its pid (used by
 * obd_zombie_barrier to avoid self-deadlock). */
1949 static DECLARE_COMPLETION(obd_zombie_start);
1950 static DECLARE_COMPLETION(obd_zombie_stop);
1951 static unsigned long obd_zombie_flags;
1952 static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
1953 static pid_t obd_zombie_pid;
/* bit in obd_zombie_flags requesting the thread to stop */
1956 OBD_ZOMBIE_STOP = 0x0001,
1960 * check for work for kill zombie import/export thread.
/* Wait predicate for the zombie thread: true when there is nothing to do
 * (no zombies queued and no stop requested). */
1962 static int obd_zombie_impexp_check(void *arg)
1966 spin_lock(&obd_zombie_impexp_lock);
1967 rc = (zombies_count == 0) &&
1968 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1969 spin_unlock(&obd_zombie_impexp_lock);
1975 * Add export to the obd_zombie thread and notify it.
/* Move @exp from its obd's export list onto the global zombie-export
 * queue and wake the destruction thread. */
1977 static void obd_zombie_export_add(struct obd_export *exp) {
1978 atomic_dec(&obd_stale_export_num);
1979 spin_lock(&exp->exp_obd->obd_dev_lock);
1980 LASSERT(!list_empty(&exp->exp_obd_chain));
1981 list_del_init(&exp->exp_obd_chain);
1982 spin_unlock(&exp->exp_obd->obd_dev_lock);
1983 spin_lock(&obd_zombie_impexp_lock);
1985 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1986 spin_unlock(&obd_zombie_impexp_lock);
1988 obd_zombie_impexp_notify();
1992 * Add import to the obd_zombie thread and notify it.
/* Queue @imp on the global zombie-import list and wake the destruction
 * thread.  The import's security context must already be released. */
1994 static void obd_zombie_import_add(struct obd_import *imp) {
1995 LASSERT(imp->imp_sec == NULL);
1996 spin_lock(&obd_zombie_impexp_lock);
1997 LASSERT(list_empty(&imp->imp_zombie_chain));
1999 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
2000 spin_unlock(&obd_zombie_impexp_lock);
2002 obd_zombie_impexp_notify();
2006 * notify import/export destroy thread about new zombie.
/* Wake everyone waiting on the zombie wait queue. */
2008 static void obd_zombie_impexp_notify(void)
2011 * Make sure obd_zombie_impexp_thread gets this notification.
2012 * It is possible this signal is only seen by obd_zombie_barrier, and
2013 * the barrier gulps this notification and sleeps away and a hang ensues
2015 wake_up_all(&obd_zombie_waitq);
2019 * check whether obd_zombie is idle
/* Return true when no zombies are queued.  Only valid while the zombie
 * thread is running (stop must not have been requested). */
2021 static int obd_zombie_is_idle(void)
2025 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
2026 spin_lock(&obd_zombie_impexp_lock);
2027 rc = (zombies_count == 0);
2028 spin_unlock(&obd_zombie_impexp_lock);
2033 * wait when obd_zombie import/export queues become empty
/* Block the caller until the zombie import/export queues drain.  The
 * zombie thread itself must not wait here (self-deadlock). */
2035 void obd_zombie_barrier(void)
2037 struct l_wait_info lwi = { 0 };
2039 if (obd_zombie_pid == current_pid())
2040 /* don't wait for myself */
2042 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
2044 EXPORT_SYMBOL(obd_zombie_barrier);
/* Pop and return the first export from the global stale-export list, or
 * NULL if the list is empty.  Ownership of the export reference passes
 * to the caller. */
2047 struct obd_export *obd_stale_export_get(void)
2049 struct obd_export *exp = NULL;
2052 spin_lock(&obd_stale_export_lock);
2053 if (!list_empty(&obd_stale_exports)) {
2054 exp = list_entry(obd_stale_exports.next,
2055 struct obd_export, exp_stale_list);
2056 list_del_init(&exp->exp_stale_list);
2058 spin_unlock(&obd_stale_export_lock);
2061 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
2062 atomic_read(&obd_stale_export_num));
2066 EXPORT_SYMBOL(obd_stale_export_get);
/* Return @exp to the stale list if it still holds locks; exports with
 * blocked locks go to the head so they are reclaimed first.  Exports with
 * no remaining locks are simply released. */
2068 void obd_stale_export_put(struct obd_export *exp)
2072 LASSERT(list_empty(&exp->exp_stale_list));
2073 if (exp->exp_lock_hash &&
2074 atomic_read(&exp->exp_lock_hash->hs_count)) {
2075 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
2076 atomic_read(&obd_stale_export_num));
2078 spin_lock_bh(&exp->exp_bl_list_lock);
2079 spin_lock(&obd_stale_export_lock);
2080 /* Add to the tail if there is no blocked locks,
2081 * to the head otherwise. */
2082 if (list_empty(&exp->exp_bl_list))
2083 list_add_tail(&exp->exp_stale_list,
2084 &obd_stale_exports);
2086 list_add(&exp->exp_stale_list,
2087 &obd_stale_exports);
2089 spin_unlock(&obd_stale_export_lock);
2090 spin_unlock_bh(&exp->exp_bl_list_lock);
2092 class_export_put(exp);
2096 EXPORT_SYMBOL(obd_stale_export_put);
2099 * Adjust the position of the export in the stale list,
2100 * i.e. move to the head of the list if is needed.
/* If @exp is on the stale list and has blocked locks, move it to the head
 * of the list so it is reclaimed sooner. */
2102 void obd_stale_export_adjust(struct obd_export *exp)
2104 LASSERT(exp != NULL);
2105 spin_lock_bh(&exp->exp_bl_list_lock);
2106 spin_lock(&obd_stale_export_lock);
2108 if (!list_empty(&exp->exp_stale_list) &&
2109 !list_empty(&exp->exp_bl_list))
2110 list_move(&exp->exp_stale_list, &obd_stale_exports);
2112 spin_unlock(&obd_stale_export_lock);
2113 spin_unlock_bh(&exp->exp_bl_list_lock);
2115 EXPORT_SYMBOL(obd_stale_export_adjust);
2118 * destroy zombie export/import thread.
/* Main loop of the zombie destruction kthread: sleep until zombies are
 * queued (or a stop is requested), cull them, and wake barrier waiters
 * after each pass. */
2120 static int obd_zombie_impexp_thread(void *unused)
2122 unshare_fs_struct();
2123 complete(&obd_zombie_start);
2125 obd_zombie_pid = current_pid();
2127 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
2128 struct l_wait_info lwi = { 0 };
2130 l_wait_event(obd_zombie_waitq,
2131 !obd_zombie_impexp_check(NULL), &lwi);
2132 obd_zombie_impexp_cull();
2135 * Notify obd_zombie_barrier callers that queues
2138 wake_up(&obd_zombie_waitq);
2141 complete(&obd_zombie_stop);
2148 * start destroy zombie import/export thread
/* Start the "obd_zombid" kthread and wait until it has signalled startup.
 * Returns a PTR_ERR code if the thread could not be created. */
2150 int obd_zombie_impexp_init(void)
2152 struct task_struct *task;
2154 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
2156 RETURN(PTR_ERR(task));
2158 wait_for_completion(&obd_zombie_start);
2162 * stop destroy zombie import/export thread
/* Request the zombie thread to stop, wake it, and wait for it to exit.
 * The stale-export list must be empty by then. */
2164 void obd_zombie_impexp_stop(void)
2166 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
2167 obd_zombie_impexp_notify();
2168 wait_for_completion(&obd_zombie_stop);
2169 LASSERT(list_empty(&obd_stale_exports));
2172 /***** Kernel-userspace comm helpers *******/
2174 /* Get length of entire message, including header */
/* Total kernel-userspace message length: header plus @payload_len. */
2175 int kuc_len(int payload_len)
2177 return sizeof(struct kuc_hdr) + payload_len;
2179 EXPORT_SYMBOL(kuc_len);
2181 /* Get a pointer to kuc header, given a ptr to the payload
2182 * @param p Pointer to payload area
2183 * @returns Pointer to kuc header
/* Map a payload pointer back to its kuc_hdr (located immediately before
 * the payload); asserts the magic is intact. */
2185 struct kuc_hdr * kuc_ptr(void *p)
2187 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2188 LASSERT(lh->kuc_magic == KUC_MAGIC);
2191 EXPORT_SYMBOL(kuc_ptr);
2193 /* Alloc space for a message, and fill in header
2194 * @return Pointer to payload area
/* Allocate a kernel-userspace message with an initialized header and
 * return a pointer to the payload area (just after the header).
 * Returns ERR_PTR(-ENOMEM) on allocation failure. */
2196 void *kuc_alloc(int payload_len, int transport, int type)
2199 int len = kuc_len(payload_len);
2203 return ERR_PTR(-ENOMEM);
2205 lh->kuc_magic = KUC_MAGIC;
2206 lh->kuc_transport = transport;
2207 lh->kuc_msgtype = type;
2208 lh->kuc_msglen = len;
2210 return (void *)(lh + 1);
2212 EXPORT_SYMBOL(kuc_alloc);
2214 /* Takes pointer to payload area */
/* Free a message allocated by kuc_alloc(); @p is the payload pointer. */
2217 void kuc_free(void *p, int payload_len)
2218 struct kuc_hdr *lh = kuc_ptr(p);
2219 OBD_FREE(lh, kuc_len(payload_len));
2220 EXPORT_SYMBOL(kuc_free);
/* Per-waiter record for the RPC slot throttle: list linkage into
 * cl_flight_waiters plus a private wait queue. */
2222 struct obd_request_slot_waiter {
2223 struct list_head orsw_entry;
2224 wait_queue_head_t orsw_waitq;
/* Wait predicate: a slot became available once @orsw has been unlinked
 * from the waiter list (done by whoever granted the slot). */
2228 static bool obd_request_slot_avail(struct client_obd *cli,
2229 struct obd_request_slot_waiter *orsw)
2233 spin_lock(&cli->cl_loi_list_lock);
2234 avail = !!list_empty(&orsw->orsw_entry);
2235 spin_unlock(&cli->cl_loi_list_lock);
2241 * For network flow control, the RPC sponsor needs to acquire a credit
2242 * before sending the RPC. The credits count for a connection is defined
2243 by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
2244 * the subsequent RPC sponsors need to wait until others released their
2245 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
/* Acquire an RPC-in-flight credit for @cli, sleeping (interruptibly) on a
 * per-waiter queue when all cl_max_rpcs_in_flight credits are in use.
 * Returns 0 on success or the l_wait_event error on interruption. */
2247 int obd_get_request_slot(struct client_obd *cli)
2249 struct obd_request_slot_waiter orsw;
2250 struct l_wait_info lwi;
/* fast path: a credit is free right now */
2253 spin_lock(&cli->cl_loi_list_lock);
2254 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2255 cli->cl_rpcs_in_flight++;
2256 spin_unlock(&cli->cl_loi_list_lock);
/* slow path: enqueue an on-stack waiter and sleep */
2260 init_waitqueue_head(&orsw.orsw_waitq);
2261 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2262 orsw.orsw_signaled = false;
2263 spin_unlock(&cli->cl_loi_list_lock);
2265 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2266 rc = l_wait_event(orsw.orsw_waitq,
2267 obd_request_slot_avail(cli, &orsw) ||
2271 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2272 * freed but other (such as obd_put_request_slot) is using it. */
2273 spin_lock(&cli->cl_loi_list_lock);
2275 if (!orsw.orsw_signaled) {
/* if still linked we were interrupted, not granted: just unlink;
 * if unlinked we were granted a credit but must give it back on error */
2276 if (list_empty(&orsw.orsw_entry))
2277 cli->cl_rpcs_in_flight--;
2279 list_del(&orsw.orsw_entry);
2283 if (orsw.orsw_signaled) {
2284 LASSERT(list_empty(&orsw.orsw_entry));
2288 spin_unlock(&cli->cl_loi_list_lock);
2292 EXPORT_SYMBOL(obd_get_request_slot);
/* Release one RPC-in-flight credit and, if a waiter exists and a credit
 * is free, transfer the credit to the first waiter and wake it. */
2294 void obd_put_request_slot(struct client_obd *cli)
2296 struct obd_request_slot_waiter *orsw;
2298 spin_lock(&cli->cl_loi_list_lock);
2299 cli->cl_rpcs_in_flight--;
2301 /* If there is free slot, wakeup the first waiter. */
2302 if (!list_empty(&cli->cl_flight_waiters) &&
2303 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2304 orsw = list_entry(cli->cl_flight_waiters.next,
2305 struct obd_request_slot_waiter, orsw_entry);
/* unlinking the waiter is the grant signal seen by obd_request_slot_avail */
2306 list_del_init(&orsw->orsw_entry);
2307 cli->cl_rpcs_in_flight++;
2308 wake_up(&orsw->orsw_waitq);
2310 spin_unlock(&cli->cl_loi_list_lock);
2312 EXPORT_SYMBOL(obd_put_request_slot);
/* Accessor: current cl_max_rpcs_in_flight limit for @cli. */
2314 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2316 return cli->cl_max_rpcs_in_flight;
2318 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/* Set cl_max_rpcs_in_flight to @max (1..OBD_MAX_RIF_MAX).  For MDC
 * devices, max_mod_rpcs_in_flight is lowered first if needed so it stays
 * strictly below the new limit.  Waiters are woken when the limit grows. */
2320 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2322 struct obd_request_slot_waiter *orsw;
2329 if (max > OBD_MAX_RIF_MAX || max < 1)
2332 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2333 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2334 /* adjust max_mod_rpcs_in_flight to ensure it is always
2335 * strictly lower that max_rpcs_in_flight */
2337 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2338 "because it must be higher than "
2339 "max_mod_rpcs_in_flight value",
2340 cli->cl_import->imp_obd->obd_name);
2343 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2344 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2350 spin_lock(&cli->cl_loi_list_lock);
2351 old = cli->cl_max_rpcs_in_flight;
2352 cli->cl_max_rpcs_in_flight = max;
/* dirty-page limit scales with the RPC limit */
2353 client_adjust_max_dirty(cli);
2357 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2358 for (i = 0; i < diff; i++) {
2359 if (list_empty(&cli->cl_flight_waiters))
2362 orsw = list_entry(cli->cl_flight_waiters.next,
2363 struct obd_request_slot_waiter, orsw_entry);
2364 list_del_init(&orsw->orsw_entry);
2365 cli->cl_rpcs_in_flight++;
2366 wake_up(&orsw->orsw_waitq);
2368 spin_unlock(&cli->cl_loi_list_lock);
2372 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Accessor: current cl_max_mod_rpcs_in_flight limit for @cli. */
2374 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2376 return cli->cl_max_mod_rpcs_in_flight;
2378 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/* Set cl_max_mod_rpcs_in_flight to @max.  Rejects values that are not in
 * 1..OBD_MAX_RIF_MAX, that equal/exceed cl_max_rpcs_in_flight, or that
 * exceed the per-client modify-RPC limit the server advertised at
 * connect.  Wakes modify-slot waiters if the limit grew. */
2380 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2382 struct obd_connect_data *ocd;
2386 if (max > OBD_MAX_RIF_MAX || max < 1)
2389 /* cannot exceed or equal max_rpcs_in_flight */
2390 if (max >= cli->cl_max_rpcs_in_flight) {
2391 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2392 "higher or equal to max_rpcs_in_flight value (%u)\n",
2393 cli->cl_import->imp_obd->obd_name,
2394 max, cli->cl_max_rpcs_in_flight);
2398 /* cannot exceed max modify RPCs in flight supported by the server */
2399 ocd = &cli->cl_import->imp_connect_data;
2400 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2401 maxmodrpcs = ocd->ocd_maxmodrpcs;
2404 if (max > maxmodrpcs) {
2405 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2406 "higher than max_mod_rpcs_per_client value (%hu) "
2407 "returned by the server at connection\n",
2408 cli->cl_import->imp_obd->obd_name,
2413 spin_lock(&cli->cl_mod_rpcs_lock);
2415 prev = cli->cl_max_mod_rpcs_in_flight;
2416 cli->cl_max_mod_rpcs_in_flight = max;
2418 /* wakeup waiters if limit has been increased */
2419 if (cli->cl_max_mod_rpcs_in_flight > prev)
2420 wake_up(&cli->cl_mod_rpcs_waitq);
2422 spin_unlock(&cli->cl_mod_rpcs_lock);
2426 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2429 #define pct(a, b) (b ? a * 100 / b : 0)
/* Emit the modify-RPC-in-flight histogram of @cli to @seq: snapshot time,
 * current in-flight count, and per-bucket counts with percentage and
 * cumulative percentage.  Stops once the cumulative count reaches the
 * total. */
2430 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2431 struct seq_file *seq)
2433 unsigned long mod_tot = 0, mod_cum;
2434 struct timespec64 now;
2437 ktime_get_real_ts64(&now);
2439 spin_lock(&cli->cl_mod_rpcs_lock);
2441 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2442 (s64)now.tv_sec, now.tv_nsec);
2443 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2444 cli->cl_mod_rpcs_in_flight);
2446 seq_printf(seq, "\n\t\t\tmodify\n");
2447 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2449 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2452 for (i = 0; i < OBD_HIST_MAX; i++) {
2453 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2455 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2456 i, mod, pct(mod, mod_tot),
2457 pct(mod_cum, mod_tot));
2458 if (mod_cum == mod_tot)
2462 spin_unlock(&cli->cl_mod_rpcs_lock);
2466 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2470 /* The number of modify RPCs sent in parallel is limited
2471 * because the server has a finite number of slots per client to
2472 * store request result and ensure reply reconstruction when needed.
2473 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2474 * that takes into account server limit and cl_max_rpcs_in_flight
2476 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2477 * one close request is allowed above the maximum.
/* Caller holds cl_mod_rpcs_lock.  True when a modify-RPC slot can be
 * granted: under the limit, or a close request while no other close is
 * in flight (closes get one slot above the maximum — see LU-3462 note). */
2479 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2484 /* A slot is available if
2485 * - number of modify RPCs in flight is less than the max
2486 * - it's a close RPC and no other close request is in flight
2488 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2489 (close_req && cli->cl_close_rpcs_in_flight == 0);
/* Locked wrapper around obd_mod_rpc_slot_avail_locked(), used as the
 * wait predicate in obd_get_mod_rpc_slot(). */
2494 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2499 spin_lock(&cli->cl_mod_rpcs_lock);
2500 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2501 spin_unlock(&cli->cl_mod_rpcs_lock);
/* True for intents that are read-only metadata operations (getattr,
 * lookup, readdir, read-only layout) and therefore need no modify slot. */
2505 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2508 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2509 it->it_op == IT_READDIR ||
2510 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2515 /* Get a modify RPC slot from the obd client @cli according
2516 * to the kind of operation @opc that is going to be sent
2517 * and the intent @it of the operation if it applies.
2518 * If the maximum number of modify RPCs in flight is reached
2519 * the thread is put to sleep.
2520 * Returns the tag to be set in the request message. Tag 0
2521 * is reserved for non-modifying requests.
/* Acquire a modify-RPC slot on @cli for operation @opc (intent @it may
 * exempt read-only metadata ops).  Sleeps until a slot is free, tallies
 * the in-flight histogram, and returns the 1-based slot tag; tag 0 is
 * reserved for non-modifying requests. */
2523 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2524 struct lookup_intent *it)
2526 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2527 bool close_req = false;
2530 /* read-only metadata RPCs don't consume a slot on MDT
2531 * for reply reconstruction
2533 if (obd_skip_mod_rpc_slot(it))
2536 if (opc == MDS_CLOSE)
2540 spin_lock(&cli->cl_mod_rpcs_lock);
2541 max = cli->cl_max_mod_rpcs_in_flight;
2542 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2543 /* there is a slot available */
2544 cli->cl_mod_rpcs_in_flight++;
2546 cli->cl_close_rpcs_in_flight++;
2547 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2548 cli->cl_mod_rpcs_in_flight);
2549 /* find a free tag */
2550 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2552 LASSERT(i < OBD_MAX_RIF_MAX);
2553 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2554 spin_unlock(&cli->cl_mod_rpcs_lock);
2555 /* tag 0 is reserved for non-modify RPCs */
2558 spin_unlock(&cli->cl_mod_rpcs_lock);
2560 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2561 "opc %u, max %hu\n",
2562 cli->cl_import->imp_obd->obd_name, opc, max);
/* wait for a slot, then retry the grant attempt */
2564 l_wait_event(cli->cl_mod_rpcs_waitq,
2565 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2568 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2570 /* Put a modify RPC slot from the obd client @cli according
2571 * to the kind of operation @opc that has been sent and the
2572 * intent @it of the operation if it applies.
/* Release the modify-RPC slot with tag @tag on @cli (no-op for intents
 * exempted by obd_skip_mod_rpc_slot), clear the tag bit and wake slot
 * waiters. */
2574 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2575 struct lookup_intent *it, __u16 tag)
2577 bool close_req = false;
2579 if (obd_skip_mod_rpc_slot(it))
2582 if (opc == MDS_CLOSE)
2585 spin_lock(&cli->cl_mod_rpcs_lock);
2586 cli->cl_mod_rpcs_in_flight--;
2588 cli->cl_close_rpcs_in_flight--;
2589 /* release the tag in the bitmap */
2590 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2591 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2592 spin_unlock(&cli->cl_mod_rpcs_lock);
2593 wake_up(&cli->cl_mod_rpcs_waitq);
2595 EXPORT_SYMBOL(obd_put_mod_rpc_slot);