4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
/* List of registered obd types and the spinlock taken around walking or
 * changing that list. */
48 static DEFINE_SPINLOCK(obd_types_lock);
49 static LIST_HEAD(obd_types);
/* obd_dev_lock is taken around accesses to the obd_devs[] slot table. */
50 DEFINE_RWLOCK(obd_dev_lock);
51 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab caches; created in obd_init_caches(), destroyed in
 * obd_cleanup_caches(). */
53 static struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 static struct kmem_cache *import_cachep;
/* Lists of zombie imports/exports and the lock declared for them. */
58 static LIST_HEAD(obd_zombie_imports);
59 static LIST_HEAD(obd_zombie_exports);
60 static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
/* Forward declarations of helpers used before their definition. */
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp,
66 const char *status, int locks, int debug_level);
/* Stale export list, its lock, and a counter that class_unlink_export()
 * increments. */
68 static LIST_HEAD(obd_stale_exports);
69 static DEFINE_SPINLOCK(obd_stale_export_lock);
70 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Hook called by the export/import destroy paths to release a
 * ptlrpc_connection. NOTE(review): presumably assigned by the ptlrpc module;
 * confirm where it is set. */
72 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
73 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
76 * support functions: we could use inter-module communication, but this
77 * is more portable to other OS's
/* Allocate one obd_device from obd_device_cachep with GFP_NOFS and stamp it
 * with OBD_DEVICE_MAGIC. */
79 static struct obd_device *obd_device_alloc(void)
81 struct obd_device *obd;
83 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
85 obd->obd_magic = OBD_DEVICE_MAGIC;
/* Return an obd_device to obd_device_cachep. Asserts that the magic is
 * intact, logs an error if obd_namespace is still set, and finalizes the
 * obd_reference lu_ref before freeing. */
90 static void obd_device_free(struct obd_device *obd)
93 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
94 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
95 if (obd->obd_namespace != NULL) {
96 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
97 obd, obd->obd_namespace, obd->obd_force);
100 lu_ref_fini(&obd->obd_reference);
101 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/* Walk obd_types under obd_types_lock looking for an entry whose typ_name
 * equals name (exact strcmp() match). The lock is released on both the
 * match path and the end-of-list path. */
