4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* Registry of OBD types: class_search_type() walks obd_types while holding
 * obd_types_lock, and class_register_type()/class_unregister_type() take the
 * same lock around list_add()/list_del(). */
49 static DEFINE_SPINLOCK(obd_types_lock);
50 static LIST_HEAD(obd_types);
/* Device table, indexed by obd_minor.  The register/unregister/lookup
 * helpers in this file take obd_dev_lock around their accesses to it. */
51 DEFINE_RWLOCK(obd_dev_lock);
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache used by obd_device_alloc()/obd_device_free(); created in
 * obd_init_caches() and destroyed in obd_cleanup_caches(). */
54 static struct kmem_cache *obd_device_cachep;
/* Workqueue handle; its users are not in the visible part of this file.
 * Presumably the zombie export/import work items are queued here --
 * TODO confirm. */
56 static struct workqueue_struct *zombie_wq;
/* Forward declarations; the definitions are not in this view. */
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping; class_unlink_export() increments
 * obd_stale_export_num. */
63 static LIST_HEAD(obd_stale_exports);
64 static DEFINE_SPINLOCK(obd_stale_export_lock);
65 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Function-pointer hook called by class_export_destroy() and
 * obd_zombie_import_free() to drop ptlrpc connection references.  Whoever
 * assigns it is not visible here. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep with GFP_NOFS and stamp it
 * with OBD_DEVICE_MAGIC, which later LASSERTs use to validate the pointer.
 * The return statement is not visible in this listing; class_newdev()
 * treats a NULL result as -ENOMEM.
 */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts the magic first, reports (but does not clean up) a namespace that
 * is still attached, and finalises the obd_reference lu_ref before the slab
 * free.
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A leftover namespace is only logged here. */
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd_type by name.
 *
 * Walks obd_types under obd_types_lock and compares each typ_name with
 * strcmp(); the lock is released on the match path as well as after the
 * loop.  The return statements are not visible in this listing; callers in
 * this file test the result for being non-NULL (e.g. class_register_type()).
 * No reference is taken on the type in the visible code.
 */