104 struct obd_type *class_search_type(const char *name)
106 struct list_head *tmp;
107 struct obd_type *type;
109 spin_lock(&obd_types_lock);
110 list_for_each(tmp, &obd_types) {
111 type = list_entry(tmp, struct obd_type, typ_chain);
112 if (strcmp(type->typ_name, name) == 0) {
113 spin_unlock(&obd_types_lock);
117 spin_unlock(&obd_types_lock);
120 EXPORT_SYMBOL(class_search_type);
/*
 * Look up the obd type called name and take a module reference on the owner
 * of its typ_dt_ops, under the type's obd_type_lock.
 *
 * When HAVE_MODULE_LOADING_SUPPORT is defined, request_module() can be used
 * to load a module for the type, after which the lookup is repeated. Some
 * type names are mapped to a different module name first: the LWP name maps
 * to the OSP module, and names starting with the MDS name map to the MDT
 * module.
 */
122 struct obd_type *class_get_type(const char *name)
124 struct obd_type *type = class_search_type(name);
126 #ifdef HAVE_MODULE_LOADING_SUPPORT
128 const char *modname = name;
130 if (strcmp(modname, "obdfilter") == 0)
133 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
134 modname = LUSTRE_OSP_NAME;
136 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
137 modname = LUSTRE_MDT_NAME;
139 if (!request_module("%s", modname)) {
140 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
141 type = class_search_type(name);
143 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
149 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not checked here */
151 try_module_get(type->typ_dt_ops->o_owner);
152 spin_unlock(&type->obd_type_lock);
/* Drop the module reference on the owner of type->typ_dt_ops, under the
 * type's obd_type_lock. Counterpart of class_get_type(). */
157 void class_put_type(struct obd_type *type)
160 spin_lock(&type->obd_type_lock);
162 module_put(type->typ_dt_ops->o_owner);
163 spin_unlock(&type->obd_type_lock);
/* kobject release callback (installed through class_ktype): frees the
 * kobject that class_setup_tunables() allocated with OBD_ALLOC(). */
166 static void class_sysfs_release(struct kobject *kobj)
168 OBD_FREE(kobj, sizeof(*kobj));
/* kobj_type used by class_setup_tunables(): Lustre sysfs ops, with
 * class_sysfs_release() as the release callback. */
171 static struct kobj_type class_ktype = {
172 .sysfs_ops = &lustre_sysfs_ops,
173 .release = class_sysfs_release,
/*
 * Set up the kobject called name under lustre_kset.
 *
 * With HAVE_SERVER_SUPPORT an existing object of that name is looked up
 * first with kset_find_obj(). Otherwise a kobject is allocated, initialized
 * with class_ktype and added below lustre_kset->kobj.
 *
 * Returns ERR_PTR(-ENOMEM) on the visible allocation-failure path.
 */
176 struct kobject *class_setup_tunables(const char *name)
178 struct kobject *kobj;
181 #ifdef HAVE_SERVER_SUPPORT
182 kobj = kset_find_obj(lustre_kset, name);
186 OBD_ALLOC(kobj, sizeof(*kobj));
188 return ERR_PTR(-ENOMEM);
190 kobj->kset = lustre_kset;
191 kobject_init(kobj, &class_ktype);
192 rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
199 EXPORT_SYMBOL(class_setup_tunables);
/* Upper bound (exclusive) on the length of a type name, asserted in
 * class_register_type(). */
201 #define CLASS_MAX_NAME 1024
/*
 * Register an obd type.
 *
 * Allocates an obd_type, copies dt_ops (and md_ops) and name into it, creates
 * its procfs, debugfs and sysfs entries, initializes ldt through
 * lu_device_type_init() and links the type into obd_types. The tail of the
 * function undoes the partial setup and frees the type.
 *
 * dt_ops:      obd operations, copied by value into the type
 * md_ops:      metadata operations, optional
 * enable_proc: NOTE(review): its use is not visible in these lines;
 *              presumably gates the procfs registration - confirm
 * vars:        NOTE(review): presumably the variables handed to
 *              lprocfs_register() - confirm
 * name:        type name; must be shorter than CLASS_MAX_NAME
 * ldt:         lu device type passed to lu_device_type_init()
 */
203 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
204 bool enable_proc, struct lprocfs_vars *vars,
205 const char *name, struct lu_device_type *ldt)
207 struct obd_type *type;
208 #ifdef HAVE_SERVER_SUPPORT
210 #endif /* HAVE_SERVER_SUPPORT */
215 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* a type with this name is already on obd_types */
217 if (class_search_type(name)) {
218 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
223 OBD_ALLOC(type, sizeof(*type));
227 OBD_ALLOC_PTR(type->typ_dt_ops);
228 OBD_ALLOC_PTR(type->typ_md_ops);
229 OBD_ALLOC(type->typ_name, strlen(name) + 1);
231 if (type->typ_dt_ops == NULL ||
232 type->typ_md_ops == NULL ||
233 type->typ_name == NULL)
236 *(type->typ_dt_ops) = *dt_ops;
237 /* md_ops is optional */
239 *(type->typ_md_ops) = *md_ops;
/* typ_name was allocated with strlen(name) + 1 bytes above, so the copy
 * fits */
240 strcpy(type->typ_name, name);
241 spin_lock_init(&type->obd_type_lock);
243 #ifdef CONFIG_PROC_FS
245 type->typ_procroot = lprocfs_register(type->typ_name,
248 if (IS_ERR(type->typ_procroot)) {
249 rc = PTR_ERR(type->typ_procroot);
250 type->typ_procroot = NULL;
255 #ifdef HAVE_SERVER_SUPPORT
/* look for an existing debugfs entry under debugfs_lustre_root; when one is
 * found, its reference is dropped again and typ_sym_filter is set */
257 dname.len = strlen(dname.name);
258 dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
260 type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
261 if (type->typ_debugfs_entry) {
262 dput(type->typ_debugfs_entry);
263 type->typ_sym_filter = true;
266 #endif /* HAVE_SERVER_SUPPORT */
268 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
271 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
272 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
274 type->typ_debugfs_entry = NULL;
277 #ifdef HAVE_SERVER_SUPPORT
280 type->typ_kobj = class_setup_tunables(type->typ_name);
281 if (IS_ERR(type->typ_kobj))
282 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
286 rc = lu_device_type_init(ldt);
288 kobject_put(type->typ_kobj);
/* publish the fully built type */
293 spin_lock(&obd_types_lock);
294 list_add(&type->typ_chain, &obd_types);
295 spin_unlock(&obd_types_lock);
/* cleanup: with typ_sym_filter set the debugfs entry pointer is cleared
 * rather than removed */
300 #ifdef HAVE_SERVER_SUPPORT
301 if (type->typ_sym_filter)
302 type->typ_debugfs_entry = NULL;
304 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
305 ldebugfs_remove(&type->typ_debugfs_entry);
306 if (type->typ_name != NULL) {
307 #ifdef CONFIG_PROC_FS
308 if (type->typ_procroot != NULL)
309 remove_proc_subtree(type->typ_name, proc_lustre_root);
311 OBD_FREE(type->typ_name, strlen(name) + 1);
313 if (type->typ_md_ops != NULL)
314 OBD_FREE_PTR(type->typ_md_ops);
315 if (type->typ_dt_ops != NULL)
316 OBD_FREE_PTR(type->typ_dt_ops);
317 OBD_FREE(type, sizeof(*type));
320 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called name.
 *
 * A type that still has references (typ_refcnt != 0) only gets its ops
 * freed and an error logged. Otherwise the sysfs, procfs and debugfs entries
 * are removed, the lu device type is finalized, the type is unlinked from
 * obd_types and all of its memory is freed.
 */
322 int class_unregister_type(const char *name)
324 struct obd_type *type = class_search_type(name);
328 CERROR("unknown obd type\n");
332 if (type->typ_refcnt) {
333 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
334 /* This is a bad situation, let's make the best of it */
335 /* Remove ops, but leave the name for debugging */
/* NOTE(review): the ops pointers are freed but not reset in the visible
 * lines; check later users of this type such as class_get_type(), which
 * reads typ_dt_ops->o_owner */
336 OBD_FREE_PTR(type->typ_dt_ops);
337 OBD_FREE_PTR(type->typ_md_ops);
341 kobject_put(type->typ_kobj);
343 /* we do not use type->typ_procroot as for compatibility purposes
344 * other modules can share names (i.e. lod can use lov entry). so
345 * we can't reference pointer as it can get invalidated when another
346 * module removes the entry */
347 #ifdef CONFIG_PROC_FS
348 if (type->typ_procroot != NULL)
349 remove_proc_subtree(type->typ_name, proc_lustre_root);
350 if (type->typ_procsym != NULL)
351 lprocfs_remove(&type->typ_procsym);
353 #ifdef HAVE_SERVER_SUPPORT
354 if (type->typ_sym_filter)
355 type->typ_debugfs_entry = NULL;
357 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
358 ldebugfs_remove(&type->typ_debugfs_entry);
361 lu_device_type_fini(type->typ_lu);
363 spin_lock(&obd_types_lock);
364 list_del(&type->typ_chain);
365 spin_unlock(&obd_types_lock);
/* name is the string the type was found by, so it has the same length as
 * typ_name */
366 OBD_FREE(type->typ_name, strlen(name) + 1);
367 if (type->typ_dt_ops != NULL)
368 OBD_FREE_PTR(type->typ_dt_ops);
369 if (type->typ_md_ops != NULL)
370 OBD_FREE_PTR(type->typ_md_ops);
371 OBD_FREE(type, sizeof(*type));
373 } /* class_unregister_type */
374 EXPORT_SYMBOL(class_unregister_type);
377 * Create a new obd device.
379 * Allocate the new obd_device and initialize it.
381 * \param[in] type_name obd device type string.
382 * \param[in] name obd device name.
383 * \param[in] uuid obd device UUID
385 * \retval newdev pointer to created obd_device
386 * \retval ERR_PTR(errno) on error
388 struct obd_device *class_newdev(const char *type_name, const char *name,
391 struct obd_device *newdev;
392 struct obd_type *type = NULL;
395 if (strlen(name) >= MAX_OBD_NAME) {
396 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
397 RETURN(ERR_PTR(-EINVAL));
400 type = class_get_type(type_name);
402 CERROR("OBD: unknown type: %s\n", type_name);
403 RETURN(ERR_PTR(-ENODEV));
406 newdev = obd_device_alloc();
407 if (newdev == NULL) {
408 class_put_type(type);
409 RETURN(ERR_PTR(-ENOMEM));
411 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
412 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
413 newdev->obd_type = type;
414 newdev->obd_minor = -1;
416 rwlock_init(&newdev->obd_pool_lock);
417 newdev->obd_pool_limit = 0;
418 newdev->obd_pool_slv = 0;
420 INIT_LIST_HEAD(&newdev->obd_exports);
421 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
422 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
423 INIT_LIST_HEAD(&newdev->obd_exports_timed);
424 INIT_LIST_HEAD(&newdev->obd_nid_stats);
425 spin_lock_init(&newdev->obd_nid_lock);
426 spin_lock_init(&newdev->obd_dev_lock);
427 mutex_init(&newdev->obd_dev_mutex);
428 spin_lock_init(&newdev->obd_osfs_lock);
429 /* newdev->obd_osfs_age must be set to a value in the distant
430 * past to guarantee a fresh statfs is fetched on mount. */
431 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
433 /* XXX belongs in setup not attach */
434 init_rwsem(&newdev->obd_observer_link_sem);
436 init_timer(&newdev->obd_recovery_timer);
437 spin_lock_init(&newdev->obd_recovery_task_lock);
438 init_waitqueue_head(&newdev->obd_next_transno_waitq);
439 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
440 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
441 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
442 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
443 INIT_LIST_HEAD(&newdev->obd_evict_list);
444 INIT_LIST_HEAD(&newdev->obd_lwp_list);
446 llog_group_init(&newdev->obd_olg);
447 /* Detach drops this */
448 atomic_set(&newdev->obd_refcount, 1);
449 lu_ref_init(&newdev->obd_reference);
450 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
452 newdev->obd_conn_inprogress = 0;
454 strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
456 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
457 newdev->obd_name, newdev);
465 * \param[in] obd obd_device to be freed
/*
 * Final teardown of an obd device.
 *
 * Asserts that the magic is intact, that obd is either unregistered
 * (obd_minor == -1) or still sits in its obd_devs[] slot, and that its
 * refcount is zero. A device that is stopping gets obd_cleanup(); then the
 * device is freed and the type reference held through obd->obd_type is
 * dropped.
 */
469 void class_free_dev(struct obd_device *obd)
/* saved up front: obd itself is freed before the type reference is put */
471 struct obd_type *obd_type = obd->obd_type;
473 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
474 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
475 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
476 "obd %p != obd_devs[%d] %p\n",
477 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
478 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
479 "obd_refcount should be 0, not %d\n",
480 atomic_read(&obd->obd_refcount));
481 LASSERT(obd_type != NULL);
483 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
484 obd->obd_name, obd->obd_type->typ_name);
486 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
487 obd->obd_name, obd->obd_uuid.uuid);
488 if (obd->obd_stopping) {
491 /* If we're not stopping, we were never set up */
492 err = obd_cleanup(obd);
494 CERROR("Cleanup %s returned %d\n",
498 obd_device_free(obd);
500 class_put_type(obd_type);
504 * Unregister obd device.
506 * Free slot in obd_devs[] used by \a obd.
508 * \param[in] obd obd_device to be unregistered
/* Clear the obd_devs[] slot that obd occupies, if it has one
 * (obd_minor >= 0), under the obd_dev_lock write lock. */
512 void class_unregister_device(struct obd_device *obd)
514 write_lock(&obd_dev_lock);
515 if (obd->obd_minor >= 0) {
516 LASSERT(obd_devs[obd->obd_minor] == obd);
517 obd_devs[obd->obd_minor] = NULL;
520 write_unlock(&obd_dev_lock);
524 * Register obd device.
526 * Find free slot in obd_devs[], fills it with \a new_obd.
528 * \param[in] new_obd obd_device to be registered
531 * \retval -EEXIST device with this name is registered
532 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Scan obd_devs[] under the obd_dev_lock write lock. A slot holding a device
 * with the same name as new_obd is treated as a duplicate; an empty slot is
 * picked for new_obd (minor_assign / new_obd_minor). The chosen slot is then
 * filled and new_obd->obd_minor set to its index.
 *
 * The duplicate path drops the lock and calls obd_zombie_barrier().
 * NOTE(review): the 'retried' flag suggests the scan is repeated once after
 * the barrier before the "already exists" error is reported - confirm.
 */
534 int class_register_device(struct obd_device *new_obd)
538 int new_obd_minor = 0;
539 bool minor_assign = false;
540 bool retried = false;
543 write_lock(&obd_dev_lock);
544 for (i = 0; i < class_devno_max(); i++) {
545 struct obd_device *obd = class_num2obd(i);
548 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
551 write_unlock(&obd_dev_lock);
553 /* the obd_device could be waited to be
554 * destroyed by the "obd_zombie_impexp_thread".
556 obd_zombie_barrier();
561 CERROR("%s: already exists, won't add\n",
563 /* in case we found a free slot before duplicate */
564 minor_assign = false;
568 if (!minor_assign && obd == NULL) {
575 new_obd->obd_minor = new_obd_minor;
576 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
577 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
578 obd_devs[new_obd_minor] = new_obd;
582 CERROR("%s: all %u/%u devices used, increase "
583 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
584 i, class_devno_max(), ret);
587 write_unlock(&obd_dev_lock);
/* Search obd_devs[] for a device whose obd_name equals name. Takes no lock
 * itself; the callers in this file hold obd_dev_lock around it. A match is
 * only acted on when the device has obd_attached set. */
592 static int class_name2dev_nolock(const char *name)
599 for (i = 0; i < class_devno_max(); i++) {
600 struct obd_device *obd = class_num2obd(i);
602 if (obd && strcmp(name, obd->obd_name) == 0) {
603 /* Make sure we finished attaching before we give
604 out any references */
605 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
606 if (obd->obd_attached) {
/* Locked wrapper: runs class_name2dev_nolock() under the obd_dev_lock read
 * lock. */
616 int class_name2dev(const char *name)
623 read_lock(&obd_dev_lock);
624 i = class_name2dev_nolock(name);
625 read_unlock(&obd_dev_lock);
629 EXPORT_SYMBOL(class_name2dev);
631 struct obd_device *class_name2obd(const char *name)
633 int dev = class_name2dev(name);
635 if (dev < 0 || dev > class_devno_max())
637 return class_num2obd(dev);
639 EXPORT_SYMBOL(class_name2obd);
/* Search obd_devs[] for a device whose obd_uuid equals uuid
 * (obd_uuid_equals()). Takes no lock itself; the callers in this file hold
 * obd_dev_lock around it. */
641 int class_uuid2dev_nolock(struct obd_uuid *uuid)
645 for (i = 0; i < class_devno_max(); i++) {
646 struct obd_device *obd = class_num2obd(i);
648 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
649 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/* Locked wrapper: runs class_uuid2dev_nolock() under the obd_dev_lock read
 * lock. */
657 int class_uuid2dev(struct obd_uuid *uuid)
661 read_lock(&obd_dev_lock);
662 i = class_uuid2dev_nolock(uuid);
663 read_unlock(&obd_dev_lock);
667 EXPORT_SYMBOL(class_uuid2dev);
/* Look up an obd device by uuid: resolve the uuid to an index with
 * class_uuid2dev(), then fetch the device with class_num2obd(). */
669 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
671 int dev = class_uuid2dev(uuid);
674 return class_num2obd(dev);
676 EXPORT_SYMBOL(class_uuid2obd);
679 * Get obd device from ::obd_devs[]
681 * \param num [in] array index
683 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
684 * otherwise return the obd device there.
686 struct obd_device *class_num2obd(int num)
688 struct obd_device *obd = NULL;
690 if (num < class_devno_max()) {
695 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
696 "%p obd_magic %08x != %08x\n",
697 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
698 LASSERTF(obd->obd_minor == num,
699 "%p obd_minor %0d != %0d\n",
700 obd, obd->obd_minor, num);
707 * Find obd in obd_devs[] by name or uuid.
709 * Increment obd's refcount if found.
711 * \param[in] str obd name or uuid
713 * \retval NULL if not found
714 * \retval target pointer to found obd_device
/*
 * Resolve str to an obd device under the obd_dev_lock read lock.
 * str is converted to a uuid and looked up with class_uuid2dev_nolock();
 * class_name2dev_nolock() is also available as a lookup by name
 * (NOTE(review): presumably only when the uuid lookup fails - confirm).
 * A reference tagged "find" is taken on the target with class_incref()
 * before the lock is released.
 */
716 struct obd_device *class_dev_by_str(const char *str)
718 struct obd_device *target = NULL;
719 struct obd_uuid tgtuuid;
722 obd_str2uuid(&tgtuuid, str);
724 read_lock(&obd_dev_lock);
725 rc = class_uuid2dev_nolock(&tgtuuid);
727 rc = class_name2dev_nolock(str);
730 target = class_num2obd(rc);
733 class_incref(target, "find", current);
734 read_unlock(&obd_dev_lock);
738 EXPORT_SYMBOL(class_dev_by_str);
741 * Get obd devices count. Device in any
743 * \retval obd device count
745 int get_devices_count(void)
747 int index, max_index = class_devno_max(), dev_count = 0;
749 read_lock(&obd_dev_lock);
750 for (index = 0; index <= max_index; index++) {
751 struct obd_device *obd = class_num2obd(index);
755 read_unlock(&obd_dev_lock);
759 EXPORT_SYMBOL(get_devices_count);
/* Print one console line per device in obd_devs[] (index, status, type name,
 * device name, uuid, refcount) while holding the obd_dev_lock read lock.
 * The status is chosen from obd_stopping, obd_set_up and obd_attached, in
 * that order of precedence. */
761 void class_obd_list(void)
766 read_lock(&obd_dev_lock);
767 for (i = 0; i < class_devno_max(); i++) {
768 struct obd_device *obd = class_num2obd(i);
772 if (obd->obd_stopping)
774 else if (obd->obd_set_up)
776 else if (obd->obd_attached)
780 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
781 i, status, obd->obd_type->typ_name,
782 obd->obd_name, obd->obd_uuid.uuid,
783 atomic_read(&obd->obd_refcount));
785 read_unlock(&obd_dev_lock);
789 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
790 specified, then only the client with that uuid is returned,
791 otherwise any client connected to the tgt is returned. */
/*
 * tgt_uuid: uuid compared against each candidate's u.cli.cl_target_uuid
 * typ_name: type name to match; compared with strncmp() over
 *           strlen(typ_name), i.e. as a prefix of the device's type name
 * grp_uuid: optional; when non-NULL the device's own obd_uuid must equal it
 *
 * The scan runs under the obd_dev_lock read lock, which is released on both
 * the match path and the no-match path.
 */
792 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
793 const char * typ_name,
794 struct obd_uuid *grp_uuid)
798 read_lock(&obd_dev_lock);
799 for (i = 0; i < class_devno_max(); i++) {
800 struct obd_device *obd = class_num2obd(i);
804 if ((strncmp(obd->obd_type->typ_name, typ_name,
805 strlen(typ_name)) == 0)) {
806 if (obd_uuid_equals(tgt_uuid,
807 &obd->u.cli.cl_target_uuid) &&
808 ((grp_uuid)? obd_uuid_equals(grp_uuid,
809 &obd->obd_uuid) : 1)) {
810 read_unlock(&obd_dev_lock);
815 read_unlock(&obd_dev_lock);
819 EXPORT_SYMBOL(class_find_client_obd);
821 /* Iterate over the obd_device list looking for devices that have grp_uuid.
822 Start searching at *next, and if a device is found, the next index to look
823 at is saved in *next. If next is NULL, then the first matching device
824 will always be returned. */
/*
 * grp_uuid: uuid compared against each device's obd_uuid
 * next:     optional scan cursor; a value in [0, class_devno_max()) is
 *           accepted as the starting index
 *
 * The scan runs under the obd_dev_lock read lock, which is released on both
 * the match path and the no-match path.
 */
825 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
831 else if (*next >= 0 && *next < class_devno_max())
836 read_lock(&obd_dev_lock);
837 for (; i < class_devno_max(); i++) {
838 struct obd_device *obd = class_num2obd(i);
842 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
845 read_unlock(&obd_dev_lock);
849 read_unlock(&obd_dev_lock);
853 EXPORT_SYMBOL(class_devices_in_group);
856 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
857 * adjust sptlrpc settings accordingly.
/*
 * fsname:  filesystem name; devices are selected when the first namelen
 *          bytes of their obd_name match it
 * namelen: number of bytes of fsname to compare; must be > 0
 *
 * Only devices that are set up and not stopping, and whose type is one of
 * mdc, osc, osp, lwp, mdt or ost, are notified. Each notification is an
 * obd_set_info_async() call with KEY_SPTLRPC_CONF on the device's self
 * export.
 */
859 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
861 struct obd_device *obd;
865 LASSERT(namelen > 0);
867 read_lock(&obd_dev_lock);
868 for (i = 0; i < class_devno_max(); i++) {
869 obd = class_num2obd(i);
871 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
874 /* only notify mdc, osc, osp, lwp, mdt, ost
875 * because only these have a -sptlrpc llog */
876 type = obd->obd_type->typ_name;
877 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
878 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
879 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
880 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
881 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
882 strcmp(type, LUSTRE_OST_NAME) != 0)
885 if (strncmp(obd->obd_name, fsname, namelen))
/* hold a reference on the device while obd_dev_lock is dropped around the
 * set_info call; the lock is re-taken before the scan continues */
888 class_incref(obd, __FUNCTION__, obd);
889 read_unlock(&obd_dev_lock);
890 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
891 sizeof(KEY_SPTLRPC_CONF),
892 KEY_SPTLRPC_CONF, 0, NULL, NULL);
894 class_decref(obd, __FUNCTION__, obd);
895 read_lock(&obd_dev_lock);
897 read_unlock(&obd_dev_lock);
900 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/* Destroy the slab caches created by obd_init_caches() and reset their
 * pointers. Also used as the error path of obd_init_caches(). */
902 void obd_cleanup_caches(void)
905 if (obd_device_cachep) {
906 kmem_cache_destroy(obd_device_cachep);
907 obd_device_cachep = NULL;
910 kmem_cache_destroy(obdo_cachep);
914 kmem_cache_destroy(import_cachep);
915 import_cachep = NULL;
/* Create the obd_device, obdo and import slab caches. Each cache pointer is
 * asserted to be NULL beforehand. A failed creation jumps to the 'out'
 * label with rc = -ENOMEM; obd_cleanup_caches() is called at the end of the
 * function to tear down what was created. */
921 int obd_init_caches(void)
926 LASSERT(obd_device_cachep == NULL);
927 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
928 sizeof(struct obd_device),
930 if (!obd_device_cachep)
931 GOTO(out, rc = -ENOMEM);
933 LASSERT(obdo_cachep == NULL);
934 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
937 GOTO(out, rc = -ENOMEM);
939 LASSERT(import_cachep == NULL);
940 import_cachep = kmem_cache_create("ll_import_cache",
941 sizeof(struct obd_import),
944 GOTO(out, rc = -ENOMEM);
948 obd_cleanup_caches();
952 /* map connection to client */
/* Resolve a connection handle to its export with
 * class_handle2object(conn->cookie). Two cases are logged separately before
 * the lookup: a null handle, and a cookie of -1 (a request for a new
 * connection). */
953 struct obd_export *class_conn2export(struct lustre_handle *conn)
955 struct obd_export *export;
959 CDEBUG(D_CACHE, "looking for null handle\n");
963 if (conn->cookie == -1) { /* this means assign a new connection */
964 CDEBUG(D_CACHE, "want a new connection\n");
968 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
969 export = class_handle2object(conn->cookie, NULL);
972 EXPORT_SYMBOL(class_conn2export);
/* Map an export to an obd device. NOTE(review): the body is not visible in
 * these lines; presumably returns exp->exp_obd - confirm. */
974 struct obd_device *class_exp2obd(struct obd_export *exp)
980 EXPORT_SYMBOL(class_exp2obd);
/* Map a connection handle to the obd device of its export. The export
 * reference obtained through class_conn2export() is dropped again once
 * exp_obd has been read. */
982 struct obd_device *class_conn2obd(struct lustre_handle *conn)
984 struct obd_export *export;
985 export = class_conn2export(conn);
987 struct obd_device *obd = export->exp_obd;
988 class_export_put(export);
/* Return the client import (u.cli.cl_import) of the obd device that exp
 * belongs to. */
994 struct obd_import *class_exp2cliimp(struct obd_export *exp)
996 struct obd_device *obd = exp->exp_obd;
999 return obd->u.cli.cl_import;
1001 EXPORT_SYMBOL(class_exp2cliimp);
/* Return the client import (u.cli.cl_import) of the obd device that the
 * connection handle resolves to via class_conn2obd(). */
1003 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
1005 struct obd_device *obd = class_conn2obd(conn);
1008 return obd->u.cli.cl_import;
1011 /* Export management functions */
/* Free an export whose refcount has reached zero: release its connection
 * (if any), assert that its reply/replay/RPC lists are empty, call
 * obd_destroy_export(), drop the "export" reference on the obd (not for the
 * self export) and free the export through OBD_FREE_RCU(). */
1012 static void class_export_destroy(struct obd_export *exp)
1014 struct obd_device *obd = exp->exp_obd;
1017 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
1018 LASSERT(obd != NULL);
1020 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
1021 exp->exp_client_uuid.uuid, obd->obd_name);
1023 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
1024 if (exp->exp_connection)
1025 ptlrpc_put_connection_superhack(exp->exp_connection);
1027 LASSERT(list_empty(&exp->exp_outstanding_replies));
1028 LASSERT(list_empty(&exp->exp_uncommitted_replies));
1029 LASSERT(list_empty(&exp->exp_req_replay_queue));
1030 LASSERT(list_empty(&exp->exp_hp_rpcs));
1031 obd_destroy_export(exp);
1032 /* self export doesn't hold a reference to an obd, although it
1033 * exists until freeing of the obd */
1034 if (exp != obd->obd_self_export)
1035 class_decref(obd, "export", exp);
1037 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback for export handles: takes a reference on the export
 * with class_export_get(). */
1041 static void export_handle_addref(void *export)
1043 class_export_get(export);
/* Handle operations passed to class_handle_hash() for export handles. */
1046 static struct portals_handle_ops export_handle_ops = {
1047 .hop_addref = export_handle_addref,
/* Take a reference on exp (atomic increment of exp_refcount) and log the
 * new count. */
1051 struct obd_export *class_export_get(struct obd_export *exp)
1053 atomic_inc(&exp->exp_refcount);
1054 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1055 atomic_read(&exp->exp_refcount));
1058 EXPORT_SYMBOL(class_export_get);
/* Drop a reference on exp. On the final put the nid stat reference is
 * released; the self export is then destroyed immediately and its device
 * freed with class_free_dev(), while any other export is handed to
 * obd_zombie_export_add(). */
1060 void class_export_put(struct obd_export *exp)
1062 LASSERT(exp != NULL);
1063 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1064 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1065 atomic_read(&exp->exp_refcount) - 1);
1067 if (atomic_dec_and_test(&exp->exp_refcount)) {
1068 struct obd_device *obd = exp->exp_obd;
1070 CDEBUG(D_IOCTL, "final put %p/%s\n",
1071 exp, exp->exp_client_uuid.uuid);
1073 /* release nid stat reference */
1074 lprocfs_exp_cleanup(exp);
1076 if (exp == obd->obd_self_export) {
1077 /* self export should be destroyed without
1078 * zombie thread as it doesn't hold a
1079 * reference to obd and doesn't hold any
1081 class_export_destroy(exp);
1082 /* self export is destroyed, no class
1083 * references exist and it is safe to free
1085 class_free_dev(obd);
1087 LASSERT(!list_empty(&exp->exp_obd_chain));
1088 obd_zombie_export_add(exp);
1093 EXPORT_SYMBOL(class_export_put);
1094 /* Creates a new export, adds it to the hash table, and returns a
1095 * pointer to it. The refcount is 2: one for the hash reference, and
1096 * one for the pointer returned by this function. */
/*
 * obd:     device the export belongs to
 * cluuid:  client uuid, copied into exp_client_uuid
 * is_self: NOTE(review): its use is not visible in these lines; presumably
 *          selects between linking the export into the obd's lists (with an
 *          obd reference) and leaving its chains empty - confirm
 *
 * Returns ERR_PTR(-ENOMEM) if the export cannot be allocated. Other
 * failures set rc to -ENODEV, -EALREADY (duplicate uuid) or -ESHUTDOWN
 * (device stopping); the tail of the function then unhashes and frees the
 * export.
 */
1097 struct obd_export *__class_new_export(struct obd_device *obd,
1098 struct obd_uuid *cluuid, bool is_self)
1100 struct obd_export *export;
1101 struct cfs_hash *hash = NULL;
1105 OBD_ALLOC_PTR(export);
1107 return ERR_PTR(-ENOMEM);
1109 export->exp_conn_cnt = 0;
1110 export->exp_lock_hash = NULL;
1111 export->exp_flock_hash = NULL;
1112 /* 2 = class_handle_hash + last */
1113 atomic_set(&export->exp_refcount, 2);
1114 atomic_set(&export->exp_rpc_count, 0);
1115 atomic_set(&export->exp_cb_count, 0);
1116 atomic_set(&export->exp_locks_count, 0);
1117 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1118 INIT_LIST_HEAD(&export->exp_locks_list);
1119 spin_lock_init(&export->exp_locks_list_guard);
1121 atomic_set(&export->exp_replay_count, 0);
1122 export->exp_obd = obd;
1123 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1124 spin_lock_init(&export->exp_uncommitted_replies_lock);
1125 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1126 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1127 INIT_LIST_HEAD(&export->exp_handle.h_link);
1128 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1129 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1130 class_handle_hash(&export->exp_handle, &export_handle_ops);
1131 export->exp_last_request_time = ktime_get_real_seconds();
1132 spin_lock_init(&export->exp_lock);
1133 spin_lock_init(&export->exp_rpc_lock);
1134 INIT_HLIST_NODE(&export->exp_uuid_hash);
1135 INIT_HLIST_NODE(&export->exp_nid_hash);
1136 INIT_HLIST_NODE(&export->exp_gen_hash);
1137 spin_lock_init(&export->exp_bl_list_lock);
1138 INIT_LIST_HEAD(&export->exp_bl_list);
1139 INIT_LIST_HEAD(&export->exp_stale_list);
1141 export->exp_sp_peer = LUSTRE_SP_ANY;
1142 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1143 export->exp_client_uuid = *cluuid;
1144 obd_init_export(export);
/* exports whose uuid differs from the obd's own uuid are added to the obd's
 * uuid hash; a duplicate uuid is refused */
1146 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1147 spin_lock(&obd->obd_dev_lock);
1148 /* shouldn't happen, but might race */
1149 if (obd->obd_stopping)
1150 GOTO(exit_unlock, rc = -ENODEV);
1152 hash = cfs_hash_getref(obd->obd_uuid_hash);
1154 GOTO(exit_unlock, rc = -ENODEV);
1155 spin_unlock(&obd->obd_dev_lock);
1157 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1159 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1160 obd->obd_name, cluuid->uuid, rc);
1161 GOTO(exit_err, rc = -EALREADY);
1165 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1166 spin_lock(&obd->obd_dev_lock);
/* re-check under the lock: the device may have started stopping since the
 * first check */
1167 if (obd->obd_stopping) {
1169 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1170 GOTO(exit_unlock, rc = -ESHUTDOWN);
1174 class_incref(obd, "export", export);
1175 list_add_tail(&export->exp_obd_chain_timed,
1176 &obd->obd_exports_timed);
1177 list_add(&export->exp_obd_chain, &obd->obd_exports);
1178 obd->obd_num_exports++;
1180 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1181 INIT_LIST_HEAD(&export->exp_obd_chain);
1183 spin_unlock(&obd->obd_dev_lock);
1185 cfs_hash_putref(hash);
/* error exit: undo the handle hashing and free the export */
1189 spin_unlock(&obd->obd_dev_lock);
1192 cfs_hash_putref(hash);
1193 class_handle_unhash(&export->exp_handle);
1194 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1195 obd_destroy_export(export);
1196 OBD_FREE_PTR(export);
/* Create a regular export for obd: __class_new_export() with
 * is_self == false. */
1200 struct obd_export *class_new_export(struct obd_device *obd,
1201 struct obd_uuid *uuid)
1203 return __class_new_export(obd, uuid, false);
1205 EXPORT_SYMBOL(class_new_export);
/* Create the self export for obd: __class_new_export() with
 * is_self == true. */
1207 struct obd_export *class_new_export_self(struct obd_device *obd,
1208 struct obd_uuid *uuid)
1210 return __class_new_export(obd, uuid, true);
/* Unlink an export from its obd. The handle is unhashed first. The self
 * export only has its reference dropped. Any other export is removed from
 * the uuid hash (and, with server support, the generation hash), moved to
 * obd_unlinked_exports, taken off the timed list and counted in
 * obd_stale_export_num before obd_stale_export_put() is called. */
1213 void class_unlink_export(struct obd_export *exp)
1215 class_handle_unhash(&exp->exp_handle);
1217 if (exp->exp_obd->obd_self_export == exp) {
1218 class_export_put(exp);
1222 spin_lock(&exp->exp_obd->obd_dev_lock);
1223 /* delete an uuid-export hashitem from hashtables */
1224 if (!hlist_unhashed(&exp->exp_uuid_hash))
1225 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1226 &exp->exp_client_uuid,
1227 &exp->exp_uuid_hash);
1229 #ifdef HAVE_SERVER_SUPPORT
1230 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1231 struct tg_export_data *ted = &exp->exp_target_data;
1232 struct cfs_hash *hash;
1234 /* Because obd_gen_hash will not be released until
1235 * class_cleanup(), so hash should never be NULL here */
1236 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1237 LASSERT(hash != NULL);
1238 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1239 &exp->exp_gen_hash);
1240 cfs_hash_putref(hash);
1242 #endif /* HAVE_SERVER_SUPPORT */
1244 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1245 list_del_init(&exp->exp_obd_chain_timed);
1246 exp->exp_obd->obd_num_exports--;
1247 spin_unlock(&exp->exp_obd->obd_dev_lock);
1248 atomic_inc(&obd_stale_export_num);
1250 /* A reference is kept by obd_stale_exports list */
1251 obd_stale_export_put(exp);
1253 EXPORT_SYMBOL(class_unlink_export);
1255 /* Import management functions */
/* Free an import whose refcount has reached zero: release its connection
 * and every entry on imp_conn_list, assert that imp_sec is NULL, drop the
 * "import" reference on the obd and free the import through
 * OBD_FREE_RCU(). */
1256 static void class_import_destroy(struct obd_import *imp)
1260 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1261 imp->imp_obd->obd_name);
1263 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1265 ptlrpc_put_connection_superhack(imp->imp_connection);
/* drain the connection list, releasing each entry's connection */
1267 while (!list_empty(&imp->imp_conn_list)) {
1268 struct obd_import_conn *imp_conn;
1270 imp_conn = list_entry(imp->imp_conn_list.next,
1271 struct obd_import_conn, oic_item);
1272 list_del_init(&imp_conn->oic_item);
1273 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1274 OBD_FREE(imp_conn, sizeof(*imp_conn));
1277 LASSERT(imp->imp_sec == NULL);
1278 class_decref(imp->imp_obd, "import", imp);
1279 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* hop_addref callback for import handles: takes a reference on the import
 * with class_import_get(). */
1283 static void import_handle_addref(void *import)
1285 class_import_get(import);
/* Handle operations passed to class_handle_hash() for import handles. */
1288 static struct portals_handle_ops import_handle_ops = {
1289 .hop_addref = import_handle_addref,
/* Take a reference on import (atomic increment of imp_refcount) and log the
 * new count together with the owning obd's name. */
1293 struct obd_import *class_import_get(struct obd_import *import)
1295 atomic_inc(&import->imp_refcount);
1296 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1297 atomic_read(&import->imp_refcount),
1298 import->imp_obd->obd_name);
1301 EXPORT_SYMBOL(class_import_get);
/* Drop a reference on imp. The import must not already be on a zombie list.
 * On the final put the import is handed to obd_zombie_import_add(). */
1303 void class_import_put(struct obd_import *imp)
1307 LASSERT(list_empty(&imp->imp_zombie_chain));
1308 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1310 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1311 atomic_read(&imp->imp_refcount) - 1,
1312 imp->imp_obd->obd_name);
1314 if (atomic_dec_and_test(&imp->imp_refcount)) {
1315 CDEBUG(D_INFO, "final put import %p\n", imp);
1316 obd_zombie_import_add(imp);
/* NOTE(review): this reads imp->imp_refcount after the final put may have
 * queued the import for destruction; verify imp cannot be freed before this
 * assertion runs */
1319 /* catch possible import put race */
1320 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1323 EXPORT_SYMBOL(class_import_put);
/* Initialize the adaptive-timeout state of an import: the net latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT. */
1325 static void init_imp_at(struct imp_at *at) {
1327 at_init(&at->iat_net_latency, 0, 0);
1328 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1329 /* max service estimates are tracked on the server side, so
1330 don't use the AT history here, just use the last reported
1331 val. (But keep hist for proc histogram, worst_ever) */
1332 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Allocate and initialize an import for obd. The import takes an "import"
 * reference on obd, starts in state LUSTRE_IMP_NEW with a refcount of 2, is
 * added to the handle hash and defaults to message magic
 * LUSTRE_MSG_MAGIC_V2. */
1337 struct obd_import *class_new_import(struct obd_device *obd)
1339 struct obd_import *imp;
1340 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1342 OBD_ALLOC(imp, sizeof(*imp));
1346 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1347 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1348 INIT_LIST_HEAD(&imp->imp_replay_list);
1349 INIT_LIST_HEAD(&imp->imp_sending_list);
1350 INIT_LIST_HEAD(&imp->imp_delayed_list);
1351 INIT_LIST_HEAD(&imp->imp_committed_list);
1352 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1353 imp->imp_known_replied_xid = 0;
1354 imp->imp_replay_cursor = &imp->imp_committed_list;
1355 spin_lock_init(&imp->imp_lock);
1356 imp->imp_last_success_conn = 0;
1357 imp->imp_state = LUSTRE_IMP_NEW;
1358 imp->imp_obd = class_incref(obd, "import", imp);
1359 mutex_init(&imp->imp_sec_mutex);
1360 init_waitqueue_head(&imp->imp_recovery_waitq);
/* imp_sec_refpid is the pid of the current pid namespace's child_reaper
 * when there is one, otherwise 1 */
1362 if (curr_pid_ns->child_reaper)
1363 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1365 imp->imp_sec_refpid = 1;
1367 atomic_set(&imp->imp_refcount, 2);
1368 atomic_set(&imp->imp_unregistering, 0);
1369 atomic_set(&imp->imp_inflight, 0);
1370 atomic_set(&imp->imp_replay_inflight, 0);
1371 atomic_set(&imp->imp_inval_count, 0);
1372 INIT_LIST_HEAD(&imp->imp_conn_list);
1373 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1374 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1375 init_imp_at(&imp->imp_at);
1377 /* the default magic is V2, will be used in connect RPC, and
1378 * then adjusted according to the flags in request/reply. */
1379 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1383 EXPORT_SYMBOL(class_new_import);
1385 void class_destroy_import(struct obd_import *import)
1387 LASSERT(import != NULL);
1388 LASSERT(import != LP_POISON);
1390 class_handle_unhash(&import->imp_handle);
1392 spin_lock(&import->imp_lock);
1393 import->imp_generation++;
1394 spin_unlock(&import->imp_lock);
1395 class_import_put(import);
1397 EXPORT_SYMBOL(class_destroy_import);
1399 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1401 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1403 spin_lock(&exp->exp_locks_list_guard);
1405 LASSERT(lock->l_exp_refs_nr >= 0);
1407 if (lock->l_exp_refs_target != NULL &&
1408 lock->l_exp_refs_target != exp) {
1409 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1410 exp, lock, lock->l_exp_refs_target);
1412 if ((lock->l_exp_refs_nr ++) == 0) {
1413 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1414 lock->l_exp_refs_target = exp;
1416 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1417 lock, exp, lock->l_exp_refs_nr);
1418 spin_unlock(&exp->exp_locks_list_guard);
1420 EXPORT_SYMBOL(__class_export_add_lock_ref);
1422 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1424 spin_lock(&exp->exp_locks_list_guard);
1425 LASSERT(lock->l_exp_refs_nr > 0);
1426 if (lock->l_exp_refs_target != exp) {
1427 LCONSOLE_WARN("lock %p, "
1428 "mismatching export pointers: %p, %p\n",
1429 lock, lock->l_exp_refs_target, exp);
1431 if (-- lock->l_exp_refs_nr == 0) {
1432 list_del_init(&lock->l_exp_refs_link);
1433 lock->l_exp_refs_target = NULL;
1435 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1436 lock, exp, lock->l_exp_refs_nr);
1437 spin_unlock(&exp->exp_locks_list_guard);
1439 EXPORT_SYMBOL(__class_export_del_lock_ref);
1442 /* A connection defines an export context in which preallocation can
1443 be managed. This releases the export pointer reference, and returns
1444 the export handle, so the export refcount is 1 when this function
/* Create a new export on @obd for the client identified by @cluuid and
 * store the export's handle cookie in @conn.  The pointer reference returned
 * by class_new_export() is released again before returning.
 * NOTE(review): the guard in front of the PTR_ERR() return and the final
 * return statement are not visible in this excerpt. */
1446 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1447 struct obd_uuid *cluuid)
1449 struct obd_export *export;
1450 LASSERT(conn != NULL);
1451 LASSERT(obd != NULL);
1452 LASSERT(cluuid != NULL);
1455 export = class_new_export(obd, cluuid);
/* error path: hand back the errno encoded in the pointer */
1457 RETURN(PTR_ERR(export));
/* copy the cookie out before giving up our pointer reference */
1459 conn->cookie = export->exp_handle.h_cookie;
1460 class_export_put(export);
1462 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1463 cluuid->uuid, conn->cookie);
1466 EXPORT_SYMBOL(class_connect);
1468 /* if export is involved in recovery then clean up related things */
1469 static void class_export_recovery_cleanup(struct obd_export *exp)
1471 struct obd_device *obd = exp->exp_obd;
1473 spin_lock(&obd->obd_recovery_task_lock);
1474 if (obd->obd_recovering) {
1475 if (exp->exp_in_recovery) {
1476 spin_lock(&exp->exp_lock);
1477 exp->exp_in_recovery = 0;
1478 spin_unlock(&exp->exp_lock);
1479 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1480 atomic_dec(&obd->obd_connected_clients);
1483 /* if called during recovery then should update
1484 * obd_stale_clients counter,
1485 * lightweight exports are not counted */
1486 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1487 exp->exp_obd->obd_stale_clients++;
1489 spin_unlock(&obd->obd_recovery_task_lock);
1491 spin_lock(&exp->exp_lock);
1492 /** Cleanup req replay fields */
1493 if (exp->exp_req_replay_needed) {
1494 exp->exp_req_replay_needed = 0;
1496 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1497 atomic_dec(&obd->obd_req_replay_clients);
1500 /** Cleanup lock replay data */
1501 if (exp->exp_lock_replay_needed) {
1502 exp->exp_lock_replay_needed = 0;
1504 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1505 atomic_dec(&obd->obd_lock_replay_clients);
1507 spin_unlock(&exp->exp_lock);
1510 /* This function removes 1-3 references from the export:
1511 * 1 - for export pointer passed
1512 * and if disconnect really need
1513 * 2 - removing from hash
1514 * 3 - in client_unlink_export
1515 * The export pointer passed to this function can destroyed */
/* Disconnect @export.
 *
 * Under exp_lock the export is marked disconnected and, if still hashed,
 * removed from the device's NID hash.  An export that was already marked
 * disconnected jumps to no_disconn; otherwise recovery state is cleaned up
 * and the export is unlinked.  class_export_put() releases a reference.
 *
 * NOTE(review): the NULL-export return, the position of the no_disconn
 * label and the final return are not visible in this excerpt. */
1516 int class_disconnect(struct obd_export *export)
1518 int already_disconnected;
1521 if (export == NULL) {
1522 CWARN("attempting to free NULL export %p\n", export);
1526 spin_lock(&export->exp_lock);
/* sample the previous state and set the flag in one critical section so
 * that racing callers can tell who got here first */
1527 already_disconnected = export->exp_disconnected;
1528 export->exp_disconnected = 1;
1529 /* We hold references of export for uuid hash
1530 * and nid_hash and export link at least. So
1531 * it is safe to call cfs_hash_del in there. */
1532 if (!hlist_unhashed(&export->exp_nid_hash))
1533 cfs_hash_del(export->exp_obd->obd_nid_hash,
1534 &export->exp_connection->c_peer.nid,
1535 &export->exp_nid_hash);
1536 spin_unlock(&export->exp_lock);
1538 /* class_cleanup(), abort_recovery(), and class_fail_export()
1539 * all end up in here, and if any of them race we shouldn't
1540 * call extra class_export_puts(). */
1541 if (already_disconnected) {
1542 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1543 GOTO(no_disconn, already_disconnected);
1546 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1547 export->exp_handle.h_cookie);
1549 class_export_recovery_cleanup(export);
1550 class_unlink_export(export);
1552 class_export_put(export);
1555 EXPORT_SYMBOL(class_disconnect);
1557 /* Return non-zero for a fully connected export */
/* Sample the export state under exp_lock: the export counts as connected
 * when its connection count is positive and it has not been marked failed.
 * NOTE(review): the declaration of "connected", any guard on @exp and the
 * return statement are not visible in this excerpt. */
1558 int class_connected_export(struct obd_export *exp)
1563 spin_lock(&exp->exp_lock);
1564 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1565 spin_unlock(&exp->exp_lock);
1569 EXPORT_SYMBOL(class_connected_export);
/* Disconnect every export chained on @list, repeatedly taking the first
 * entry until the list is empty.  Each export gets @flags stored in
 * exp_flags (under exp_lock) before obd_disconnect() is called on it.
 * An export whose client UUID equals its own device UUID is only unlinked
 * from the list and not disconnected.
 * NOTE(review): the list_entry() member argument, the CDEBUG call that owns
 * the "don't discon" message, the end of that branch and the declaration of
 * "rc" are not visible in this excerpt. */
1571 static void class_disconnect_export_list(struct list_head *list,
1572 enum obd_option flags)
1575 struct obd_export *exp;
1578 /* It's possible that an export may disconnect itself, but
1579 * nothing else will be added to this list. */
1580 while (!list_empty(list)) {
1581 exp = list_entry(list->next, struct obd_export,
1583 /* need for safe call CDEBUG after obd_disconnect */
1584 class_export_get(exp);
1586 spin_lock(&exp->exp_lock);
1587 exp->exp_flags = flags;
1588 spin_unlock(&exp->exp_lock);
1590 if (obd_uuid_equals(&exp->exp_client_uuid,
1591 &exp->exp_obd->obd_uuid)) {
1593 "exp %p export uuid == obd uuid, don't discon\n",
1595 /* Need to delete this now so we don't end up pointing
1596 * to work_list later when this export is cleaned up. */
1597 list_del_init(&exp->exp_obd_chain);
1598 class_export_put(exp);
/* second reference: obd_disconnect() below consumes one reference, the
 * first one keeps exp valid for the CDEBUG that follows it */
1602 class_export_get(exp);
1603 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1604 "last request at %lld\n",
1605 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1606 exp, exp->exp_last_request_time);
1607 /* release one export reference anyway */
1608 rc = obd_disconnect(exp);
1610 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1611 obd_export_nid2str(exp), exp, rc);
1612 class_export_put(exp);
/* Disconnect all exports of @obd.  Both obd_exports and obd_delayed_exports
 * are spliced onto a private list under obd_dev_lock; the exports are then
 * disconnected outside the lock with the flags from exp_flags_from_obd().
 * NOTE(review): the "else" joining the two CDEBUG branches is not visible
 * in this excerpt. */
1617 void class_disconnect_exports(struct obd_device *obd)
1619 struct list_head work_list;
1622 /* Move all of the exports from obd_exports to a work list, en masse. */
1623 INIT_LIST_HEAD(&work_list);
1624 spin_lock(&obd->obd_dev_lock);
1625 list_splice_init(&obd->obd_exports, &work_list);
1626 list_splice_init(&obd->obd_delayed_exports, &work_list);
1627 spin_unlock(&obd->obd_dev_lock);
1629 if (!list_empty(&work_list)) {
1630 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1631 "disconnecting them\n", obd->obd_minor, obd);
1632 class_disconnect_export_list(&work_list,
1633 exp_flags_from_obd(obd));
1635 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1636 obd->obd_minor, obd);
1639 EXPORT_SYMBOL(class_disconnect_exports);
1641 /* Remove exports that have not completed recovery.
/* Walk obd_exports under obd_dev_lock, mark selected exports as failed,
 * move them to a private work list, and disconnect that list afterwards
 * with OBD_OPT_ABORT_RECOV added to the device flags.
 *
 * \param obd          device whose exports are examined
 * \param test_export  predicate evaluated per export; it is called with
 *                     both obd_dev_lock and the export's exp_lock held
 *
 * NOTE(review): the statements following each filter condition (how an
 * export is skipped), the "evicted" counter and the guard in front of the
 * console warning are not visible in this excerpt. */
1643 void class_disconnect_stale_exports(struct obd_device *obd,
1644 int (*test_export)(struct obd_export *))
1646 struct list_head work_list;
1647 struct obd_export *exp, *n;
1651 INIT_LIST_HEAD(&work_list);
1652 spin_lock(&obd->obd_dev_lock);
1653 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1655 /* don't count self-export as client */
1656 if (obd_uuid_equals(&exp->exp_client_uuid,
1657 &exp->exp_obd->obd_uuid))
1660 /* don't evict clients which have no slot in last_rcvd
1661 * (e.g. lightweight connection) */
1662 if (exp->exp_target_data.ted_lr_idx == -1)
1665 spin_lock(&exp->exp_lock);
1666 if (exp->exp_failed || test_export(exp)) {
1667 spin_unlock(&exp->exp_lock);
1670 exp->exp_failed = 1;
1671 spin_unlock(&exp->exp_lock);
1673 list_move(&exp->exp_obd_chain, &work_list);
1675 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1676 obd->obd_name, exp->exp_client_uuid.uuid,
1677 exp->exp_connection == NULL ? "<unknown>" :
1678 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1679 print_export_data(exp, "EVICTING", 0, D_HA);
1681 spin_unlock(&obd->obd_dev_lock);
1684 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1685 obd->obd_name, evicted);
1687 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1688 OBD_OPT_ABORT_RECOV);
1691 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Mark @exp as failed and disconnect it.
 *
 * exp_failed is sampled and set under exp_lock; the "skipping" message is
 * logged when the export had already been marked failed.  Two references
 * are taken before obd_disconnect(): one for the disconnect call itself
 * (see the comment below) and one that keeps @exp valid for the final
 * debug message, released by the trailing class_export_put().
 *
 * NOTE(review): the early return after the "skipping" message and the
 * conditions selecting the CERROR/CDEBUG after obd_disconnect() are not
 * visible in this excerpt. */
1693 void class_fail_export(struct obd_export *exp)
1695 int rc, already_failed;
1697 spin_lock(&exp->exp_lock);
1698 already_failed = exp->exp_failed;
1699 exp->exp_failed = 1;
1700 spin_unlock(&exp->exp_lock);
1702 if (already_failed) {
1703 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1704 exp, exp->exp_client_uuid.uuid);
1708 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1709 exp, exp->exp_client_uuid.uuid);
1711 if (obd_dump_on_timeout)
1712 libcfs_debug_dumplog();
1714 /* need for safe call CDEBUG after obd_disconnect */
1715 class_export_get(exp);
1717 /* Most callers into obd_disconnect are removing their own reference
1718 * (request, for example) in addition to the one from the hash table.
1719 * We don't have such a reference here, so make one. */
1720 class_export_get(exp);
1721 rc = obd_disconnect(exp);
1723 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1725 CDEBUG(D_HA, "disconnected export %p/%s\n",
1726 exp, exp->exp_client_uuid.uuid);
1727 class_export_put(exp);
1729 EXPORT_SYMBOL(class_fail_export);
/* Return the peer NID of @exp formatted by libcfs_nid2str() when the export
 * has a connection.
 * NOTE(review): the value returned for an export without a connection is
 * not visible in this excerpt. */
1731 char *obd_export_nid2str(struct obd_export *exp)
1733 if (exp->exp_connection != NULL)
1734 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1738 EXPORT_SYMBOL(obd_export_nid2str);
/* Evict the export(s) of @obd hashed under the NID given as string @nid.
 *
 * The NID hash pointer is fetched and pinned with cfs_hash_getref() under
 * obd_dev_lock; nothing is done when the device is already stopping.
 * A matching export is failed with class_fail_export() and the lookup
 * reference is dropped.
 *
 * \retval value of exports_evicted (0 when the device is stopping)
 *
 * NOTE(review): the loop construct around the lookup, the statement after
 * the NULL check and the update of exports_evicted are not visible in this
 * excerpt. */
1740 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1742 struct cfs_hash *nid_hash;
1743 struct obd_export *doomed_exp = NULL;
1744 int exports_evicted = 0;
1746 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1748 spin_lock(&obd->obd_dev_lock);
1749 /* umount has run already, so evict thread should leave
1750 * its task to umount thread now */
1751 if (obd->obd_stopping) {
1752 spin_unlock(&obd->obd_dev_lock);
1753 return exports_evicted;
1755 nid_hash = obd->obd_nid_hash;
1756 cfs_hash_getref(nid_hash);
1757 spin_unlock(&obd->obd_dev_lock);
1760 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1761 if (doomed_exp == NULL)
/* sanity: the hash must return an export for exactly this NID, and the
 * device's self-export must never be found through the NID hash */
1764 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1765 "nid %s found, wanted nid %s, requested nid %s\n",
1766 obd_export_nid2str(doomed_exp),
1767 libcfs_nid2str(nid_key), nid);
1768 LASSERTF(doomed_exp != obd->obd_self_export,
1769 "self-export is hashed by NID?\n");
1771 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1772 "request\n", obd->obd_name,
1773 obd_uuid2str(&doomed_exp->exp_client_uuid),
1774 obd_export_nid2str(doomed_exp));
1775 class_fail_export(doomed_exp);
1776 class_export_put(doomed_exp);
1779 cfs_hash_putref(nid_hash);
1781 if (!exports_evicted)
1782 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1783 obd->obd_name, nid);
1784 return exports_evicted;
1786 EXPORT_SYMBOL(obd_export_evict_by_nid);
1788 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1790 struct cfs_hash *uuid_hash;
1791 struct obd_export *doomed_exp = NULL;
1792 struct obd_uuid doomed_uuid;
1793 int exports_evicted = 0;
1795 spin_lock(&obd->obd_dev_lock);
1796 if (obd->obd_stopping) {
1797 spin_unlock(&obd->obd_dev_lock);
1798 return exports_evicted;
1800 uuid_hash = obd->obd_uuid_hash;
1801 cfs_hash_getref(uuid_hash);
1802 spin_unlock(&obd->obd_dev_lock);
1804 obd_str2uuid(&doomed_uuid, uuid);
1805 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1806 CERROR("%s: can't evict myself\n", obd->obd_name);
1807 cfs_hash_putref(uuid_hash);
1808 return exports_evicted;
1811 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1813 if (doomed_exp == NULL) {
1814 CERROR("%s: can't disconnect %s: no exports found\n",
1815 obd->obd_name, uuid);
1817 CWARN("%s: evicting %s at adminstrative request\n",
1818 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1819 class_fail_export(doomed_exp);
1820 class_export_put(doomed_exp);
1823 cfs_hash_putref(uuid_hash);
1825 return exports_evicted;
1828 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional hook, NULL by default.  print_export_data() calls it for an
 * export when lock dumping is requested and the hook has been set. */
1829 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1830 EXPORT_SYMBOL(class_export_dump_hook);
/* Emit one debug line describing @exp at @debug_level.
 *
 * \param status       label printed next to the device name
 * \param locks        non-zero to also invoke class_export_dump_hook (only
 *                     when built with LUSTRE_TRACKS_LOCK_EXP_REFS)
 * \param debug_level  mask passed to CDEBUG
 *
 * The outstanding replies are walked under exp_lock.
 * NOTE(review): the declaration of "nreplies" and the loop body that fills
 * nreplies/first_reply are not visible in this excerpt. */
1833 static void print_export_data(struct obd_export *exp, const char *status,
1834 int locks, int debug_level)
1836 struct ptlrpc_reply_state *rs;
1837 struct ptlrpc_reply_state *first_reply = NULL;
1840 spin_lock(&exp->exp_lock);
1841 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1847 spin_unlock(&exp->exp_lock);
/* fields, in order: device, status, export pointer, client uuid, nid,
 * refcount, (rpc, callback, lock counts), disconnected, delayed, failed,
 * number of replies, first reply, "..." when more than 3 replies,
 * last committed, whether the export is on a stale list */
1849 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1850 "%p %s %llu stale:%d\n",
1851 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1852 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1853 atomic_read(&exp->exp_rpc_count),
1854 atomic_read(&exp->exp_cb_count),
1855 atomic_read(&exp->exp_locks_count),
1856 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1857 nreplies, first_reply, nreplies > 3 ? "..." : "",
1858 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1859 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1860 if (locks && class_export_dump_hook != NULL)
1861 class_export_dump_hook(exp);
1865 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1867 struct obd_export *exp;
1869 spin_lock(&obd->obd_dev_lock);
1870 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1871 print_export_data(exp, "ACTIVE", locks, debug_level);
1872 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1873 print_export_data(exp, "UNLINKED", locks, debug_level);
1874 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1875 print_export_data(exp, "DELAYED", locks, debug_level);
1876 spin_unlock(&obd->obd_dev_lock);
1877 spin_lock(&obd_zombie_impexp_lock);
1878 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1879 print_export_data(exp, "ZOMBIE", locks, debug_level);
1880 spin_unlock(&obd_zombie_impexp_lock);
/* Wait until @obd has no unlinked exports left.  obd_exports must already
 * be empty.  The list is checked under obd_dev_lock; between checks the
 * lock is dropped and the caller sleeps uninterruptibly for "waited"
 * seconds.  Once waited exceeds 5 and is a power of two, a console warning
 * is printed and all exports are dumped.
 * NOTE(review): the declaration of "waited" and the statement that changes
 * it between iterations are not visible in this excerpt. */
1883 void obd_exports_barrier(struct obd_device *obd)
1886 LASSERT(list_empty(&obd->obd_exports));
1887 spin_lock(&obd->obd_dev_lock);
1888 while (!list_empty(&obd->obd_unlinked_exports)) {
1889 spin_unlock(&obd->obd_dev_lock);
1890 set_current_state(TASK_UNINTERRUPTIBLE);
1891 schedule_timeout(cfs_time_seconds(waited));
1892 if (waited > 5 && is_power_of_2(waited)) {
1893 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1894 "more than %d seconds. "
1895 "The obd refcount = %d. Is it stuck?\n",
1896 obd->obd_name, waited,
1897 atomic_read(&obd->obd_refcount));
1898 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1901 spin_lock(&obd->obd_dev_lock);
1903 spin_unlock(&obd->obd_dev_lock);
1905 EXPORT_SYMBOL(obd_exports_barrier);
/* Number of zombies still waiting to be destroyed (statics start at zero) */
static int zombies_count;
1911 * kill zombie imports and exports
/* Destroy queued zombie imports and exports.  Each loop iteration detaches
 * at most one import and one export from the zombie lists under
 * obd_zombie_impexp_lock, then destroys them with the lock dropped.  The
 * loop ends once an iteration found neither.
 * NOTE(review): the "do {" opener, the resets of import/export at the top
 * of each iteration, the list_entry() type/member arguments and the
 * statements executed under the re-taken lock after each destroy are not
 * visible in this excerpt. */
1913 void obd_zombie_impexp_cull(void)
1915 struct obd_import *import;
1916 struct obd_export *export;
1920 spin_lock(&obd_zombie_impexp_lock);
1923 if (!list_empty(&obd_zombie_imports)) {
1924 import = list_entry(obd_zombie_imports.next,
1927 list_del_init(&import->imp_zombie_chain);
1931 if (!list_empty(&obd_zombie_exports)) {
1932 export = list_entry(obd_zombie_exports.next,
1935 list_del_init(&export->exp_obd_chain);
1938 spin_unlock(&obd_zombie_impexp_lock);
/* destruction happens outside the spinlock */
1940 if (import != NULL) {
1941 class_import_destroy(import);
1942 spin_lock(&obd_zombie_impexp_lock);
1944 spin_unlock(&obd_zombie_impexp_lock);
1947 if (export != NULL) {
1948 class_export_destroy(export);
1949 spin_lock(&obd_zombie_impexp_lock);
1951 spin_unlock(&obd_zombie_impexp_lock);
1955 } while (import != NULL || export != NULL);
/* State shared between the zombie destroy thread and its controllers. */
/* completed by the thread once it has started; obd_zombie_impexp_init() waits on it */
1959 static DECLARE_COMPLETION(obd_zombie_start);
/* completed by the thread when its loop exits; obd_zombie_impexp_stop() waits on it */
1960 static DECLARE_COMPLETION(obd_zombie_stop);
/* flag word manipulated with set_bit()/test_bit() */
1961 static unsigned long obd_zombie_flags;
/* wait queue shared by the thread and by obd_zombie_barrier() callers */
1962 static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
/* pid of the zombie thread, recorded by the thread itself */
1963 static pid_t obd_zombie_pid;
/* NOTE(review): this value is passed to set_bit()/test_bit() as the bit
 * number within obd_zombie_flags, not as a mask */
1966 OBD_ZOMBIE_STOP = 0x0001,
1970 * check for work for kill zombie import/export thread.
/* Wait condition helper for the zombie thread: under
 * obd_zombie_impexp_lock, rc becomes non-zero when there are no zombies
 * queued and the stop flag is not set.  The thread waits on the negation
 * of this value.
 * NOTE(review): the declaration of "rc" and the return statement are not
 * visible in this excerpt. */
1972 static int obd_zombie_impexp_check(void *arg)
1976 spin_lock(&obd_zombie_impexp_lock);
1977 rc = (zombies_count == 0) &&
1978 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1979 spin_unlock(&obd_zombie_impexp_lock);
1985 * Add export to the obd_zombie thread and notify it.
/* Queue @exp for destruction: decrement obd_stale_export_num, unlink the
 * export from its device chain under obd_dev_lock, add it to
 * obd_zombie_exports under obd_zombie_impexp_lock, then wake the waiters.
 * NOTE(review): one statement between taking obd_zombie_impexp_lock and
 * the list_add() is not visible in this excerpt. */
1987 static void obd_zombie_export_add(struct obd_export *exp) {
1988 atomic_dec(&obd_stale_export_num);
1989 spin_lock(&exp->exp_obd->obd_dev_lock);
/* the export must still be linked on one of its device's lists */
1990 LASSERT(!list_empty(&exp->exp_obd_chain));
1991 list_del_init(&exp->exp_obd_chain);
1992 spin_unlock(&exp->exp_obd->obd_dev_lock);
1993 spin_lock(&obd_zombie_impexp_lock);
/* exp_obd_chain is reused as the link on the zombie list */
1995 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1996 spin_unlock(&obd_zombie_impexp_lock);
1998 obd_zombie_impexp_notify();
2002 * Add import to the obd_zombie thread and notify it.
/* Queue @imp for destruction: the import must have no security context
 * (imp_sec == NULL) and must not already be on the zombie list.  It is
 * added to obd_zombie_imports under obd_zombie_impexp_lock and the waiters
 * are woken.
 * NOTE(review): one statement between the list_empty() assertion and the
 * list_add() is not visible in this excerpt. */
2004 static void obd_zombie_import_add(struct obd_import *imp) {
2005 LASSERT(imp->imp_sec == NULL);
2006 spin_lock(&obd_zombie_impexp_lock);
2007 LASSERT(list_empty(&imp->imp_zombie_chain));
2009 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
2010 spin_unlock(&obd_zombie_impexp_lock);
2012 obd_zombie_impexp_notify();
2016 * notify import/export destroy thread about new zombie.
2018 static void obd_zombie_impexp_notify(void)
2021 * Make sure obd_zomebie_impexp_thread get this notification.
2022 * It is possible this signal only get by obd_zombie_barrier, and
2023 * barrier gulps this notification and sleeps away and hangs ensues
2025 wake_up_all(&obd_zombie_waitq);
2029 * check whether obd_zombie is idle
/* Wait condition for obd_zombie_barrier(): under obd_zombie_impexp_lock,
 * rc becomes non-zero when no zombies are queued.  Must not be called once
 * the stop flag has been set (asserted).
 * NOTE(review): the declaration of "rc" and the return statement are not
 * visible in this excerpt. */
2031 static int obd_zombie_is_idle(void)
2035 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
2036 spin_lock(&obd_zombie_impexp_lock);
2037 rc = (zombies_count == 0);
2038 spin_unlock(&obd_zombie_impexp_lock);
2043 * wait when obd_zombie import/export queues become empty
2045 void obd_zombie_barrier(void)
2047 struct l_wait_info lwi = { 0 };
2049 if (obd_zombie_pid == current_pid())
2050 /* don't wait for myself */
2052 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
2054 EXPORT_SYMBOL(obd_zombie_barrier);
/* Detach the first export from the global obd_stale_exports list (under
 * obd_stale_export_lock) and log it together with the current value of
 * obd_stale_export_num.
 * NOTE(review): the guard around the CDEBUG and the return statement are
 * not visible in this excerpt; exp stays NULL when the list is empty. */
2057 struct obd_export *obd_stale_export_get(void)
2059 struct obd_export *exp = NULL;
2062 spin_lock(&obd_stale_export_lock);
2063 if (!list_empty(&obd_stale_exports)) {
2064 exp = list_entry(obd_stale_exports.next,
2065 struct obd_export, exp_stale_list);
2066 list_del_init(&exp->exp_stale_list);
2068 spin_unlock(&obd_stale_export_lock);
2071 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
2072 atomic_read(&obd_stale_export_num));
2076 EXPORT_SYMBOL(obd_stale_export_get);
/* Return @exp after stale-export processing.  @exp must not be on the stale
 * list.  When its lock hash still has entries, the export is put back on
 * obd_stale_exports: at the tail when exp_bl_list is empty, at the head
 * otherwise.  Lock order is exp_bl_list_lock (bottom halves disabled) then
 * obd_stale_export_lock.
 * NOTE(review): the "else" lines that pair list_add() with list_add_tail()
 * and class_export_put() with the re-queue branch are not visible in this
 * excerpt. */
2078 void obd_stale_export_put(struct obd_export *exp)
2082 LASSERT(list_empty(&exp->exp_stale_list));
2083 if (exp->exp_lock_hash &&
2084 atomic_read(&exp->exp_lock_hash->hs_count)) {
2085 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
2086 atomic_read(&obd_stale_export_num));
2088 spin_lock_bh(&exp->exp_bl_list_lock);
2089 spin_lock(&obd_stale_export_lock);
2090 /* Add to the tail if there is no blocked locks,
2091 * to the head otherwise. */
2092 if (list_empty(&exp->exp_bl_list))
2093 list_add_tail(&exp->exp_stale_list,
2094 &obd_stale_exports);
2096 list_add(&exp->exp_stale_list,
2097 &obd_stale_exports);
2099 spin_unlock(&obd_stale_export_lock);
2100 spin_unlock_bh(&exp->exp_bl_list_lock);
2102 class_export_put(exp);
2106 EXPORT_SYMBOL(obd_stale_export_put);
2109 * Adjust the position of the export in the stale list,
2110 * i.e. move to the head of the list if is needed.
2112 void obd_stale_export_adjust(struct obd_export *exp)
2114 LASSERT(exp != NULL);
2115 spin_lock_bh(&exp->exp_bl_list_lock);
2116 spin_lock(&obd_stale_export_lock);
2118 if (!list_empty(&exp->exp_stale_list) &&
2119 !list_empty(&exp->exp_bl_list))
2120 list_move(&exp->exp_stale_list, &obd_stale_exports);
2122 spin_unlock(&obd_stale_export_lock);
2123 spin_unlock_bh(&exp->exp_bl_list_lock);
2125 EXPORT_SYMBOL(obd_stale_export_adjust);
2128 * destroy zombie export/import thread.
/* Body of the zombie destroy kernel thread.  It signals obd_zombie_start,
 * records its own pid in obd_zombie_pid, then loops until OBD_ZOMBIE_STOP
 * is set: sleep until obd_zombie_impexp_check() reports work (or stop),
 * cull the zombie lists, and wake the wait queue so that
 * obd_zombie_barrier() callers re-check their condition.  On exit it
 * signals obd_zombie_stop.
 * NOTE(review): the return statement is not visible in this excerpt. */
2130 static int obd_zombie_impexp_thread(void *unused)
2132 unshare_fs_struct();
2133 complete(&obd_zombie_start);
2135 obd_zombie_pid = current_pid();
2137 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
2138 struct l_wait_info lwi = { 0 };
2140 l_wait_event(obd_zombie_waitq,
2141 !obd_zombie_impexp_check(NULL), &lwi);
2142 obd_zombie_impexp_cull();
2145 * Notify obd_zombie_barrier callers that queues
2148 wake_up(&obd_zombie_waitq);
2151 complete(&obd_zombie_stop);
2158 * start destroy zombie import/export thread
/* Start the zombie destroy thread (named "obd_zombid") and wait until it
 * has signalled obd_zombie_start.
 * NOTE(review): the guard in front of the PTR_ERR() return and the final
 * return statement are not visible in this excerpt. */
2160 int obd_zombie_impexp_init(void)
2162 struct task_struct *task;
2164 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
2166 RETURN(PTR_ERR(task));
2168 wait_for_completion(&obd_zombie_start);
2172 * stop destroy zombie import/export thread
2174 void obd_zombie_impexp_stop(void)
2176 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
2177 obd_zombie_impexp_notify();
2178 wait_for_completion(&obd_zombie_stop);
2179 LASSERT(list_empty(&obd_stale_exports));
2182 /***** Kernel-userspace comm helpers *******/
2184 /* Get length of entire message, including header */
2185 int kuc_len(int payload_len)
2187 return sizeof(struct kuc_hdr) + payload_len;
2189 EXPORT_SYMBOL(kuc_len);
2191 /* Get a pointer to kuc header, given a ptr to the payload
2192 * @param p Pointer to payload area
2193 * @returns Pointer to kuc header
2195 struct kuc_hdr * kuc_ptr(void *p)
2197 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2198 LASSERT(lh->kuc_magic == KUC_MAGIC);
2201 EXPORT_SYMBOL(kuc_ptr);
2203 /* Alloc space for a message, and fill in header
2204 * @return Pointer to payload area
/* Build a message with room for @payload_len payload bytes: the header is
 * filled with KUC_MAGIC, @transport, @type and the total length from
 * kuc_len(), and a pointer just past the header is returned.
 * Returns ERR_PTR(-ENOMEM) on the failure path.
 * NOTE(review): the declaration of "lh", the allocation call and the
 * failure check are not visible in this excerpt. */
2206 void *kuc_alloc(int payload_len, int transport, int type)
2209 int len = kuc_len(payload_len);
2213 return ERR_PTR(-ENOMEM);
2215 lh->kuc_magic = KUC_MAGIC;
2216 lh->kuc_transport = transport;
2217 lh->kuc_msgtype = type;
2218 lh->kuc_msglen = len;
/* callers get the payload area, which starts right after the header */
2220 return (void *)(lh + 1);
2222 EXPORT_SYMBOL(kuc_alloc);
/* Free a message given a pointer to its payload area; @payload_len is used
 * to compute the full message length passed to OBD_FREE. */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);
	int msg_len = kuc_len(payload_len);

	OBD_FREE(lh, msg_len);
}
EXPORT_SYMBOL(kuc_free);
/* One sleeper waiting for an RPC slot; obd_get_request_slot() places it on
 * its own stack.
 * NOTE(review): the orsw_signaled member used by obd_get_request_slot() is
 * not visible in this excerpt. */
2232 struct obd_request_slot_waiter {
/* link on client_obd::cl_flight_waiters */
2233 struct list_head orsw_entry;
/* private wait queue the waiter sleeps on */
2234 wait_queue_head_t orsw_waitq;
2238 static bool obd_request_slot_avail(struct client_obd *cli,
2239 struct obd_request_slot_waiter *orsw)
2243 spin_lock(&cli->cl_loi_list_lock);
2244 avail = !!list_empty(&orsw->orsw_entry);
2245 spin_unlock(&cli->cl_loi_list_lock);
2251 * For network flow control, the RPC sponsor needs to acquire a credit
2252 * before sending the RPC. The credits count for a connection is defined
2253 by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
2254 * the subsequent RPC sponsors need to wait until others released their
2255 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
/* Acquire one RPC slot on @cli.
 *
 * Fast path: when cl_rpcs_in_flight is below cl_max_rpcs_in_flight the
 * counter is incremented under cl_loi_list_lock.  Otherwise an on-stack
 * waiter is queued at the tail of cl_flight_waiters and the caller sleeps
 * on the waiter's own wait queue until obd_request_slot_avail() holds (or
 * a further condition that is not visible here).  After the wait the lock
 * is re-taken so that nobody can still be using the on-stack waiter.
 *
 * NOTE(review): the declaration of "rc", the return statements, the rest
 * of the wait condition, the test guarding the post-wait clean-up and the
 * "else" pairing the decrement with list_del() are not visible in this
 * excerpt. */
2257 int obd_get_request_slot(struct client_obd *cli)
2259 struct obd_request_slot_waiter orsw;
2260 struct l_wait_info lwi;
2263 spin_lock(&cli->cl_loi_list_lock);
2264 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2265 cli->cl_rpcs_in_flight++;
2266 spin_unlock(&cli->cl_loi_list_lock);
/* no free slot: queue ourselves behind the existing waiters */
2270 init_waitqueue_head(&orsw.orsw_waitq);
2271 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2272 orsw.orsw_signaled = false;
2273 spin_unlock(&cli->cl_loi_list_lock);
2275 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2276 rc = l_wait_event(orsw.orsw_waitq,
2277 obd_request_slot_avail(cli, &orsw) ||
2281 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2282 * freed but other (such as obd_put_request_slot) is using it. */
2283 spin_lock(&cli->cl_loi_list_lock);
/* an unlinked entry means a slot was already accounted to us, so the
 * counter is decremented; a still-linked entry is simply removed */
2285 if (!orsw.orsw_signaled) {
2286 if (list_empty(&orsw.orsw_entry))
2287 cli->cl_rpcs_in_flight--;
2289 list_del(&orsw.orsw_entry);
2293 if (orsw.orsw_signaled) {
2294 LASSERT(list_empty(&orsw.orsw_entry));
2298 spin_unlock(&cli->cl_loi_list_lock);
2302 EXPORT_SYMBOL(obd_get_request_slot);
2304 void obd_put_request_slot(struct client_obd *cli)
2306 struct obd_request_slot_waiter *orsw;
2308 spin_lock(&cli->cl_loi_list_lock);
2309 cli->cl_rpcs_in_flight--;
2311 /* If there is free slot, wakeup the first waiter. */
2312 if (!list_empty(&cli->cl_flight_waiters) &&
2313 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2314 orsw = list_entry(cli->cl_flight_waiters.next,
2315 struct obd_request_slot_waiter, orsw_entry);
2316 list_del_init(&orsw->orsw_entry);
2317 cli->cl_rpcs_in_flight++;
2318 wake_up(&orsw->orsw_waitq);
2320 spin_unlock(&cli->cl_loi_list_lock);
2322 EXPORT_SYMBOL(obd_put_request_slot);
2324 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2326 return cli->cl_max_rpcs_in_flight;
2328 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2330 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2332 struct obd_request_slot_waiter *orsw;
2339 if (max > OBD_MAX_RIF_MAX || max < 1)
2342 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2343 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2344 /* adjust max_mod_rpcs_in_flight to ensure it is always
2345 * strictly lower that max_rpcs_in_flight */
2347 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2348 "because it must be higher than "
2349 "max_mod_rpcs_in_flight value",
2350 cli->cl_import->imp_obd->obd_name);
2353 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2354 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2360 spin_lock(&cli->cl_loi_list_lock);
2361 old = cli->cl_max_rpcs_in_flight;
2362 cli->cl_max_rpcs_in_flight = max;
2363 client_adjust_max_dirty(cli);
2367 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2368 for (i = 0; i < diff; i++) {
2369 if (list_empty(&cli->cl_flight_waiters))
2372 orsw = list_entry(cli->cl_flight_waiters.next,
2373 struct obd_request_slot_waiter, orsw_entry);
2374 list_del_init(&orsw->orsw_entry);
2375 cli->cl_rpcs_in_flight++;
2376 wake_up(&orsw->orsw_waitq);
2378 spin_unlock(&cli->cl_loi_list_lock);
2382 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2384 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2386 return cli->cl_max_mod_rpcs_in_flight;
2388 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/* Change the maximum number of modify RPCs in flight for @cli.
 *
 * @max must be within [1, OBD_MAX_RIF_MAX], strictly below
 * cl_max_rpcs_in_flight, and not above "maxmodrpcs", which is taken from
 * the server's connect data when OBD_CONNECT_MULTIMODRPCS is set.  The new
 * limit is stored under cl_mod_rpcs_lock and waiters are woken when the
 * limit grew.
 *
 * NOTE(review): the declarations of "maxmodrpcs" and "prev", the value of
 * maxmodrpcs when the connect flag is absent, the remaining arguments of
 * the second CERROR and all return statements are not visible in this
 * excerpt. */
2390 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2392 struct obd_connect_data *ocd;
2396 if (max > OBD_MAX_RIF_MAX || max < 1)
2399 /* cannot exceed or equal max_rpcs_in_flight */
2400 if (max >= cli->cl_max_rpcs_in_flight) {
2401 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2402 "higher or equal to max_rpcs_in_flight value (%u)\n",
2403 cli->cl_import->imp_obd->obd_name,
2404 max, cli->cl_max_rpcs_in_flight);
2408 /* cannot exceed max modify RPCs in flight supported by the server */
2409 ocd = &cli->cl_import->imp_connect_data;
2410 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2411 maxmodrpcs = ocd->ocd_maxmodrpcs;
2414 if (max > maxmodrpcs) {
2415 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2416 "higher than max_mod_rpcs_per_client value (%hu) "
2417 "returned by the server at connection\n",
2418 cli->cl_import->imp_obd->obd_name,
2423 spin_lock(&cli->cl_mod_rpcs_lock);
2425 prev = cli->cl_max_mod_rpcs_in_flight;
2426 cli->cl_max_mod_rpcs_in_flight = max;
2428 /* wakeup waiters if limit has been increased */
2429 if (cli->cl_max_mod_rpcs_in_flight > prev)
2430 wake_up(&cli->cl_mod_rpcs_waitq);
2432 spin_unlock(&cli->cl_mod_rpcs_lock);
2436 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/* Integer percentage of @a relative to @b; evaluates to 0 when @b is 0.
 * Both arguments are parenthesised so that compound expressions such as
 * pct(x + y, n + m) expand with the intended precedence. */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
2440 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2441 struct seq_file *seq)
2443 unsigned long mod_tot = 0, mod_cum;
2444 struct timespec64 now;
2447 ktime_get_real_ts64(&now);
2449 spin_lock(&cli->cl_mod_rpcs_lock);
2451 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2452 (s64)now.tv_sec, now.tv_nsec);
2453 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2454 cli->cl_mod_rpcs_in_flight);
2456 seq_printf(seq, "\n\t\t\tmodify\n");
2457 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2459 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2462 for (i = 0; i < OBD_HIST_MAX; i++) {
2463 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2465 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2466 i, mod, pct(mod, mod_tot),
2467 pct(mod_cum, mod_tot));
2468 if (mod_cum == mod_tot)
2472 spin_unlock(&cli->cl_mod_rpcs_lock);
2476 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2480 /* The number of modify RPCs sent in parallel is limited
2481 * because the server has a finite number of slots per client to
2482 * store request result and ensure reply reconstruction when needed.
2483 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2484 * that takes into account server limit and cl_max_rpcs_in_flight
2486 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2487 * one close request is allowed above the maximum.
2489 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2494 /* A slot is available if
2495 * - number of modify RPCs in flight is less than the max
2496 * - it's a close RPC and no other close request is in flight
2498 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2499 (close_req && cli->cl_close_rpcs_in_flight == 0);
2504 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2509 spin_lock(&cli->cl_mod_rpcs_lock);
2510 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2511 spin_unlock(&cli->cl_mod_rpcs_lock);
/* Decide from the intent @it whether an operation needs a modify RPC slot.
 * The visible part of the condition matches GETATTR, LOOKUP and READDIR
 * intents, and LAYOUT intents without MDS_FMODE_WRITE.
 * NOTE(review): the beginning of the condition and the return statements
 * are not visible in this excerpt. */
2515 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2518 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2519 it->it_op == IT_READDIR ||
2520 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2525 /* Get a modify RPC slot from the obd client @cli according
2526 * to the kind of operation @opc that is going to be sent
2527 * and the intent @it of the operation if it applies.
2528 * If the maximum number of modify RPCs in flight is reached
2529 * the thread is put to sleep.
2530 * Returns the tag to be set in the request message. Tag 0
2531 * is reserved for non-modifying requests.
2533 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2534 struct lookup_intent *it)
2536 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2537 bool close_req = false;
2540 /* read-only metadata RPCs don't consume a slot on MDT
2541 * for reply reconstruction
2543 if (obd_skip_mod_rpc_slot(it))
2546 if (opc == MDS_CLOSE)
2550 spin_lock(&cli->cl_mod_rpcs_lock);
2551 max = cli->cl_max_mod_rpcs_in_flight;
2552 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2553 /* there is a slot available */
2554 cli->cl_mod_rpcs_in_flight++;
2556 cli->cl_close_rpcs_in_flight++;
2557 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2558 cli->cl_mod_rpcs_in_flight);
2559 /* find a free tag */
2560 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2562 LASSERT(i < OBD_MAX_RIF_MAX);
2563 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2564 spin_unlock(&cli->cl_mod_rpcs_lock);
2565 /* tag 0 is reserved for non-modify RPCs */
2568 spin_unlock(&cli->cl_mod_rpcs_lock);
2570 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2571 "opc %u, max %hu\n",
2572 cli->cl_import->imp_obd->obd_name, opc, max);
2574 l_wait_event(cli->cl_mod_rpcs_waitq,
2575 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2578 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2580 /* Put a modify RPC slot from the obd client @cli according
2581 * to the kind of operation @opc that has been sent and the
2582 * intent @it of the operation if it applies.
2584 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2585 struct lookup_intent *it, __u16 tag)
2587 bool close_req = false;
2589 if (obd_skip_mod_rpc_slot(it))
2592 if (opc == MDS_CLOSE)
2595 spin_lock(&cli->cl_mod_rpcs_lock);
2596 cli->cl_mod_rpcs_in_flight--;
2598 cli->cl_close_rpcs_in_flight--;
2599 /* release the tag in the bitmap */
2600 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2601 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2602 spin_unlock(&cli->cl_mod_rpcs_lock);
2603 wake_up(&cli->cl_mod_rpcs_waitq);
2605 EXPORT_SYMBOL(obd_put_mod_rpc_slot);