99 struct obd_type *class_search_type(const char *name)
101 struct list_head *tmp;
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 list_for_each(tmp, &obd_types) {
106 type = list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
/*
 * Find the obd_type called \a name and pin the module that owns its ops.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the visible code derives a module name
 * from the type name, calls request_module() and repeats the search.
 * Presumably this only happens when the first search found nothing -- the
 * guarding condition is not visible here; confirm.
 *
 * NOTE(review): the result of try_module_get() is ignored in the visible
 * code, so a module that is already going away is not detected here.
 */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
/* "obdfilter" is special-cased; its replacement name is not visible here. */
125 if (strcmp(modname, "obdfilter") == 0)
/* LWP devices are loaded through the OSP module name. */
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
/* Any name that starts with the MDS name maps to the MDT module name. */
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
/* request_module() returning 0 is treated as success; search again. */
134 if (!request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* The module reference is taken while holding the per-type lock. */
144 spin_lock(&type->obd_type_lock);
146 try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): drops the module reference on the type's
 * dt ops owner while holding obd_type_lock.  Any other statement of the body
 * is not visible in this listing.
 */
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
/*
 * kobject release callback installed through class_ktype.  Frees the
 * kobject itself, matching the OBD_ALLOC(kobj, sizeof(*kobj)) done in
 * class_setup_tunables().
 */
161 static void class_sysfs_release(struct kobject *kobj)
163 OBD_FREE(kobj, sizeof(*kobj));
/*
 * kobj_type passed to kobject_init() in class_setup_tunables(): attribute
 * access goes through lustre_sysfs_ops and the final put ends up in
 * class_sysfs_release().
 */
166 static struct kobj_type class_ktype = {
167 .sysfs_ops = &lustre_sysfs_ops,
168 .release = class_sysfs_release,
/*
 * Create the kobject named \a name inside lustre_kset.
 *
 * With HAVE_SERVER_SUPPORT an existing object of that name is looked up
 * first with kset_find_obj(); how a hit is handled is not visible in this
 * listing.  Otherwise a kobject is allocated, bound to lustre_kset,
 * initialised with class_ktype and added under the kset's own kobject.
 *
 * Returns ERR_PTR(-ENOMEM) when the allocation fails.  Handling of a
 * kobject_add() error is not visible here; class_register_type() checks the
 * result with IS_ERR().
 */
171 struct kobject *class_setup_tunables(const char *name)
173 struct kobject *kobj;
176 #ifdef HAVE_SERVER_SUPPORT
177 kobj = kset_find_obj(lustre_kset, name);
181 OBD_ALLOC(kobj, sizeof(*kobj));
183 return ERR_PTR(-ENOMEM);
185 kobj->kset = lustre_kset;
186 kobject_init(kobj, &class_ktype);
187 rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
194 EXPORT_SYMBOL(class_setup_tunables);
/* Upper bound, in bytes, asserted on the type name in class_register_type(). */
196 #define CLASS_MAX_NAME 1024
/*
 * Register a new OBD type.
 *
 * Allocates an obd_type, copies the caller's dt_ops (and md_ops, which is
 * optional) into private copies, duplicates \a name, creates the procfs,
 * debugfs and sysfs entries, initialises \a ldt with lu_device_type_init()
 * and finally links the type onto obd_types.  The tail of the function is
 * the failure path, which removes whatever was created and frees the
 * allocations.
 *
 * How \a enable_proc and \a vars are used is not visible in this listing
 * (presumably they feed the procfs/debugfs registration calls -- confirm).
 * Return statements are not visible either; the visible error paths store a
 * negative errno in rc.
 */
198 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
199 bool enable_proc, struct lprocfs_vars *vars,
200 const char *name, struct lu_device_type *ldt)
202 struct obd_type *type;
203 #ifdef HAVE_SERVER_SUPPORT
205 #endif /* HAVE_SERVER_SUPPORT */
/* The name must be NUL-terminated within CLASS_MAX_NAME bytes. */
210 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
212 if (class_search_type(name)) {
213 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
218 OBD_ALLOC(type, sizeof(*type));
/* All three allocations are attempted first and checked together below. */
222 OBD_ALLOC_PTR(type->typ_dt_ops);
223 OBD_ALLOC_PTR(type->typ_md_ops);
224 OBD_ALLOC(type->typ_name, strlen(name) + 1);
226 if (type->typ_dt_ops == NULL ||
227 type->typ_md_ops == NULL ||
228 type->typ_name == NULL)
/* The ops tables are copied by value, not referenced. */
231 *(type->typ_dt_ops) = *dt_ops;
232 /* md_ops is optional */
234 *(type->typ_md_ops) = *md_ops;
/* strcpy() is bounded by the strlen(name) + 1 allocation above. */
235 strcpy(type->typ_name, name);
236 spin_lock_init(&type->obd_type_lock);
238 #ifdef CONFIG_PROC_FS
240 type->typ_procroot = lprocfs_register(type->typ_name,
/* On failure typ_procroot is reset to NULL so cleanup skips it. */
243 if (IS_ERR(type->typ_procroot)) {
244 rc = PTR_ERR(type->typ_procroot);
245 type->typ_procroot = NULL;
250 #ifdef HAVE_SERVER_SUPPORT
/* If a debugfs entry of this name already exists, only note that fact in
 * typ_sym_filter and drop the dentry reference taken by d_lookup(). */
252 dname.len = strlen(dname.name);
253 dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
255 type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
256 if (type->typ_debugfs_entry) {
257 dput(type->typ_debugfs_entry);
258 type->typ_sym_filter = true;
261 #endif /* HAVE_SERVER_SUPPORT */
263 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
266 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
267 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
269 type->typ_debugfs_entry = NULL;
272 #ifdef HAVE_SERVER_SUPPORT
275 type->typ_kobj = class_setup_tunables(type->typ_name);
276 if (IS_ERR(type->typ_kobj))
277 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
281 rc = lu_device_type_init(ldt);
/* The condition guarding this kobject_put() is not visible here; presumably
 * it runs when lu_device_type_init() failed -- confirm. */
283 kobject_put(type->typ_kobj);
/* Publish the fully set-up type. */
288 spin_lock(&obd_types_lock);
289 list_add(&type->typ_chain, &obd_types);
290 spin_unlock(&obd_types_lock);
/* Failure path: the statements below release what was set up above. */
295 #ifdef HAVE_SERVER_SUPPORT
/* A pre-existing debugfs entry (typ_sym_filter) is not removed here. */
296 if (type->typ_sym_filter)
297 type->typ_debugfs_entry = NULL;
299 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
300 ldebugfs_remove(&type->typ_debugfs_entry);
301 if (type->typ_name != NULL) {
302 #ifdef CONFIG_PROC_FS
303 if (type->typ_procroot != NULL)
304 remove_proc_subtree(type->typ_name, proc_lustre_root);
/* Same size expression as the allocation of typ_name. */
306 OBD_FREE(type->typ_name, strlen(name) + 1);
308 if (type->typ_md_ops != NULL)
309 OBD_FREE_PTR(type->typ_md_ops);
310 if (type->typ_dt_ops != NULL)
311 OBD_FREE_PTR(type->typ_dt_ops);
312 OBD_FREE(type, sizeof(*type));
315 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the OBD type called \a name and free everything
 * class_register_type() created for it: the kobject, procfs and debugfs
 * entries, the lu_device_type state, the list linkage, the name and the
 * private ops copies.
 *
 * A type that cannot be found is reported with CERROR.  A type that still
 * has a non-zero typ_refcnt is reported and only its ops tables are freed.
 *
 * NOTE(review): in the typ_refcnt branch the visible lines free typ_dt_ops
 * and typ_md_ops without clearing the pointers while the type stays on
 * obd_types; what follows in that branch is not visible here -- confirm
 * that no later user can reach the freed tables.
 */
317 int class_unregister_type(const char *name)
319 struct obd_type *type = class_search_type(name);
323 CERROR("unknown obd type\n");
327 if (type->typ_refcnt) {
328 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
329 /* This is a bad situation, let's make the best of it */
330 /* Remove ops, but leave the name for debugging */
331 OBD_FREE_PTR(type->typ_dt_ops);
332 OBD_FREE_PTR(type->typ_md_ops);
336 kobject_put(type->typ_kobj);
338 /* we do not use type->typ_procroot as for compatibility purposes
339 * other modules can share names (i.e. lod can use lov entry). so
340 * we can't reference pointer as it can get invalidated when another
341 * module removes the entry */
342 #ifdef CONFIG_PROC_FS
343 if (type->typ_procroot != NULL)
344 remove_proc_subtree(type->typ_name, proc_lustre_root);
345 if (type->typ_procsym != NULL)
346 lprocfs_remove(&type->typ_procsym);
348 #ifdef HAVE_SERVER_SUPPORT
/* A debugfs entry that existed before registration is left in place. */
349 if (type->typ_sym_filter)
350 type->typ_debugfs_entry = NULL;
352 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
353 ldebugfs_remove(&type->typ_debugfs_entry);
356 lu_device_type_fini(type->typ_lu);
/* Unlink from the registry before freeing the type's memory. */
358 spin_lock(&obd_types_lock);
359 list_del(&type->typ_chain);
360 spin_unlock(&obd_types_lock);
/* Size is computed from the caller's name, as in class_register_type(). */
361 OBD_FREE(type->typ_name, strlen(name) + 1);
362 if (type->typ_dt_ops != NULL)
363 OBD_FREE_PTR(type->typ_dt_ops);
364 if (type->typ_md_ops != NULL)
365 OBD_FREE_PTR(type->typ_md_ops);
366 OBD_FREE(type, sizeof(*type));
368 } /* class_unregister_type */
369 EXPORT_SYMBOL(class_unregister_type);
372 * Create a new obd device.
374 * Allocate the new obd_device and initialize it.
376 * \param[in] type_name obd device type string.
377 * \param[in] name obd device name.
378 * \param[in] uuid obd device UUID
380 * \retval newdev pointer to created obd_device
381 * \retval ERR_PTR(errno) on error
/*
 * Allocate and initialise an obd_device of type \a type_name.
 *
 * Takes a type reference with class_get_type() (dropped again if the device
 * allocation fails), copies name and uuid into the device, initialises its
 * locks, lists and wait queues, and sets obd_refcount to 1.  obd_minor is
 * left at -1, i.e. the device is not yet in obd_devs[];
 * class_register_device() assigns the slot.
 *
 * Visible error returns: ERR_PTR(-EINVAL) for an over-long name,
 * ERR_PTR(-ENODEV) for an unknown type, ERR_PTR(-ENOMEM) when the device
 * cannot be allocated.
 *
 * NOTE(review): only strlen(name) is checked in the visible code although
 * the error message says "name/uuid".  The uuid is copied with
 * strncpy(..., UUID_MAX), which does not NUL-terminate the destination when
 * the source is UUID_MAX bytes or longer -- confirm the size of
 * obd_uuid.uuid and whether callers bound the uuid length.
 */
383 struct obd_device *class_newdev(const char *type_name, const char *name,
386 struct obd_device *newdev;
387 struct obd_type *type = NULL;
390 if (strlen(name) >= MAX_OBD_NAME) {
391 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
392 RETURN(ERR_PTR(-EINVAL));
395 type = class_get_type(type_name);
397 CERROR("OBD: unknown type: %s\n", type_name);
398 RETURN(ERR_PTR(-ENODEV));
401 newdev = obd_device_alloc();
402 if (newdev == NULL) {
403 class_put_type(type);
404 RETURN(ERR_PTR(-ENOMEM));
406 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* Copies at most sizeof(obd_name) - 1 bytes; termination of the last byte
 * presumably relies on the slab allocation being zeroed -- confirm. */
407 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
408 newdev->obd_type = type;
/* -1 means "no slot in obd_devs[] yet". */
409 newdev->obd_minor = -1;
411 rwlock_init(&newdev->obd_pool_lock);
412 newdev->obd_pool_limit = 0;
413 newdev->obd_pool_slv = 0;
415 INIT_LIST_HEAD(&newdev->obd_exports);
416 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
417 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
418 INIT_LIST_HEAD(&newdev->obd_exports_timed);
419 INIT_LIST_HEAD(&newdev->obd_nid_stats);
420 spin_lock_init(&newdev->obd_nid_lock);
421 spin_lock_init(&newdev->obd_dev_lock);
422 mutex_init(&newdev->obd_dev_mutex);
423 spin_lock_init(&newdev->obd_osfs_lock);
424 /* newdev->obd_osfs_age must be set to a value in the distant
425 * past to guarantee a fresh statfs is fetched on mount. */
426 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
428 /* XXX belongs in setup not attach */
429 init_rwsem(&newdev->obd_observer_link_sem);
431 spin_lock_init(&newdev->obd_recovery_task_lock);
432 init_waitqueue_head(&newdev->obd_next_transno_waitq);
433 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
434 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
435 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
436 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
437 INIT_LIST_HEAD(&newdev->obd_evict_list);
438 INIT_LIST_HEAD(&newdev->obd_lwp_list);
440 llog_group_init(&newdev->obd_olg);
441 /* Detach drops this */
442 atomic_set(&newdev->obd_refcount, 1);
443 lu_ref_init(&newdev->obd_reference);
444 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
446 newdev->obd_conn_inprogress = 0;
448 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
450 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
451 newdev->obd_name, newdev);
459 * \param[in] obd obd_device to be freed
/*
 * Final teardown of an obd_device.
 *
 * Asserts that the device is valid, that it is either unregistered
 * (obd_minor == -1) or still the occupant of its obd_devs[] slot, and that
 * obd_refcount is zero.  Runs obd_cleanup() when obd_stopping is set, frees
 * the device and then drops the type reference.  The type pointer is saved
 * up front because obd is already freed when class_put_type() is called.
 */
463 void class_free_dev(struct obd_device *obd)
465 struct obd_type *obd_type = obd->obd_type;
467 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
468 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
469 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
470 "obd %p != obd_devs[%d] %p\n",
471 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
472 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
473 "obd_refcount should be 0, not %d\n",
474 atomic_read(&obd->obd_refcount));
475 LASSERT(obd_type != NULL);
477 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
478 obd->obd_name, obd->obd_type->typ_name);
480 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
481 obd->obd_name, obd->obd_uuid.uuid);
482 if (obd->obd_stopping) {
485 /* If we're not stopping, we were never set up */
486 err = obd_cleanup(obd);
/* A cleanup error is only logged. */
488 CERROR("Cleanup %s returned %d\n",
492 obd_device_free(obd);
/* obd must not be touched past this point; use the saved type pointer. */
494 class_put_type(obd_type);
498 * Unregister obd device.
500 * Free slot in obd_devs[] used by \a obd.
502 * \param[in] obd obd_device to be unregistered
/*
 * Remove \a obd from obd_devs[] under the obd_dev_lock write lock.  A device
 * that was never registered (negative obd_minor) is left alone.  Any update
 * of obd_minor itself is not visible in this listing.
 */
506 void class_unregister_device(struct obd_device *obd)
508 write_lock(&obd_dev_lock);
509 if (obd->obd_minor >= 0) {
510 LASSERT(obd_devs[obd->obd_minor] == obd);
511 obd_devs[obd->obd_minor] = NULL;
514 write_unlock(&obd_dev_lock);
518 * Register obd device.
520 * Find free slot in obd_devs[], fills it with \a new_obd.
522 * \param[in] new_obd obd_device to be registered
525 * \retval -EEXIST device with this name is registered
526 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Put \a new_obd into a free slot of obd_devs[].
 *
 * Scans the whole table under the obd_dev_lock write lock.  A device with
 * the same obd_name is treated as a duplicate: the visible code drops the
 * lock and calls obd_zombie_barrier() (the retry logic around the "retried"
 * flag is not visible in this listing) and otherwise logs "already exists".
 * If no duplicate is found, a free slot is recorded in new_obd_minor and
 * new_obd is stored there.  When the table is full an error is logged.
 * Return values are documented in the comment above; the statements that
 * set them are not visible here.
 */
528 int class_register_device(struct obd_device *new_obd)
532 int new_obd_minor = 0;
533 bool minor_assign = false;
534 bool retried = false;
/* The write lock covers both the duplicate scan and the slot assignment. */
537 write_lock(&obd_dev_lock);
538 for (i = 0; i < class_devno_max(); i++) {
539 struct obd_device *obd = class_num2obd(i);
542 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
545 write_unlock(&obd_dev_lock);
547 /* the obd_device could be waited to be
548 * destroyed by the "obd_zombie_impexp_thread".
550 obd_zombie_barrier();
555 CERROR("%s: already exists, won't add\n",
557 /* in case we found a free slot before duplicate */
558 minor_assign = false;
562 if (!minor_assign && obd == NULL) {
/* Claim the slot; the LASSERTF re-checks that it is still empty. */
569 new_obd->obd_minor = new_obd_minor;
570 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
571 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
572 obd_devs[new_obd_minor] = new_obd;
576 CERROR("%s: all %u/%u devices used, increase "
577 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
578 i, class_devno_max(), ret);
581 write_unlock(&obd_dev_lock);
/*
 * Find the obd_devs[] index of the device named \a name.  Takes no lock
 * itself: class_name2dev() and class_dev_by_str() call it with obd_dev_lock
 * read-held.  A name match is further checked for obd_attached; the values
 * returned on hit and miss are not visible in this listing (callers treat a
 * negative result as "not found").
 */
586 static int class_name2dev_nolock(const char *name)
593 for (i = 0; i < class_devno_max(); i++) {
594 struct obd_device *obd = class_num2obd(i);
596 if (obd && strcmp(name, obd->obd_name) == 0) {
597 /* Make sure we finished attaching before we give
598 out any references */
599 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
600 if (obd->obd_attached) {
/*
 * Locked wrapper around class_name2dev_nolock(): performs the lookup with
 * obd_dev_lock read-held.  The lock is released before returning, so the
 * result is only a snapshot of the table.
 */
610 int class_name2dev(const char *name)
617 read_lock(&obd_dev_lock);
618 i = class_name2dev_nolock(name);
619 read_unlock(&obd_dev_lock);
623 EXPORT_SYMBOL(class_name2dev);
/*
 * Map a device name to its obd_device via class_name2dev() and
 * class_num2obd().  No reference is taken on the device in the visible code.
 *
 * NOTE(review): the range check uses '>' so dev == class_devno_max() is not
 * rejected here; '>=' looks intended.  class_num2obd() has its own
 * "num < class_devno_max()" guard, so this appears harmless -- confirm.
 */
625 struct obd_device *class_name2obd(const char *name)
627 int dev = class_name2dev(name);
629 if (dev < 0 || dev > class_devno_max())
631 return class_num2obd(dev);
633 EXPORT_SYMBOL(class_name2obd);
/*
 * Find the obd_devs[] index of the device whose obd_uuid equals \a uuid.
 * Takes no lock itself; class_uuid2dev() and class_dev_by_str() call it with
 * obd_dev_lock read-held.  Return statements are not visible in this
 * listing (callers treat a negative result as "not found").
 */
635 int class_uuid2dev_nolock(struct obd_uuid *uuid)
639 for (i = 0; i < class_devno_max(); i++) {
640 struct obd_device *obd = class_num2obd(i);
642 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
643 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/*
 * Locked wrapper around class_uuid2dev_nolock(): performs the lookup with
 * obd_dev_lock read-held and releases the lock before returning.
 */
651 int class_uuid2dev(struct obd_uuid *uuid)
655 read_lock(&obd_dev_lock);
656 i = class_uuid2dev_nolock(uuid);
657 read_unlock(&obd_dev_lock);
661 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Map a device UUID to its obd_device via class_uuid2dev() and
 * class_num2obd().  Handling of a failed lookup is not visible in this
 * listing.  No reference is taken on the device in the visible code.
 */
663 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
665 int dev = class_uuid2dev(uuid);
668 return class_num2obd(dev);
670 EXPORT_SYMBOL(class_uuid2obd);
673 * Get obd device from ::obd_devs[]
675 * \param num [in] array index
677 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
678 * otherwise return the obd device there.
/*
 * Return the device stored at index \a num, or NULL (the initial value of
 * obd) when \a num is not below class_devno_max().  For an occupied slot
 * the magic and the stored obd_minor are asserted to match.  Takes no lock
 * and no reference.
 *
 * NOTE(review): only the upper bound is checked in the visible lines; a
 * negative \a num would index before obd_devs[].  Confirm that every caller
 * filters negative values first.
 */
680 struct obd_device *class_num2obd(int num)
682 struct obd_device *obd = NULL;
684 if (num < class_devno_max()) {
689 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
690 "%p obd_magic %08x != %08x\n",
691 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
692 LASSERTF(obd->obd_minor == num,
693 "%p obd_minor %0d != %0d\n",
694 obd, obd->obd_minor, num);
701 * Find obd in obd_devs[] by name or uuid.
703 * Increment obd's refcount if found.
705 * \param[in] str obd name or uuid
707 * \retval NULL if not found
708 * \retval target pointer to found obd_device
/*
 * Look a device up by UUID and by name, and take a reference on it.
 *
 * \a str is first converted to an obd_uuid and searched as a UUID; the name
 * search is presumably the fallback when that fails (the condition is not
 * visible in this listing).  The lookup and class_incref() both happen with
 * obd_dev_lock read-held, so the device cannot be unregistered between
 * being found and being referenced.
 */
710 struct obd_device *class_dev_by_str(const char *str)
712 struct obd_device *target = NULL;
713 struct obd_uuid tgtuuid;
716 obd_str2uuid(&tgtuuid, str);
718 read_lock(&obd_dev_lock);
719 rc = class_uuid2dev_nolock(&tgtuuid);
721 rc = class_name2dev_nolock(str);
724 target = class_num2obd(rc);
/* Reference tagged "find"; the caller is responsible for dropping it. */
727 class_incref(target, "find", current);
728 read_unlock(&obd_dev_lock);
732 EXPORT_SYMBOL(class_dev_by_str);
735 * Get obd devices count. Devices in any state are counted.
737 * \retval obd device count
/*
 * Walk obd_devs[] under the obd_dev_lock read lock and count devices.  The
 * counting statement itself is not visible in this listing.
 *
 * NOTE(review): the loop runs while index <= max_index, where max_index is
 * class_devno_max(), i.e. one index past the range every other loop in this
 * file uses.  class_num2obd() returns NULL for that index, so this looks
 * harmless, but '<' would be consistent -- confirm.
 */
739 int get_devices_count(void)
741 int index, max_index = class_devno_max(), dev_count = 0;
743 read_lock(&obd_dev_lock);
744 for (index = 0; index <= max_index; index++) {
745 struct obd_device *obd = class_num2obd(index);
749 read_unlock(&obd_dev_lock);
753 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in obd_devs[]: index, status, type
 * name, device name, UUID and current refcount.  The status string is
 * chosen from the obd_stopping / obd_set_up / obd_attached flags, tested in
 * that order; the strings themselves are not visible in this listing.  The
 * table is read under the obd_dev_lock read lock.
 */
755 void class_obd_list(void)
760 read_lock(&obd_dev_lock);
761 for (i = 0; i < class_devno_max(); i++) {
762 struct obd_device *obd = class_num2obd(i);
766 if (obd->obd_stopping)
768 else if (obd->obd_set_up)
770 else if (obd->obd_attached)
774 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
775 i, status, obd->obd_type->typ_name,
776 obd->obd_name, obd->obd_uuid.uuid,
777 atomic_read(&obd->obd_refcount));
779 read_unlock(&obd_dev_lock);
783 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
784 specified, then only the client with that uuid is returned,
785 otherwise any client connected to the tgt is returned. */
/*
 * Scan obd_devs[] for a device whose type name starts with \a typ_name and
 * whose client target UUID (u.cli.cl_target_uuid) equals \a tgt_uuid.  When
 * \a grp_uuid is non-NULL the device's own obd_uuid must match it as well.
 * obd_dev_lock is read-held during the scan and released before returning;
 * no reference is taken on the match in the visible code.
 *
 * NOTE(review): the type comparison is strncmp() over strlen(typ_name), so
 * it is a prefix match rather than an exact match -- confirm that is
 * intended.
 */
786 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
787 const char * typ_name,
788 struct obd_uuid *grp_uuid)
792 read_lock(&obd_dev_lock);
793 for (i = 0; i < class_devno_max(); i++) {
794 struct obd_device *obd = class_num2obd(i);
798 if ((strncmp(obd->obd_type->typ_name, typ_name,
799 strlen(typ_name)) == 0)) {
800 if (obd_uuid_equals(tgt_uuid,
801 &obd->u.cli.cl_target_uuid) &&
802 ((grp_uuid)? obd_uuid_equals(grp_uuid,
803 &obd->obd_uuid) : 1)) {
804 read_unlock(&obd_dev_lock);
809 read_unlock(&obd_dev_lock);
813 EXPORT_SYMBOL(class_find_client_obd);
815 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
816 searching at *next, and if a device is found, the next index to look
817 at is saved in *next. If next is NULL, then the first matching device
818 will always be returned. */
/*
 * Resumable scan of obd_devs[] for devices whose obd_uuid equals
 * \a grp_uuid.  A valid *next (0 <= *next < class_devno_max()) selects the
 * starting index; how other values of \a next are handled and how *next is
 * updated on a hit are not visible in this listing.  obd_dev_lock is
 * read-held during the scan and released on both exit paths.
 */
819 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
825 else if (*next >= 0 && *next < class_devno_max())
830 read_lock(&obd_dev_lock);
831 for (; i < class_devno_max(); i++) {
832 struct obd_device *obd = class_num2obd(i);
836 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
839 read_unlock(&obd_dev_lock);
843 read_unlock(&obd_dev_lock);
847 EXPORT_SYMBOL(class_devices_in_group);
850 * Notify that the sptlrpc log for \a fsname has changed and let every relevant OBD
851 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF to every eligible device of filesystem \a fsname.
 *
 * A device is eligible when it is set up and not stopping, its type is one
 * of mdc/osc/osp/lwp/mdt/ost, and its name starts with the first
 * \a namelen bytes of \a fsname.  obd_dev_lock is dropped around the
 * obd_set_info_async() call; a reference taken beforehand keeps the device
 * alive while the lock is not held.  How rc2 is folded into the return
 * value is not visible in this listing.
 */
853 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
855 struct obd_device *obd;
859 LASSERT(namelen > 0);
861 read_lock(&obd_dev_lock);
862 for (i = 0; i < class_devno_max(); i++) {
863 obd = class_num2obd(i);
865 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
868 /* only notify mdc, osc, osp, lwp, mdt, ost
869 * because only these have a -sptlrpc llog */
870 type = obd->obd_type->typ_name;
871 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
872 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
873 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
874 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
875 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
876 strcmp(type, LUSTRE_OST_NAME) != 0)
/* Prefix match of the device name against the filesystem name. */
879 if (strncmp(obd->obd_name, fsname, namelen))
/* Pin the device, then drop the lock for the duration of the call. */
882 class_incref(obd, __FUNCTION__, obd);
883 read_unlock(&obd_dev_lock);
884 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
885 sizeof(KEY_SPTLRPC_CONF),
886 KEY_SPTLRPC_CONF, 0, NULL, NULL);
888 class_decref(obd, __FUNCTION__, obd);
/* Re-take the lock before continuing the scan. */
889 read_lock(&obd_dev_lock);
891 read_unlock(&obd_dev_lock);
894 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the obd_device slab cache if it exists and clear the pointer, so
 * a repeated call is a no-op and obd_init_caches() can run again.
 */
896 void obd_cleanup_caches(void)
899 if (obd_device_cachep) {
900 kmem_cache_destroy(obd_device_cachep);
901 obd_device_cachep = NULL;
/*
 * Create the slab cache for struct obd_device.  The cache must not exist
 * yet (asserted).  It is created with kmem_cache_create_usercopy() and the
 * usercopy region is given as the whole structure (offset 0, size
 * sizeof(struct obd_device)).  On failure rc is set to -ENOMEM; the cleanup
 * call at the end presumably belongs to that error path -- the label is not
 * visible in this listing.
 */
907 int obd_init_caches(void)
912 LASSERT(obd_device_cachep == NULL);
913 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
914 sizeof(struct obd_device),
915 0, 0, 0, sizeof(struct obd_device), NULL);
916 if (!obd_device_cachep)
917 GOTO(out, rc = -ENOMEM);
921 obd_cleanup_caches();
925 /* map connection to client */
/*
 * Resolve a connection handle to its export via class_handle2object() on
 * conn->cookie.  Two cases are only logged in the visible code: a "null
 * handle" and a cookie of -1 ("want a new connection"); what is returned
 * for them is not visible in this listing.  Whether the lookup takes a
 * reference depends on class_handle2object(), which is not shown here
 * (presumably it calls the hop_addref hook -- confirm).
 */
926 struct obd_export *class_conn2export(struct lustre_handle *conn)
928 struct obd_export *export;
932 CDEBUG(D_CACHE, "looking for null handle\n");
936 if (conn->cookie == -1) { /* this means assign a new connection */
937 CDEBUG(D_CACHE, "want a new connection\n");
941 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
942 export = class_handle2object(conn->cookie, NULL);
945 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to its obd_device.  The body is not visible in this
 * listing; presumably it returns exp->exp_obd, possibly with a NULL check
 * -- confirm.
 */
947 struct obd_device *class_exp2obd(struct obd_export *exp)
953 EXPORT_SYMBOL(class_exp2obd);
/*
 * Return the client import (u.cli.cl_import) of the device behind \a exp.
 * Any NULL check on the device is not visible in this listing.  No import
 * reference is taken in the visible code.
 */
955 struct obd_import *class_exp2cliimp(struct obd_export *exp)
957 struct obd_device *obd = exp->exp_obd;
960 return obd->u.cli.cl_import;
962 EXPORT_SYMBOL(class_exp2cliimp);
964 /* Export management functions */
/*
 * Free an export whose refcount has reached zero.
 *
 * Drops the ptlrpc connection if there is one, asserts that the reply and
 * request lists are empty, calls obd_destroy_export(), releases the device
 * reference held by the export (the self export holds none) and frees the
 * memory through OBD_FREE_RCU.
 */
965 static void class_export_destroy(struct obd_export *exp)
967 struct obd_device *obd = exp->exp_obd;
970 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
971 LASSERT(obd != NULL);
973 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
974 exp->exp_client_uuid.uuid, obd->obd_name);
976 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
977 if (exp->exp_connection)
978 ptlrpc_put_connection_superhack(exp->exp_connection);
980 LASSERT(list_empty(&exp->exp_outstanding_replies));
981 LASSERT(list_empty(&exp->exp_uncommitted_replies));
982 LASSERT(list_empty(&exp->exp_req_replay_queue));
983 LASSERT(list_empty(&exp->exp_hp_rpcs));
984 obd_destroy_export(exp);
985 /* self export doesn't hold a reference to an obd, although it
986 * exists until freeing of the obd */
987 if (exp != obd->obd_self_export)
988 class_decref(obd, "export", exp);
/* Deferred free through the exp_handle member. */
990 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/*
 * hop_addref hook of export_handle_ops: takes an export reference on the
 * object passed as an untyped pointer.
 */
994 static void export_handle_addref(void *export)
996 class_export_get(export);
/*
 * Handle operations registered for every export by class_handle_hash() in
 * __class_new_export().  Only the addref hook is visible in this listing.
 */
999 static struct portals_handle_ops export_handle_ops = {
1000 .hop_addref = export_handle_addref,
/*
 * Take a reference on \a exp and log the new count.  The return statement
 * is not visible in this listing (presumably returns \a exp so calls can be
 * chained -- confirm).
 */
1004 struct obd_export *class_export_get(struct obd_export *exp)
1006 atomic_inc(&exp->exp_refcount);
1007 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1008 atomic_read(&exp->exp_refcount));
1011 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp.
 *
 * On the final put the nid-stat state is released and the export is torn
 * down in one of two ways.  The device's self export is destroyed
 * synchronously and the device itself is then freed with class_free_dev().
 * Any other export must still be linked on exp_obd_chain (asserted) and is
 * handed to obd_zombie_export_add().
 */
1013 void class_export_put(struct obd_export *exp)
1015 LASSERT(exp != NULL);
1016 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* Logged before the decrement, hence the "- 1". */
1017 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1018 atomic_read(&exp->exp_refcount) - 1);
1020 if (atomic_dec_and_test(&exp->exp_refcount)) {
1021 struct obd_device *obd = exp->exp_obd;
1023 CDEBUG(D_IOCTL, "final put %p/%s\n",
1024 exp, exp->exp_client_uuid.uuid);
1026 /* release nid stat reference */
1027 lprocfs_exp_cleanup(exp);
1029 if (exp == obd->obd_self_export) {
1030 /* self export should be destroyed without
1031 * zombie thread as it doesn't hold a
1032 * reference to obd and doesn't hold any
1034 class_export_destroy(exp);
1035 /* self export is destroyed, no class
1036 * references exist and it is safe to free
1038 class_free_dev(obd);
1040 LASSERT(!list_empty(&exp->exp_obd_chain));
1041 obd_zombie_export_add(exp);
1046 EXPORT_SYMBOL(class_export_put);
/*
 * Work handler installed on exp_zombie_work by __class_new_export():
 * recovers the export from its embedded work_struct and destroys it.
 */
1048 static void obd_zombie_exp_cull(struct work_struct *ws)
1050 struct obd_export *export;
1052 export = container_of(ws, struct obd_export, exp_zombie_work);
1053 class_export_destroy(export);
1056 /* Creates a new export, adds it to the hash table, and returns a
1057 * pointer to it. The refcount is 2: one for the hash reference, and
1058 * one for the pointer returned by this function. */
/*
 * Allocate and initialise an export for \a obd on behalf of client
 * \a cluuid.
 *
 * The export starts with a refcount of 2 and is registered in the handle
 * hash.  When \a cluuid differs from the device's own UUID it is also
 * inserted into obd_uuid_hash, and a duplicate UUID is refused.  The device
 * is checked for obd_stopping before the export is linked onto its export
 * lists.
 *
 * How \a is_self selects between linking the export onto the device lists
 * and merely initialising the list heads is not visible in this listing
 * (class_new_export() passes false, class_new_export_self() passes true).
 *
 * Visible error codes: -ENOMEM (allocation), -ENODEV (device stopping or no
 * uuid hash), -EALREADY (duplicate uuid), -ESHUTDOWN (device started
 * stopping during setup).  The final return statements are not visible
 * here.
 */
1059 struct obd_export *__class_new_export(struct obd_device *obd,
1060 struct obd_uuid *cluuid, bool is_self)
1062 struct obd_export *export;
1063 struct cfs_hash *hash = NULL;
1067 OBD_ALLOC_PTR(export);
1069 return ERR_PTR(-ENOMEM);
1071 export->exp_conn_cnt = 0;
1072 export->exp_lock_hash = NULL;
1073 export->exp_flock_hash = NULL;
1074 /* 2 = class_handle_hash + last */
1075 atomic_set(&export->exp_refcount, 2);
1076 atomic_set(&export->exp_rpc_count, 0);
1077 atomic_set(&export->exp_cb_count, 0);
1078 atomic_set(&export->exp_locks_count, 0);
1079 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1080 INIT_LIST_HEAD(&export->exp_locks_list);
1081 spin_lock_init(&export->exp_locks_list_guard);
1083 atomic_set(&export->exp_replay_count, 0);
1084 export->exp_obd = obd;
1085 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1086 spin_lock_init(&export->exp_uncommitted_replies_lock);
1087 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1088 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1089 INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1090 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1091 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* Register the export in the handle hash with export_handle_ops. */
1092 class_handle_hash(&export->exp_handle, &export_handle_ops);
1093 export->exp_last_request_time = ktime_get_real_seconds();
1094 spin_lock_init(&export->exp_lock);
1095 spin_lock_init(&export->exp_rpc_lock);
1096 INIT_HLIST_NODE(&export->exp_uuid_hash);
1097 INIT_HLIST_NODE(&export->exp_nid_hash);
1098 INIT_HLIST_NODE(&export->exp_gen_hash);
1099 spin_lock_init(&export->exp_bl_list_lock);
1100 INIT_LIST_HEAD(&export->exp_bl_list);
1101 INIT_LIST_HEAD(&export->exp_stale_list);
/* Deferred destruction runs obd_zombie_exp_cull(). */
1102 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1104 export->exp_sp_peer = LUSTRE_SP_ANY;
1105 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1106 export->exp_client_uuid = *cluuid;
1107 obd_init_export(export);
/* Only exports whose UUID differs from the device's own go into the
 * uuid hash. */
1109 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1110 spin_lock(&obd->obd_dev_lock);
1111 /* shouldn't happen, but might race */
1112 if (obd->obd_stopping)
1113 GOTO(exit_unlock, rc = -ENODEV);
/* Take the hash reference under the lock, insert after dropping it. */
1115 hash = cfs_hash_getref(obd->obd_uuid_hash);
1117 GOTO(exit_unlock, rc = -ENODEV);
1118 spin_unlock(&obd->obd_dev_lock);
1120 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1122 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1123 obd->obd_name, cluuid->uuid, rc);
1124 GOTO(exit_err, rc = -EALREADY);
1128 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* Re-check obd_stopping under the lock before publishing the export. */
1129 spin_lock(&obd->obd_dev_lock);
1130 if (obd->obd_stopping) {
1132 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1133 GOTO(exit_unlock, rc = -ESHUTDOWN);
/* A linked export pins the device; class_export_destroy() drops this. */
1137 class_incref(obd, "export", export);
1138 list_add_tail(&export->exp_obd_chain_timed,
1139 &obd->obd_exports_timed);
1140 list_add(&export->exp_obd_chain, &obd->obd_exports);
1141 obd->obd_num_exports++;
/* Alternative to the linking above: leave the chain heads empty.  The
 * selecting condition is not visible here (presumably is_self -- confirm). */
1143 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1144 INIT_LIST_HEAD(&export->exp_obd_chain);
1146 spin_unlock(&obd->obd_dev_lock);
1148 cfs_hash_putref(hash);
/* Error exits: unlock, drop the hash reference, unhash the handle, then
 * destroy and free the export. */
1152 spin_unlock(&obd->obd_dev_lock);
1155 cfs_hash_putref(hash);
1156 class_handle_unhash(&export->exp_handle);
1157 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1158 obd_destroy_export(export);
1159 OBD_FREE_PTR(export);
/*
 * Create a regular (non-self) export: __class_new_export() with
 * is_self == false.
 */
1163 struct obd_export *class_new_export(struct obd_device *obd,
1164 struct obd_uuid *uuid)
1166 return __class_new_export(obd, uuid, false);
1168 EXPORT_SYMBOL(class_new_export);
/*
 * Create a device's self export: __class_new_export() with
 * is_self == true.  No EXPORT_SYMBOL for it is visible in this listing.
 */
1170 struct obd_export *class_new_export_self(struct obd_device *obd,
1171 struct obd_uuid *uuid)
1173 return __class_new_export(obd, uuid, true);
/*
 * Detach an export from its device.
 *
 * The handle is unhashed first so no new lookups can find the export.  For
 * the self export that is all apart from dropping one reference.  Any
 * other export is removed from the uuid hash (and, with
 * HAVE_SERVER_SUPPORT, the generation hash), moved from obd_exports to
 * obd_unlinked_exports, taken off the timed list and counted out of
 * obd_num_exports, all under obd_dev_lock.  It is then accounted in
 * obd_stale_export_num and handed to obd_stale_export_put().
 */
1176 void class_unlink_export(struct obd_export *exp)
1178 class_handle_unhash(&exp->exp_handle);
1180 if (exp->exp_obd->obd_self_export == exp) {
1181 class_export_put(exp);
1185 spin_lock(&exp->exp_obd->obd_dev_lock);
1186 /* delete an uuid-export hashitem from hashtables */
1187 if (!hlist_unhashed(&exp->exp_uuid_hash))
1188 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1189 &exp->exp_client_uuid,
1190 &exp->exp_uuid_hash);
1192 #ifdef HAVE_SERVER_SUPPORT
1193 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1194 struct tg_export_data *ted = &exp->exp_target_data;
1195 struct cfs_hash *hash;
1197 /* Because obd_gen_hash will not be released until
1198 * class_cleanup(), so hash should never be NULL here */
1199 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1200 LASSERT(hash != NULL);
1201 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1202 &exp->exp_gen_hash);
1203 cfs_hash_putref(hash);
1205 #endif /* HAVE_SERVER_SUPPORT */
/* Keep the export findable on the device's unlinked list. */
1207 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1208 list_del_init(&exp->exp_obd_chain_timed);
1209 exp->exp_obd->obd_num_exports--;
1210 spin_unlock(&exp->exp_obd->obd_dev_lock);
1211 atomic_inc(&obd_stale_export_num);
1213 /* A reference is kept by obd_stale_exports list */
1214 obd_stale_export_put(exp);
1216 EXPORT_SYMBOL(class_unlink_export);
1218 /* Import management functions */
/*
 * Release an import whose refcount has reached zero.
 *
 * Drops the import's own connection and every connection on imp_conn_list
 * (freeing the list entries), asserts that no security context and no
 * requests remain, then drops the device reference taken in
 * class_new_import().  The free of the import structure itself is not
 * visible in this listing.
 */
1219 static void obd_zombie_import_free(struct obd_import *imp)
1223 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1224 imp->imp_obd->obd_name);
1226 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1228 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Pop entries from the head until the list is empty. */
1230 while (!list_empty(&imp->imp_conn_list)) {
1231 struct obd_import_conn *imp_conn;
1233 imp_conn = list_entry(imp->imp_conn_list.next,
1234 struct obd_import_conn, oic_item);
1235 list_del_init(&imp_conn->oic_item);
1236 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1237 OBD_FREE(imp_conn, sizeof(*imp_conn));
1240 LASSERT(imp->imp_sec == NULL);
1241 LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1242 imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1243 class_decref(imp->imp_obd, "import", imp);
/*
 * Take a reference on \a import and log the new count.  The return
 * statement is not visible in this listing (presumably returns \a import
 * -- confirm).
 */
1248 struct obd_import *class_import_get(struct obd_import *import)
1250 atomic_inc(&import->imp_refcount);
1251 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1252 atomic_read(&import->imp_refcount),
1253 import->imp_obd->obd_name);
1256 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a imp.  The final put hands the import to
 * obd_zombie_import_add() rather than freeing it inline.
 */
1258 void class_import_put(struct obd_import *imp)
1262 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* Logged before the decrement, hence the "- 1". */
1264 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1265 atomic_read(&imp->imp_refcount) - 1,
1266 imp->imp_obd->obd_name);
1268 if (atomic_dec_and_test(&imp->imp_refcount)) {
1269 CDEBUG(D_INFO, "final put import %p\n", imp);
1270 obd_zombie_import_add(imp);
1275 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.  The flags argument of the per-portal
 * at_init() call is not visible in this listing.
 */
1277 static void init_imp_at(struct imp_at *at) {
1279 at_init(&at->iat_net_latency, 0, 0);
1280 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1281 /* max service estimates are tracked on the server side, so
1282 don't use the AT history here, just use the last reported
1283 val. (But keep hist for proc histogram, worst_ever) */
1284 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Work handler installed on imp_zombie_work by class_new_import(): recovers
 * the import from its embedded work_struct and frees it.
 */
1289 static void obd_zombie_imp_cull(struct work_struct *ws)
1291 struct obd_import *import;
1293 import = container_of(ws, struct obd_import, imp_zombie_work);
1294 obd_zombie_import_free(import);
/*
 * Allocate and initialise an import for \a obd.
 *
 * The import takes a device reference tagged "import", which
 * obd_zombie_import_free() drops again.  It starts in LUSTRE_IMP_NEW with a
 * refcount of 2 and message magic LUSTRE_MSG_MAGIC_V2.  imp_sec_refpid is
 * set to the pid of the current pid namespace's child reaper, or to 1 when
 * that is not available.  Handling of an allocation failure and the return
 * statement are not visible in this listing.
 */
1297 struct obd_import *class_new_import(struct obd_device *obd)
1299 struct obd_import *imp;
1300 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1302 OBD_ALLOC(imp, sizeof(*imp));
1306 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1307 INIT_LIST_HEAD(&imp->imp_replay_list);
1308 INIT_LIST_HEAD(&imp->imp_sending_list);
1309 INIT_LIST_HEAD(&imp->imp_delayed_list);
1310 INIT_LIST_HEAD(&imp->imp_committed_list);
1311 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1312 imp->imp_known_replied_xid = 0;
/* The replay cursor starts at the head of the (empty) committed list. */
1313 imp->imp_replay_cursor = &imp->imp_committed_list;
1314 spin_lock_init(&imp->imp_lock);
1315 imp->imp_last_success_conn = 0;
1316 imp->imp_state = LUSTRE_IMP_NEW;
/* Pin the device for the lifetime of the import. */
1317 imp->imp_obd = class_incref(obd, "import", imp);
1318 mutex_init(&imp->imp_sec_mutex);
1319 init_waitqueue_head(&imp->imp_recovery_waitq);
/* Deferred destruction runs obd_zombie_imp_cull(). */
1320 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1322 if (curr_pid_ns && curr_pid_ns->child_reaper)
1323 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1325 imp->imp_sec_refpid = 1;
/* Who owns each of the two initial references is not stated in the
 * visible code; class_destroy_import() drops one -- TODO confirm. */
1327 atomic_set(&imp->imp_refcount, 2);
1328 atomic_set(&imp->imp_unregistering, 0);
1329 atomic_set(&imp->imp_reqs, 0);
1330 atomic_set(&imp->imp_inflight, 0);
1331 atomic_set(&imp->imp_replay_inflight, 0);
1332 atomic_set(&imp->imp_inval_count, 0);
1333 INIT_LIST_HEAD(&imp->imp_conn_list);
1334 init_imp_at(&imp->imp_at);
1336 /* the default magic is V2, will be used in connect RPC, and
1337 * then adjusted according to the flags in request/reply. */
1338 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1342 EXPORT_SYMBOL(class_new_import);
/* Start the teardown of @import: bump imp_generation under imp_lock, then
 * drop one reference with class_import_put().
 * @import must be neither NULL nor LP_POISON (both are asserted). */
1344 void class_destroy_import(struct obd_import *import)
1346 LASSERT(import != NULL);
1347 LASSERT(import != LP_POISON);
1349 spin_lock(&import->imp_lock);
1350 import->imp_generation++;
1351 spin_unlock(&import->imp_lock);
1352 class_import_put(import);
1354 EXPORT_SYMBOL(class_destroy_import);
1356 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Account one more reference from @lock to @exp, under
 * exp->exp_locks_list_guard. On the first reference (count 0 -> 1) the lock
 * is linked onto exp->exp_locks_list and @exp becomes l_exp_refs_target.
 * A console warning is printed when the lock already targets another export. */
1358 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1360 spin_lock(&exp->exp_locks_list_guard);
1362 LASSERT(lock->l_exp_refs_nr >= 0);
1364 if (lock->l_exp_refs_target != NULL &&
1365 lock->l_exp_refs_target != exp) {
1366 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1367 exp, lock, lock->l_exp_refs_target);
/* post-increment: the test sees the count before this reference */
1369 if ((lock->l_exp_refs_nr ++) == 0) {
1370 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1371 lock->l_exp_refs_target = exp;
/* NOTE(review): the count is asserted ">= 0" above, which suggests a
 * signed type, yet it is printed with %u -- confirm the specifier */
1373 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1374 lock, exp, lock->l_exp_refs_nr);
1375 spin_unlock(&exp->exp_locks_list_guard);
1377 EXPORT_SYMBOL(__class_export_add_lock_ref);
/* Drop one reference from @lock to @exp, under exp->exp_locks_list_guard.
 * The count must be positive on entry (asserted). When it reaches zero the
 * lock is unlinked from the export's list and l_exp_refs_target is cleared.
 * A console warning is printed when @exp is not the recorded target. */
1379 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1381 spin_lock(&exp->exp_locks_list_guard);
1382 LASSERT(lock->l_exp_refs_nr > 0);
1383 if (lock->l_exp_refs_target != exp) {
1384 LCONSOLE_WARN("lock %p, "
1385 "mismatching export pointers: %p, %p\n",
1386 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the test sees the count after this reference is gone */
1388 if (-- lock->l_exp_refs_nr == 0) {
1389 list_del_init(&lock->l_exp_refs_link);
1390 lock->l_exp_refs_target = NULL;
1392 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1393 lock, exp, lock->l_exp_refs_nr);
1394 spin_unlock(&exp->exp_locks_list_guard);
1396 EXPORT_SYMBOL(__class_export_del_lock_ref);
1399 /* A connection defines an export context in which preallocation can
1400 be managed. This releases the export pointer reference, and returns
1401 the export handle, so the export refcount is 1 when this function returns. */
/* Create an export for client @cluuid on @obd with class_new_export(), store
 * the export's handle cookie in @conn->cookie and release the local export
 * reference again with class_export_put().
 * All three arguments must be non-NULL (asserted).
 * The error path returns PTR_ERR(export). */
1403 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1404 struct obd_uuid *cluuid)
1406 struct obd_export *export;
1407 LASSERT(conn != NULL);
1408 LASSERT(obd != NULL);
1409 LASSERT(cluuid != NULL);
1412 export = class_new_export(obd, cluuid);
1414 RETURN(PTR_ERR(export));
/* the caller only gets the cookie; the pointer reference is dropped */
1416 conn->cookie = export->exp_handle.h_cookie;
1417 class_export_put(export);
1419 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1420 cluuid->uuid, conn->cookie);
1423 EXPORT_SYMBOL(class_connect);
1425 /* if export is involved in recovery then clean up related things */
/* Undo the recovery accounting held on behalf of @exp.
 * Under obd_recovery_task_lock, while the obd is recovering: an export that
 * is in recovery is taken out of it and obd_connected_clients is decremented,
 * and obd_stale_clients is bumped unless the export is a lightweight one.
 * Then, under exp_lock, pending request-replay and lock-replay flags are
 * cleared and the matching per-obd client counters are decremented.
 * Lock nesting used here: obd_recovery_task_lock -> exp_lock. */
1426 static void class_export_recovery_cleanup(struct obd_export *exp)
1428 struct obd_device *obd = exp->exp_obd;
1430 spin_lock(&obd->obd_recovery_task_lock);
1431 if (obd->obd_recovering) {
1432 if (exp->exp_in_recovery) {
1433 spin_lock(&exp->exp_lock);
1434 exp->exp_in_recovery = 0;
1435 spin_unlock(&exp->exp_lock);
1436 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1437 atomic_dec(&obd->obd_connected_clients);
1440 /* if called during recovery then should update
1441 * obd_stale_clients counter,
1442 * lightweight exports are not counted */
1443 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1444 exp->exp_obd->obd_stale_clients++;
1446 spin_unlock(&obd->obd_recovery_task_lock);
1448 spin_lock(&exp->exp_lock);
1449 /** Cleanup req replay fields */
1450 if (exp->exp_req_replay_needed) {
1451 exp->exp_req_replay_needed = 0;
/* the counter must still be non-zero before it is decremented */
1453 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1454 atomic_dec(&obd->obd_req_replay_clients);
1457 /** Cleanup lock replay data */
1458 if (exp->exp_lock_replay_needed) {
1459 exp->exp_lock_replay_needed = 0;
1461 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1462 atomic_dec(&obd->obd_lock_replay_clients);
1464 spin_unlock(&exp->exp_lock);
1467 /* This function removes 1-3 references from the export:
1468 * 1 - for the export pointer passed in
1469 * and, if a disconnect is really needed:
1470 * 2 - for removing it from the hash
1471 * 3 - in class_unlink_export
1472 * The export pointer passed to this function can be destroyed */
/* Disconnect @export. Under exp_lock the export is flagged exp_disconnected
 * and, when still hashed, removed from the obd's nid hash. If an earlier
 * caller had already flagged it, the recovery cleanup and unlink steps are
 * skipped (GOTO no_disconn); otherwise class_export_recovery_cleanup() and
 * class_unlink_export() run. A reference is then released with
 * class_export_put().
 * A NULL @export is only reported with a warning. */
1473 int class_disconnect(struct obd_export *export)
1475 int already_disconnected;
1478 if (export == NULL) {
/* NOTE(review): export is known to be NULL here, so the %p argument
 * adds no information to the message */
1479 CWARN("attempting to free NULL export %p\n", export);
1483 spin_lock(&export->exp_lock);
/* remember the previous state so that only the first caller proceeds */
1484 already_disconnected = export->exp_disconnected;
1485 export->exp_disconnected = 1;
1486 /* We hold references of export for uuid hash
1487 * and nid_hash and export link at least. So
1488 * it is safe to call cfs_hash_del in there. */
1489 if (!hlist_unhashed(&export->exp_nid_hash))
1490 cfs_hash_del(export->exp_obd->obd_nid_hash,
1491 &export->exp_connection->c_peer.nid,
1492 &export->exp_nid_hash);
1493 spin_unlock(&export->exp_lock);
1495 /* class_cleanup(), abort_recovery(), and class_fail_export()
1496 * all end up in here, and if any of them race we shouldn't
1497 * call extra class_export_puts(). */
1498 if (already_disconnected) {
1499 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1500 GOTO(no_disconn, already_disconnected);
1503 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1504 export->exp_handle.h_cookie);
1506 class_export_recovery_cleanup(export);
1507 class_unlink_export(export);
1509 class_export_put(export);
1512 EXPORT_SYMBOL(class_disconnect);
1514 /* Return non-zero for a fully connected export */
/* Evaluate, under exp_lock, whether @exp has a positive exp_conn_cnt and has
 * not been marked exp_failed. */
1515 int class_connected_export(struct obd_export *exp)
1520 spin_lock(&exp->exp_lock);
1521 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1522 spin_unlock(&exp->exp_lock);
1526 EXPORT_SYMBOL(class_connected_export);
/* Disconnect the exports linked on @list, always working on the first entry
 * until the list is empty. For each export:
 *  - a reference is taken so the export can still be logged afterwards;
 *  - @flags is stored in exp_flags under exp_lock;
 *  - an export whose client uuid equals its obd's uuid is not disconnected,
 *    it is only unlinked from the list and the reference is dropped;
 *  - any other export gets a second reference and is handed to
 *    obd_disconnect(); the first reference is dropped after the final log. */
1528 static void class_disconnect_export_list(struct list_head *list,
1529 enum obd_option flags)
1532 struct obd_export *exp;
1535 /* It's possible that an export may disconnect itself, but
1536 * nothing else will be added to this list. */
1537 while (!list_empty(list)) {
1538 exp = list_entry(list->next, struct obd_export,
1540 /* need for safe call CDEBUG after obd_disconnect */
1541 class_export_get(exp);
1543 spin_lock(&exp->exp_lock);
1544 exp->exp_flags = flags;
1545 spin_unlock(&exp->exp_lock);
1547 if (obd_uuid_equals(&exp->exp_client_uuid,
1548 &exp->exp_obd->obd_uuid)) {
1550 "exp %p export uuid == obd uuid, don't discon\n",
1552 /* Need to delete this now so we don't end up pointing
1553 * to work_list later when this export is cleaned up. */
1554 list_del_init(&exp->exp_obd_chain);
1555 class_export_put(exp);
/* this reference is the one obd_disconnect() is expected to consume */
1559 class_export_get(exp);
1560 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1561 "last request at %lld\n",
1562 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1563 exp, exp->exp_last_request_time);
1564 /* release one export reference anyway */
1565 rc = obd_disconnect(exp);
1567 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1568 obd_export_nid2str(exp), exp, rc);
1569 class_export_put(exp);
/* Disconnect every export of @obd. Under obd_dev_lock both obd_exports and
 * obd_delayed_exports are spliced onto a local list (leaving the originals
 * empty); a non-empty list is then handed to class_disconnect_export_list()
 * with the flags derived from @obd by exp_flags_from_obd(). */
1574 void class_disconnect_exports(struct obd_device *obd)
1576 struct list_head work_list;
1579 /* Move all of the exports from obd_exports to a work list, en masse. */
1580 INIT_LIST_HEAD(&work_list);
1581 spin_lock(&obd->obd_dev_lock);
1582 list_splice_init(&obd->obd_exports, &work_list);
1583 list_splice_init(&obd->obd_delayed_exports, &work_list);
1584 spin_unlock(&obd->obd_dev_lock);
1586 if (!list_empty(&work_list)) {
1587 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1588 "disconnecting them\n", obd->obd_minor, obd);
1589 class_disconnect_export_list(&work_list,
1590 exp_flags_from_obd(obd));
1592 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1593 obd->obd_minor, obd);
1596 EXPORT_SYMBOL(class_disconnect_exports);
1598 /* Remove exports that have not completed recovery. */
/* Evict the exports of @obd selected through @test_export.
 * The obd_exports list is walked under obd_dev_lock. Per the comments below,
 * the self-export and exports without a last_rcvd slot (ted_lr_idx == -1) are
 * not candidates. For the others, under exp_lock: an export that is already
 * exp_failed or for which @test_export() returns non-zero is left alone;
 * any other export is marked exp_failed and moved to a local work list.
 * The work list is finally passed to class_disconnect_export_list() with
 * exp_flags_from_obd(obd) | OBD_OPT_ABORT_RECOV.
 *
 * @test_export is called with obd_dev_lock and exp_lock (spinlocks) held. */
1600 void class_disconnect_stale_exports(struct obd_device *obd,
1601 int (*test_export)(struct obd_export *))
1603 struct list_head work_list;
1604 struct obd_export *exp, *n;
1608 INIT_LIST_HEAD(&work_list);
1609 spin_lock(&obd->obd_dev_lock);
/* the _safe iterator is needed because entries are moved off the list */
1610 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1612 /* don't count self-export as client */
1613 if (obd_uuid_equals(&exp->exp_client_uuid,
1614 &exp->exp_obd->obd_uuid))
1617 /* don't evict clients which have no slot in last_rcvd
1618 * (e.g. lightweight connection) */
1619 if (exp->exp_target_data.ted_lr_idx == -1)
1622 spin_lock(&exp->exp_lock);
1623 if (exp->exp_failed || test_export(exp)) {
1624 spin_unlock(&exp->exp_lock);
1627 exp->exp_failed = 1;
1628 spin_unlock(&exp->exp_lock);
1630 list_move(&exp->exp_obd_chain, &work_list);
1632 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1633 obd->obd_name, exp->exp_client_uuid.uuid,
1634 obd_export_nid2str(exp));
1635 print_export_data(exp, "EVICTING", 0, D_HA);
1637 spin_unlock(&obd->obd_dev_lock);
1640 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1641 obd->obd_name, evicted);
1643 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1644 OBD_OPT_ABORT_RECOV);
1647 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Mark @exp as failed and disconnect it.
 * exp_failed is set under exp_lock; an export that was already failed is
 * only logged ("skipping"). Otherwise the debug log is dumped when
 * obd_dump_on_timeout is set, two references are taken (one to keep the
 * export valid for logging, one for obd_disconnect()), obd_disconnect() is
 * called and its result logged, and the first reference is dropped. */
1649 void class_fail_export(struct obd_export *exp)
1651 int rc, already_failed;
1653 spin_lock(&exp->exp_lock);
/* test-and-set under the lock so only one caller performs the eviction */
1654 already_failed = exp->exp_failed;
1655 exp->exp_failed = 1;
1656 spin_unlock(&exp->exp_lock);
1658 if (already_failed) {
1659 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1660 exp, exp->exp_client_uuid.uuid);
1664 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1665 exp, exp->exp_client_uuid.uuid);
1667 if (obd_dump_on_timeout)
1668 libcfs_debug_dumplog();
1670 /* need for safe call CDEBUG after obd_disconnect */
1671 class_export_get(exp);
1673 /* Most callers into obd_disconnect are removing their own reference
1674 * (request, for example) in addition to the one from the hash table.
1675 * We don't have such a reference here, so make one. */
1676 class_export_get(exp);
1677 rc = obd_disconnect(exp);
1679 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1681 CDEBUG(D_HA, "disconnected export %p/%s\n",
1682 exp, exp->exp_client_uuid.uuid);
1683 class_export_put(exp);
1685 EXPORT_SYMBOL(class_fail_export);
/* Evict the export(s) of @obd that are hashed under the NID given as the
 * string @nid.
 * Nothing is done when the obd is already stopping. Otherwise a reference on
 * the obd's nid hash is held for the duration of the call; an export found by
 * cfs_hash_lookup() is failed with class_fail_export() and the reference is
 * released with class_export_put().
 * Returns exports_evicted (0 when the obd is stopping; presumably the number
 * of evicted exports otherwise -- TODO confirm). */
1687 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1689 struct cfs_hash *nid_hash;
1690 struct obd_export *doomed_exp = NULL;
1691 int exports_evicted = 0;
/* NOTE(review): the cast drops const from @nid for libcfs_str2nid() */
1693 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1695 spin_lock(&obd->obd_dev_lock);
1696 /* umount has run already, so evict thread should leave
1697 * its task to umount thread now */
1698 if (obd->obd_stopping) {
1699 spin_unlock(&obd->obd_dev_lock);
1700 return exports_evicted;
/* pin the hash before dropping obd_dev_lock */
1702 nid_hash = obd->obd_nid_hash;
1703 cfs_hash_getref(nid_hash);
1704 spin_unlock(&obd->obd_dev_lock);
1707 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1708 if (doomed_exp == NULL)
1711 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1712 "nid %s found, wanted nid %s, requested nid %s\n",
1713 obd_export_nid2str(doomed_exp),
1714 libcfs_nid2str(nid_key), nid);
1715 LASSERTF(doomed_exp != obd->obd_self_export,
1716 "self-export is hashed by NID?\n");
1718 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1719 "request\n", obd->obd_name,
1720 obd_uuid2str(&doomed_exp->exp_client_uuid),
1721 obd_export_nid2str(doomed_exp));
1722 class_fail_export(doomed_exp);
1723 class_export_put(doomed_exp);
1726 cfs_hash_putref(nid_hash);
1728 if (!exports_evicted)
1729 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1730 obd->obd_name, nid);
1731 return exports_evicted;
1733 EXPORT_SYMBOL(obd_export_evict_by_nid);
/* Evict the export of @obd whose client uuid equals the string @uuid.
 * Nothing is done when the obd is stopping or when @uuid is the obd's own
 * uuid ("can't evict myself"). Otherwise the export is looked up in the
 * obd's uuid hash (a hash reference is held meanwhile), failed with
 * class_fail_export() and the reference is released with
 * class_export_put().
 * Returns exports_evicted (0 on the early-return paths). */
1735 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1737 struct cfs_hash *uuid_hash;
1738 struct obd_export *doomed_exp = NULL;
1739 struct obd_uuid doomed_uuid;
1740 int exports_evicted = 0;
1742 spin_lock(&obd->obd_dev_lock);
1743 if (obd->obd_stopping) {
1744 spin_unlock(&obd->obd_dev_lock);
1745 return exports_evicted;
/* pin the hash before dropping obd_dev_lock */
1747 uuid_hash = obd->obd_uuid_hash;
1748 cfs_hash_getref(uuid_hash);
1749 spin_unlock(&obd->obd_dev_lock);
1751 obd_str2uuid(&doomed_uuid, uuid);
1752 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1753 CERROR("%s: can't evict myself\n", obd->obd_name);
1754 cfs_hash_putref(uuid_hash);
1755 return exports_evicted;
1758 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1760 if (doomed_exp == NULL) {
1761 CERROR("%s: can't disconnect %s: no exports found\n",
1762 obd->obd_name, uuid);
/* NOTE(review): "adminstrative" in the message below is misspelled
 * ("administrative"); it is a runtime string, so it is left untouched */
1764 CWARN("%s: evicting %s at adminstrative request\n",
1765 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1766 class_fail_export(doomed_exp);
1767 class_export_put(doomed_exp);
1770 cfs_hash_putref(uuid_hash);
1772 return exports_evicted;
1775 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback, NULL by default and exported so that another module can
 * install it; print_export_data() invokes it for an export when lock dumping
 * is requested and the hook is set. */
1776 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1777 EXPORT_SYMBOL(class_export_dump_hook);
/* Emit one debug line describing @exp at @debug_level, tagged with @status.
 * The outstanding replies are walked under exp_lock first; the line then
 * reports the obd name, status, export pointer, client uuid, NID, the
 * reference/rpc/callback/lock counters, the disconnected/delayed/failed
 * flags, the reply count, the first reply, the last committed transno and
 * whether the export is on a stale list.
 * With LUSTRE_TRACKS_LOCK_EXP_REFS, a non-zero @locks additionally calls
 * class_export_dump_hook() when that hook is installed. */
1780 static void print_export_data(struct obd_export *exp, const char *status,
1781 int locks, int debug_level)
1783 struct ptlrpc_reply_state *rs;
1784 struct ptlrpc_reply_state *first_reply = NULL;
1787 spin_lock(&exp->exp_lock);
1788 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1794 spin_unlock(&exp->exp_lock);
1796 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1797 "%p %s %llu stale:%d\n",
1798 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1799 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1800 atomic_read(&exp->exp_rpc_count),
1801 atomic_read(&exp->exp_cb_count),
1802 atomic_read(&exp->exp_locks_count),
1803 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1804 nreplies, first_reply, nreplies > 3 ? "..." : "",
1805 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1806 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1807 if (locks && class_export_dump_hook != NULL)
1808 class_export_dump_hook(exp);
/* Print every export of @obd at @debug_level, holding obd_dev_lock for the
 * whole walk: entries of obd_exports are tagged "ACTIVE", entries of
 * obd_unlinked_exports "UNLINKED" and entries of obd_delayed_exports
 * "DELAYED". @locks is passed through to print_export_data(). */
1812 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1814 struct obd_export *exp;
1816 spin_lock(&obd->obd_dev_lock);
1817 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1818 print_export_data(exp, "ACTIVE", locks, debug_level);
1819 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1820 print_export_data(exp, "UNLINKED", locks, debug_level);
1821 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1822 print_export_data(exp, "DELAYED", locks, debug_level);
1823 spin_unlock(&obd->obd_dev_lock);
/* Block until @obd has no unlinked exports left.
 * obd_exports must already be empty (asserted). While obd_unlinked_exports is
 * non-empty the function drops obd_dev_lock, sleeps uninterruptibly for
 * cfs_time_seconds(waited) and retakes the lock. When the wait value exceeds
 * 5 and is a power of two, a console warning is printed and all exports are
 * dumped. */
1826 void obd_exports_barrier(struct obd_device *obd)
1829 LASSERT(list_empty(&obd->obd_exports));
1830 spin_lock(&obd->obd_dev_lock);
1831 while (!list_empty(&obd->obd_unlinked_exports)) {
/* never sleep with the spinlock held */
1832 spin_unlock(&obd->obd_dev_lock);
1833 set_current_state(TASK_UNINTERRUPTIBLE);
1834 schedule_timeout(cfs_time_seconds(waited));
/* the power-of-two test makes the warning progressively rarer */
1835 if (waited > 5 && is_power_of_2(waited)) {
1836 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1837 "more than %d seconds. "
1838 "The obd refcount = %d. Is it stuck?\n",
1839 obd->obd_name, waited,
1840 atomic_read(&obd->obd_refcount));
1841 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1844 spin_lock(&obd->obd_dev_lock);
1846 spin_unlock(&obd->obd_dev_lock);
1848 EXPORT_SYMBOL(obd_exports_barrier);
1851 /** Add export to the obd_zombie workqueue. */
/* Hand @exp over for deferred destruction: decrement obd_stale_export_num,
 * unlink the export from its obd chain under obd_dev_lock (it must still be
 * linked, asserted) and queue exp_zombie_work on zombie_wq. */
1853 static void obd_zombie_export_add(struct obd_export *exp) {
1854 atomic_dec(&obd_stale_export_num);
1855 spin_lock(&exp->exp_obd->obd_dev_lock);
1856 LASSERT(!list_empty(&exp->exp_obd_chain));
1857 list_del_init(&exp->exp_obd_chain);
1858 spin_unlock(&exp->exp_obd->obd_dev_lock);
1860 queue_work(zombie_wq, &exp->exp_zombie_work);
1864 /** Add import to the obd_zombie workqueue. */
/* Hand @imp over for deferred destruction by queueing imp_zombie_work on
 * zombie_wq. The import must no longer have a security context
 * (imp_sec == NULL, asserted). */
1866 static void obd_zombie_import_add(struct obd_import *imp) {
1867 LASSERT(imp->imp_sec == NULL);
1869 queue_work(zombie_wq, &imp->imp_zombie_work);
1873 /** Wait until the obd_zombie import/export workqueue has been flushed. */
/* Flush zombie_wq, i.e. wait for the zombie import/export work items that
 * are queued at the time of the call. */
1875 void obd_zombie_barrier(void)
1877 flush_workqueue(zombie_wq);
1879 EXPORT_SYMBOL(obd_zombie_barrier);
/* Detach the first export from the global obd_stale_exports list, under
 * obd_stale_export_lock, and log it together with the current
 * obd_stale_export_num.
 * Returns a struct obd_export pointer (presumably the detached export, or
 * NULL when the list is empty -- TODO confirm). */
1882 struct obd_export *obd_stale_export_get(void)
1884 struct obd_export *exp = NULL;
1887 spin_lock(&obd_stale_export_lock);
1888 if (!list_empty(&obd_stale_exports)) {
1889 exp = list_entry(obd_stale_exports.next,
1890 struct obd_export, exp_stale_list);
1891 list_del_init(&exp->exp_stale_list);
1893 spin_unlock(&obd_stale_export_lock);
1896 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1897 atomic_read(&obd_stale_export_num));
1901 EXPORT_SYMBOL(obd_stale_export_get);
/* Give back an export obtained from the stale list.
 * @exp must not be on the stale list (asserted). When it still has a lock
 * hash with a non-zero hs_count it is re-inserted into obd_stale_exports:
 * at the tail when exp_bl_list is empty, at the head otherwise.
 * class_export_put() releases a reference (presumably only when the export
 * is not re-inserted -- TODO confirm).
 * Lock nesting used here: exp_bl_list_lock (bh) -> obd_stale_export_lock. */
1903 void obd_stale_export_put(struct obd_export *exp)
1907 LASSERT(list_empty(&exp->exp_stale_list));
1908 if (exp->exp_lock_hash &&
1909 atomic_read(&exp->exp_lock_hash->hs_count)) {
1910 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1911 atomic_read(&obd_stale_export_num));
1913 spin_lock_bh(&exp->exp_bl_list_lock);
1914 spin_lock(&obd_stale_export_lock);
1915 /* Add to the tail if there is no blocked locks,
1916 * to the head otherwise. */
1917 if (list_empty(&exp->exp_bl_list))
1918 list_add_tail(&exp->exp_stale_list,
1919 &obd_stale_exports);
1921 list_add(&exp->exp_stale_list,
1922 &obd_stale_exports);
1924 spin_unlock(&obd_stale_export_lock);
1925 spin_unlock_bh(&exp->exp_bl_list_lock);
1927 class_export_put(exp);
1931 EXPORT_SYMBOL(obd_stale_export_put);
1934 /** Adjust the position of the export in the stale list,
1935 * i.e. move it to the head of the list if needed. */
/* Move @exp to the head of obd_stale_exports when it is currently on the
 * stale list and its exp_bl_list is non-empty; otherwise leave it alone.
 * @exp must not be NULL (asserted).
 * Lock nesting used here: exp_bl_list_lock (bh) -> obd_stale_export_lock. */
1937 void obd_stale_export_adjust(struct obd_export *exp)
1939 LASSERT(exp != NULL);
1940 spin_lock_bh(&exp->exp_bl_list_lock);
1941 spin_lock(&obd_stale_export_lock);
1943 if (!list_empty(&exp->exp_stale_list) &&
1944 !list_empty(&exp->exp_bl_list))
1945 list_move(&exp->exp_stale_list, &obd_stale_exports);
1947 spin_unlock(&obd_stale_export_lock);
1948 spin_unlock_bh(&exp->exp_bl_list_lock);
1950 EXPORT_SYMBOL(obd_stale_export_adjust);
1953 /** Create the workqueue used to destroy zombie imports/exports. */
/* Create zombie_wq, the workqueue named "obd_zombid", with flags 0 and
 * max_active 0 passed to alloc_workqueue().
 * NOTE(review): confirm that a NULL result from alloc_workqueue() is turned
 * into an error return. */
1955 int obd_zombie_impexp_init(void)
1957 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1965 /** Destroy the workqueue used to destroy zombie imports/exports. */
/* Destroy zombie_wq, then assert that no export is left on the global
 * obd_stale_exports list. */
1967 void obd_zombie_impexp_stop(void)
1969 destroy_workqueue(zombie_wq);
1970 LASSERT(list_empty(&obd_stale_exports));
1973 /***** Kernel-userspace comm helpers *******/
1975 /* Get length of entire message, including header */
/* Size in bytes of a kuc message carrying @payload_len bytes of payload:
 * sizeof(struct kuc_hdr) plus @payload_len.
 * NOTE(review): the sum is computed with a size_t operand and returned as
 * int; @payload_len is not range-checked here. */
1976 int kuc_len(int payload_len)
1978 return sizeof(struct kuc_hdr) + payload_len;
1980 EXPORT_SYMBOL(kuc_len);
1982 /* Get a pointer to kuc header, given a ptr to the payload
1983 * @param p Pointer to payload area
1984 * @returns Pointer to kuc header */
/* Locate the kuc header of the payload at @p: the header is the
 * struct kuc_hdr placed immediately before the payload. Its kuc_magic must
 * equal KUC_MAGIC (asserted). */
1986 struct kuc_hdr * kuc_ptr(void *p)
1988 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1989 LASSERT(lh->kuc_magic == KUC_MAGIC);
1992 EXPORT_SYMBOL(kuc_ptr);
1994 /* Alloc space for a message, and fill in header
1995 * @return Pointer to payload area */
/* Build a kuc message able to hold @payload_len payload bytes and fill in
 * its header: kuc_magic = KUC_MAGIC, kuc_transport = @transport,
 * kuc_msgtype = @type and kuc_msglen = the total length from kuc_len().
 * Returns the address just past the header (the payload area); the failure
 * path returns ERR_PTR(-ENOMEM), so callers must test the result with
 * IS_ERR() rather than against NULL. */
1997 void *kuc_alloc(int payload_len, int transport, int type)
2000 int len = kuc_len(payload_len);
2004 return ERR_PTR(-ENOMEM);
2006 lh->kuc_magic = KUC_MAGIC;
2007 lh->kuc_transport = transport;
2008 lh->kuc_msgtype = type;
2009 lh->kuc_msglen = len;
2011 return (void *)(lh + 1);
2013 EXPORT_SYMBOL(kuc_alloc);
2015 /* Takes pointer to payload area */
/* Free the message whose payload area is @p: the header is found with
 * kuc_ptr() (which asserts the magic) and kuc_len(@payload_len) bytes are
 * released with OBD_FREE(). @payload_len presumably has to match the value
 * used when the message was allocated -- TODO confirm. */
2016 void kuc_free(void *p, int payload_len)
2018 struct kuc_hdr *lh = kuc_ptr(p);
2019 OBD_FREE(lh, kuc_len(payload_len));
2021 EXPORT_SYMBOL(kuc_free);
/* One sleeper waiting for an RPC slot in obd_get_request_slot(). */
2023 struct obd_request_slot_waiter {
/* linkage on client_obd::cl_flight_waiters */
2024 struct list_head orsw_entry;
/* wait queue the sleeper blocks on; woken when a slot is handed over */
2025 wait_queue_head_t orsw_waitq;
/* Wait condition for obd_get_request_slot(): computes, under
 * cl_loi_list_lock, whether @orsw is no longer linked on a waiter list
 * (list_empty() on its own entry), which is the state the wake-up paths
 * leave it in after list_del_init(). */
2029 static bool obd_request_slot_avail(struct client_obd *cli,
2030 struct obd_request_slot_waiter *orsw)
2034 spin_lock(&cli->cl_loi_list_lock);
2035 avail = !!list_empty(&orsw->orsw_entry);
2036 spin_unlock(&cli->cl_loi_list_lock);
2042 /* For network flow control, the RPC sponsor needs to acquire a credit
2043 * before sending the RPC. The credits count for a connection is defined
2044 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
2045 * the subsequent RPC sponsors need to wait until others release their
2046 * credits, or the administrator increases the "cl_max_rpcs_in_flight". */
/* Acquire one RPC slot on @cli.
 * Fast path: while cl_rpcs_in_flight is below cl_max_rpcs_in_flight the
 * counter is simply incremented under cl_loi_list_lock.
 * Slow path: an on-stack waiter is appended to cl_flight_waiters and the
 * caller sleeps in l_wait_event() (LWI_INTR with LWI_ON_SIGNAL_NOOP) until
 * obd_request_slot_avail() holds or the remaining part of the condition is
 * met. After the wait the lock is retaken to settle the waiter's state: a
 * waiter that was not signalled either gives back the slot it had already
 * been handed (its entry is empty) or is unlinked from the waiter list. */
2048 int obd_get_request_slot(struct client_obd *cli)
2050 struct obd_request_slot_waiter orsw;
2051 struct l_wait_info lwi;
2054 spin_lock(&cli->cl_loi_list_lock);
2055 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2056 cli->cl_rpcs_in_flight++;
2057 spin_unlock(&cli->cl_loi_list_lock);
2061 init_waitqueue_head(&orsw.orsw_waitq);
/* tail insertion: the wake-up paths take waiters from the head */
2062 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2063 orsw.orsw_signaled = false;
2064 spin_unlock(&cli->cl_loi_list_lock);
2066 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2067 rc = l_wait_event(orsw.orsw_waitq,
2068 obd_request_slot_avail(cli, &orsw) ||
2072 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2073 * freed but other (such as obd_put_request_slot) is using it. */
2074 spin_lock(&cli->cl_loi_list_lock);
2076 if (!orsw.orsw_signaled) {
/* an empty entry means a slot was already accounted to this waiter */
2077 if (list_empty(&orsw.orsw_entry))
2078 cli->cl_rpcs_in_flight--;
2080 list_del(&orsw.orsw_entry);
2084 if (orsw.orsw_signaled) {
2085 LASSERT(list_empty(&orsw.orsw_entry));
2089 spin_unlock(&cli->cl_loi_list_lock);
2093 EXPORT_SYMBOL(obd_get_request_slot);
/* Release one RPC slot on @cli. Under cl_loi_list_lock the in-flight count
 * is decremented; if a waiter is queued and the count is below
 * cl_max_rpcs_in_flight, the first waiter is unlinked, the slot is accounted
 * to it right away (count incremented again) and it is woken up. */
2095 void obd_put_request_slot(struct client_obd *cli)
2097 struct obd_request_slot_waiter *orsw;
2099 spin_lock(&cli->cl_loi_list_lock);
2100 cli->cl_rpcs_in_flight--;
2102 /* If there is free slot, wakeup the first waiter. */
2103 if (!list_empty(&cli->cl_flight_waiters) &&
2104 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2105 orsw = list_entry(cli->cl_flight_waiters.next,
2106 struct obd_request_slot_waiter, orsw_entry);
/* list_del_init() leaves the entry empty, which is what
 * obd_request_slot_avail() tests for */
2107 list_del_init(&orsw->orsw_entry);
2108 cli->cl_rpcs_in_flight++;
2109 wake_up(&orsw->orsw_waitq);
2111 spin_unlock(&cli->cl_loi_list_lock);
2113 EXPORT_SYMBOL(obd_put_request_slot);
/* Return @cli->cl_max_rpcs_in_flight; the value is read without taking
 * cl_loi_list_lock. */
2115 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2117 return cli->cl_max_rpcs_in_flight;
2119 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/* Change the RPC slot limit of @cli to @max.
 * @max must lie in [1, OBD_MAX_RIF_MAX]. For an MDC device (obd type name
 * LUSTRE_MDC_NAME) max_mod_rpcs_in_flight is kept strictly below the new
 * limit: when @max <= cl_max_mod_rpcs_in_flight it is lowered to @max - 1
 * through obd_set_max_mod_rpcs_in_flight().
 * Under cl_loi_list_lock the new limit is stored, client_adjust_max_dirty()
 * is called, and up to "diff" queued waiters are handed a slot and woken. */
2121 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2123 struct obd_request_slot_waiter *orsw;
2130 if (max > OBD_MAX_RIF_MAX || max < 1)
2133 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2134 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2135 /* adjust max_mod_rpcs_in_flight to ensure it is always
2136 * strictly lower that max_rpcs_in_flight */
/* NOTE(review): unlike the other CERROR messages in this file, the
 * format string below does not end with a newline -- confirm */
2138 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2139 "because it must be higher than "
2140 "max_mod_rpcs_in_flight value",
2141 cli->cl_import->imp_obd->obd_name);
2144 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2145 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2151 spin_lock(&cli->cl_loi_list_lock);
2152 old = cli->cl_max_rpcs_in_flight;
2153 cli->cl_max_rpcs_in_flight = max;
2154 client_adjust_max_dirty(cli);
2158 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2159 for (i = 0; i < diff; i++) {
2160 if (list_empty(&cli->cl_flight_waiters))
2163 orsw = list_entry(cli->cl_flight_waiters.next,
2164 struct obd_request_slot_waiter, orsw_entry);
2165 list_del_init(&orsw->orsw_entry);
2166 cli->cl_rpcs_in_flight++;
2167 wake_up(&orsw->orsw_waitq);
2169 spin_unlock(&cli->cl_loi_list_lock);
2173 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Return @cli->cl_max_mod_rpcs_in_flight; the value is read without taking
 * cl_mod_rpcs_lock. */
2175 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2177 return cli->cl_max_mod_rpcs_in_flight;
2179 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/* Change the modify-RPC slot limit of @cli to @max.
 * Constraints checked before the update:
 *  - @max must lie in [1, OBD_MAX_RIF_MAX];
 *  - @max must be strictly lower than cl_max_rpcs_in_flight;
 *  - @max must not exceed maxmodrpcs, which is taken from the import's
 *    connect data (ocd_maxmodrpcs) when the server advertised
 *    OBD_CONNECT_MULTIMODRPCS.
 * The new limit is stored under cl_mod_rpcs_lock; when it is larger than the
 * previous one the sleepers on cl_mod_rpcs_waitq are woken. */
2181 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2183 struct obd_connect_data *ocd;
2187 if (max > OBD_MAX_RIF_MAX || max < 1)
2190 /* cannot exceed or equal max_rpcs_in_flight */
2191 if (max >= cli->cl_max_rpcs_in_flight) {
2192 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2193 "higher or equal to max_rpcs_in_flight value (%u)\n",
2194 cli->cl_import->imp_obd->obd_name,
2195 max, cli->cl_max_rpcs_in_flight);
2199 /* cannot exceed max modify RPCs in flight supported by the server */
2200 ocd = &cli->cl_import->imp_connect_data;
2201 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2202 maxmodrpcs = ocd->ocd_maxmodrpcs;
2205 if (max > maxmodrpcs) {
2206 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2207 "higher than max_mod_rpcs_per_client value (%hu) "
2208 "returned by the server at connection\n",
2209 cli->cl_import->imp_obd->obd_name,
2214 spin_lock(&cli->cl_mod_rpcs_lock);
2216 prev = cli->cl_max_mod_rpcs_in_flight;
2217 cli->cl_max_mod_rpcs_in_flight = max;
2219 /* wakeup waiters if limit has been increased */
2220 if (cli->cl_max_mod_rpcs_in_flight > prev)
2221 wake_up(&cli->cl_mod_rpcs_waitq);
2223 spin_unlock(&cli->cl_mod_rpcs_lock);
2227 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/* Write the modify-RPC statistics of @cli to @seq: a snapshot time stamp,
 * the current number of modify RPCs in flight, and one line per bucket of
 * cl_mod_rpcs_hist (up to OBD_HIST_MAX) giving the bucket count, its
 * percentage of the total and the cumulative percentage. The whole report
 * is produced with cl_mod_rpcs_lock held so the figures are consistent. */
2229 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2230 struct seq_file *seq)
2232 unsigned long mod_tot = 0, mod_cum;
2233 struct timespec64 now;
2236 ktime_get_real_ts64(&now);
2238 spin_lock(&cli->cl_mod_rpcs_lock);
/* NOTE(review): %llu is paired with an (s64) argument, and %9lu pads the
 * nanoseconds with spaces rather than zeros -- confirm whether
 * "%lld.%09lu" was intended */
2240 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2241 (s64)now.tv_sec, now.tv_nsec);
2242 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2243 cli->cl_mod_rpcs_in_flight);
2245 seq_printf(seq, "\n\t\t\tmodify\n");
2246 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2248 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2251 for (i = 0; i < OBD_HIST_MAX; i++) {
2252 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2254 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2255 i, mod, pct(mod, mod_tot),
2256 pct(mod_cum, mod_tot));
/* every sample has been reported once the running sum hits the total */
2257 if (mod_cum == mod_tot)
2261 spin_unlock(&cli->cl_mod_rpcs_lock);
2265 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2267 /* The number of modify RPCs sent in parallel is limited
2268 * because the server has a finite number of slots per client to
2269 * store request result and ensure reply reconstruction when needed.
2270 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2271 * that takes into account the server limit and cl_max_rpcs_in_flight.
2273 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2274 * one close request is allowed above the maximum. */
/* Modify-RPC slot availability test. Reads cl_mod_rpcs_in_flight,
 * cl_max_mod_rpcs_in_flight and cl_close_rpcs_in_flight without taking any
 * lock itself; the callers in this file invoke it with cl_mod_rpcs_lock
 * held. */
2276 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2281 /* A slot is available if
2282 * - number of modify RPCs in flight is less than the max
2283 * - it's a close RPC and no other close request is in flight
2285 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2286 (close_req && cli->cl_close_rpcs_in_flight == 0);
/* Locking wrapper around obd_mod_rpc_slot_avail_locked(): evaluates the
 * availability test with cl_mod_rpcs_lock held. */
2291 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2296 spin_lock(&cli->cl_mod_rpcs_lock);
2297 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2298 spin_unlock(&cli->cl_mod_rpcs_lock);
/* Classify the intent @it: the test below matches IT_GETATTR, IT_LOOKUP,
 * IT_READDIR, and IT_LAYOUT when MDS_FMODE_WRITE is not set in it_flags.
 * Callers in this file bypass the modify-RPC slot accounting when this
 * predicate holds. */
2302 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2305 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2306 it->it_op == IT_READDIR ||
2307 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2312 /* Get a modify RPC slot from the obd client @cli according
2313 * to the kind of operation @opc that is going to be sent
2314 * and the intent @it of the operation if it applies.
2315 * If the maximum number of modify RPCs in flight is reached
2316 * the thread is put to sleep.
2317 * Returns the tag to be set in the request message. Tag 0
2318 * is reserved for non-modifying requests. */
/* Reserve a modify-RPC slot on @cli for an RPC with opcode @opc and optional
 * intent @it.
 * Intents matched by obd_skip_mod_rpc_slot() do not go through the slot
 * accounting. Otherwise, under cl_mod_rpcs_lock, when a slot is available
 * the in-flight counter is bumped (and the close counter for MDS_CLOSE),
 * the histogram is updated and the first free bit of cl_mod_tag_bitmap is
 * claimed as the tag. When no slot is available the lock is dropped and the
 * caller sleeps exclusively on cl_mod_rpcs_waitq until
 * obd_mod_rpc_slot_avail() holds. */
2320 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2321 struct lookup_intent *it)
2323 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2324 bool close_req = false;
2327 /* read-only metadata RPCs don't consume a slot on MDT
2328 * for reply reconstruction
2330 if (obd_skip_mod_rpc_slot(it))
2333 if (opc == MDS_CLOSE)
2337 spin_lock(&cli->cl_mod_rpcs_lock);
2338 max = cli->cl_max_mod_rpcs_in_flight;
2339 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2340 /* there is a slot available */
2341 cli->cl_mod_rpcs_in_flight++;
2343 cli->cl_close_rpcs_in_flight++;
2344 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2345 cli->cl_mod_rpcs_in_flight);
2346 /* find a free tag */
2347 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2349 LASSERT(i < OBD_MAX_RIF_MAX);
/* NOTE(review): test_and_set_bit() is the statement that actually claims
 * the tag; it lives inside LASSERT(), so the tag would never be marked if
 * LASSERT() could ever be compiled out -- confirm */
2350 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2351 spin_unlock(&cli->cl_mod_rpcs_lock);
2352 /* tag 0 is reserved for non-modify RPCs */
2355 spin_unlock(&cli->cl_mod_rpcs_lock);
2357 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2358 "opc %u, max %hu\n",
2359 cli->cl_import->imp_obd->obd_name, opc, max);
2361 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2362 obd_mod_rpc_slot_avail(cli, close_req),
2366 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2368 /* Release a modify RPC slot of the obd client @cli according
2369 * to the kind of operation @opc that has been sent and the
2370 * intent @it of the operation if it applies. */
/* Give back the modify-RPC slot identified by @tag on @cli.
 * Intents matched by obd_skip_mod_rpc_slot() never took a slot and are not
 * accounted here. Otherwise, under cl_mod_rpcs_lock, the in-flight counter
 * is decremented (and the close counter for MDS_CLOSE) and bit @tag - 1 of
 * cl_mod_tag_bitmap is cleared; the sleepers on cl_mod_rpcs_waitq are then
 * woken after the lock has been dropped. */
2372 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2373 struct lookup_intent *it, __u16 tag)
2375 bool close_req = false;
2377 if (obd_skip_mod_rpc_slot(it))
2380 if (opc == MDS_CLOSE)
2383 spin_lock(&cli->cl_mod_rpcs_lock);
2384 cli->cl_mod_rpcs_in_flight--;
2386 cli->cl_close_rpcs_in_flight--;
2387 /* release the tag in the bitmap */
/* NOTE(review): @tag is a __u16, so "tag - 1" is evaluated as int; a tag of
 * 0 would yield -1 here -- confirm that callers never pass 0. Also note
 * that test_and_clear_bit() below is the statement that frees the tag and
 * it lives inside LASSERT(). */
2388 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2389 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2390 spin_unlock(&cli->cl_mod_rpcs_lock);
2391 wake_up(&cli->cl_mod_rpcs_waitq);
2393 EXPORT_SYMBOL(obd_put_mod_rpc_slot);