4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* List of registered obd types; walked and modified under obd_types_lock. */
49 static DEFINE_SPINLOCK(obd_types_lock);
50 static LIST_HEAD(obd_types);
/* obd_dev_lock is taken around lookups in, and updates of, the obd_devs[]
 * slot table below. */
51 DEFINE_RWLOCK(obd_dev_lock);
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache that struct obd_device is allocated from and freed to. */
54 static struct kmem_cache *obd_device_cachep;
/* NOTE(review): the users of zombie_wq are not visible in this part of the
 * file; presumably the export/import zombie work items run on it. */
56 static struct workqueue_struct *zombie_wq;
/* Forward declarations of helpers defined later in the file. */
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61 const char *status, int locks, int debug_level);
/* Bookkeeping for stale exports; obd_stale_export_num is bumped when an
 * export is unlinked (see class_unlink_export()). */
63 static LIST_HEAD(obd_stale_exports);
64 static DEFINE_SPINLOCK(obd_stale_export_lock);
65 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Function pointer through which this file drops ptlrpc connection
 * references; it is not assigned anywhere in the visible code. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/*
 * Allocate a struct obd_device from obd_device_cachep using GFP_NOFS and
 * stamp it with OBD_DEVICE_MAGIC.  The caller (class_newdev()) checks the
 * result against NULL.
 */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts that the magic is intact, logs an error if obd_namespace is still
 * set (it should have been cleaned up before this point), and finalizes the
 * obd_reference lu_ref before the memory is released.
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Search the obd_types list for an entry whose typ_name is exactly \a name.
 *
 * The list is walked under obd_types_lock; the lock is released both on the
 * match path and after an unsuccessful walk.  Nothing in the visible code
 * takes a reference on the type that was found.
 */
99 struct obd_type *class_search_type(const char *name)
101 struct list_head *tmp;
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 list_for_each(tmp, &obd_types) {
106 type = list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
/*
 * Look up an obd type by name and pin the module that provides it.
 *
 * Starts with class_search_type().  Under HAVE_MODULE_LOADING_SUPPORT the
 * code derives a module name from \a name ("obdfilter" is special-cased, the
 * LWP name maps to the OSP module, names beginning with the MDS name map to
 * the MDT module), asks for it with request_module() and, on success,
 * searches again; a failed load is reported on the console.
 *
 * The provider module (typ_dt_ops->o_owner) is then pinned with
 * try_module_get() while holding the type's obd_type_lock.
 * NOTE(review): the return value of try_module_get() is ignored here.
 */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
134 if (!request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
/* search by the original name, not by the module alias */
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144 spin_lock(&type->obd_type_lock);
146 try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): drops the reference on the module that
 * provides \a type (typ_dt_ops->o_owner) with module_put(), while holding
 * the type's obd_type_lock.
 */
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
161 static void class_sysfs_release(struct kobject *kobj)
163 OBD_FREE(kobj, sizeof(*kobj));
/*
 * kobject type used by class_setup_tunables(): attribute access goes through
 * lustre_sysfs_ops and the final put frees the kobject via
 * class_sysfs_release().
 */
166 static struct kobj_type class_ktype = {
167 .sysfs_ops = &lustre_sysfs_ops,
168 .release = class_sysfs_release,
/*
 * Create the sysfs kobject called \a name underneath lustre_kset.
 *
 * With HAVE_SERVER_SUPPORT an already existing kobject of that name is
 * looked up first with kset_find_obj() (presumably so that it can be
 * reused -- the handling of the lookup result is not shown here).
 * Otherwise a kobject is allocated, bound to class_ktype and added to the
 * kset.
 *
 * Returns ERR_PTR(-ENOMEM) when the allocation fails.
 */
171 struct kobject *class_setup_tunables(const char *name)
173 struct kobject *kobj;
176 #ifdef HAVE_SERVER_SUPPORT
177 kobj = kset_find_obj(lustre_kset, name);
181 OBD_ALLOC(kobj, sizeof(*kobj));
183 return ERR_PTR(-ENOMEM);
185 kobj->kset = lustre_kset;
186 kobject_init(kobj, &class_ktype);
187 rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
194 EXPORT_SYMBOL(class_setup_tunables);
/* Upper bound (exclusive) on the length of an obd type name. */
196 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type.
 *
 * \param dt_ops	device operations; copied into a private typ_dt_ops
 * \param md_ops	metadata operations, optional; copied into typ_md_ops
 * \param enable_proc	NOTE(review): its use is not visible in this extract
 * \param vars		NOTE(review): its use is not visible in this extract
 * \param name		type name, must be shorter than CLASS_MAX_NAME
 * \param ldt		lu device type, initialized via lu_device_type_init()
 *
 * Order of work as far as visible: reject an already registered name,
 * allocate the obd_type together with private copies of both ops tables and
 * the name, create the procfs root (CONFIG_PROC_FS), the debugfs directory
 * and the sysfs kobject, initialize \a ldt, and finally link the type onto
 * obd_types.  The code from the last typ_sym_filter check onwards is the
 * failure path, which releases whatever was set up.
 */
198 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
199 bool enable_proc, struct lprocfs_vars *vars,
200 const char *name, struct lu_device_type *ldt)
202 struct obd_type *type;
203 #ifdef HAVE_SERVER_SUPPORT
205 #endif /* HAVE_SERVER_SUPPORT */
210 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
212 if (class_search_type(name)) {
213 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
218 OBD_ALLOC(type, sizeof(*type));
/* all three allocations are attempted first and checked together below */
222 OBD_ALLOC_PTR(type->typ_dt_ops);
223 OBD_ALLOC_PTR(type->typ_md_ops);
224 OBD_ALLOC(type->typ_name, strlen(name) + 1);
226 if (type->typ_dt_ops == NULL ||
227 type->typ_md_ops == NULL ||
228 type->typ_name == NULL)
231 *(type->typ_dt_ops) = *dt_ops;
232 /* md_ops is optional */
234 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized as strlen(name) + 1 above, so strcpy() fits */
235 strcpy(type->typ_name, name);
236 spin_lock_init(&type->obd_type_lock);
238 #ifdef CONFIG_PROC_FS
240 type->typ_procroot = lprocfs_register(type->typ_name,
243 if (IS_ERR(type->typ_procroot)) {
244 rc = PTR_ERR(type->typ_procroot);
/* do not leave an ERR_PTR behind for the failure path to test */
245 type->typ_procroot = NULL;
250 #ifdef HAVE_SERVER_SUPPORT
/* If a debugfs entry of this name already exists, remember that in
 * typ_sym_filter; the lookup reference is dropped again right away. */
252 dname.len = strlen(dname.name);
253 dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
255 type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
256 if (type->typ_debugfs_entry) {
257 dput(type->typ_debugfs_entry);
258 type->typ_sym_filter = true;
261 #endif /* HAVE_SERVER_SUPPORT */
263 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
266 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
267 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
269 type->typ_debugfs_entry = NULL;
272 #ifdef HAVE_SERVER_SUPPORT
275 type->typ_kobj = class_setup_tunables(type->typ_name);
276 if (IS_ERR(type->typ_kobj))
277 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
281 rc = lu_device_type_init(ldt);
283 kobject_put(type->typ_kobj);
/* the type becomes visible to class_search_type() only from here on */
288 spin_lock(&obd_types_lock);
289 list_add(&type->typ_chain, &obd_types);
290 spin_unlock(&obd_types_lock);
295 #ifdef HAVE_SERVER_SUPPORT
/* a debugfs entry found by the lookup above is not removed by us */
296 if (type->typ_sym_filter)
297 type->typ_debugfs_entry = NULL;
299 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
300 ldebugfs_remove(&type->typ_debugfs_entry);
301 if (type->typ_name != NULL) {
302 #ifdef CONFIG_PROC_FS
303 if (type->typ_procroot != NULL)
304 remove_proc_subtree(type->typ_name, proc_lustre_root);
306 OBD_FREE(type->typ_name, strlen(name) + 1);
308 if (type->typ_md_ops != NULL)
309 OBD_FREE_PTR(type->typ_md_ops);
310 if (type->typ_dt_ops != NULL)
311 OBD_FREE_PTR(type->typ_dt_ops);
312 OBD_FREE(type, sizeof(*type));
315 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * An unknown name is reported with CERROR.  If the type still has users
 * (typ_refcnt is non-zero) only the two ops tables are freed and the name
 * is kept for debugging.  Otherwise the sysfs kobject, the procfs and
 * debugfs entries and the lu device type are torn down, the type is
 * unlinked from obd_types under obd_types_lock, and all of its memory is
 * freed.
 *
 * NOTE(review): in the busy branch typ_dt_ops/typ_md_ops are freed but not
 * cleared, and no list_del() precedes them, so -- assuming that branch
 * returns right there -- the type stays reachable through obd_types while
 * class_get_type()/class_put_type() dereference typ_dt_ops.  Confirm
 * whether this can be hit in practice.
 */
317 int class_unregister_type(const char *name)
319 struct obd_type *type = class_search_type(name);
323 CERROR("unknown obd type\n");
327 if (type->typ_refcnt) {
328 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
329 /* This is a bad situation, let's make the best of it */
330 /* Remove ops, but leave the name for debugging */
331 OBD_FREE_PTR(type->typ_dt_ops);
332 OBD_FREE_PTR(type->typ_md_ops);
336 kobject_put(type->typ_kobj);
338 /* we do not use type->typ_procroot as for compatibility purposes
339 * other modules can share names (i.e. lod can use lov entry). so
340 * we can't reference pointer as it can get invalidated when another
341 * module removes the entry */
342 #ifdef CONFIG_PROC_FS
343 if (type->typ_procroot != NULL)
344 remove_proc_subtree(type->typ_name, proc_lustre_root);
345 if (type->typ_procsym != NULL)
346 lprocfs_remove(&type->typ_procsym);
348 #ifdef HAVE_SERVER_SUPPORT
349 if (type->typ_sym_filter)
350 type->typ_debugfs_entry = NULL;
352 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
353 ldebugfs_remove(&type->typ_debugfs_entry);
356 lu_device_type_fini(type->typ_lu);
358 spin_lock(&obd_types_lock);
359 list_del(&type->typ_chain);
360 spin_unlock(&obd_types_lock);
/* name compared equal to typ_name in class_search_type(), so
 * strlen(name) + 1 is the size typ_name was allocated with */
361 OBD_FREE(type->typ_name, strlen(name) + 1);
362 if (type->typ_dt_ops != NULL)
363 OBD_FREE_PTR(type->typ_dt_ops);
364 if (type->typ_md_ops != NULL)
365 OBD_FREE_PTR(type->typ_md_ops);
366 OBD_FREE(type, sizeof(*type));
368 } /* class_unregister_type */
369 EXPORT_SYMBOL(class_unregister_type);
372 * Create a new obd device.
374 * Allocate the new obd_device and initialize it.
376 * \param[in] type_name obd device type string.
377 * \param[in] name obd device name.
378 * \param[in] uuid obd device UUID
380 * \retval newdev pointer to created obd_device
381 * \retval ERR_PTR(errno) on error
383 struct obd_device *class_newdev(const char *type_name, const char *name,
386 struct obd_device *newdev;
387 struct obd_type *type = NULL;
390 if (strlen(name) >= MAX_OBD_NAME) {
391 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
392 RETURN(ERR_PTR(-EINVAL));
395 type = class_get_type(type_name);
397 CERROR("OBD: unknown type: %s\n", type_name);
398 RETURN(ERR_PTR(-ENODEV));
401 newdev = obd_device_alloc();
402 if (newdev == NULL) {
403 class_put_type(type);
404 RETURN(ERR_PTR(-ENOMEM));
406 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
407 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
408 newdev->obd_type = type;
409 newdev->obd_minor = -1;
411 rwlock_init(&newdev->obd_pool_lock);
412 newdev->obd_pool_limit = 0;
413 newdev->obd_pool_slv = 0;
415 INIT_LIST_HEAD(&newdev->obd_exports);
416 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
417 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
418 INIT_LIST_HEAD(&newdev->obd_exports_timed);
419 INIT_LIST_HEAD(&newdev->obd_nid_stats);
420 spin_lock_init(&newdev->obd_nid_lock);
421 spin_lock_init(&newdev->obd_dev_lock);
422 mutex_init(&newdev->obd_dev_mutex);
423 spin_lock_init(&newdev->obd_osfs_lock);
424 /* newdev->obd_osfs_age must be set to a value in the distant
425 * past to guarantee a fresh statfs is fetched on mount. */
426 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
428 /* XXX belongs in setup not attach */
429 init_rwsem(&newdev->obd_observer_link_sem);
431 spin_lock_init(&newdev->obd_recovery_task_lock);
432 init_waitqueue_head(&newdev->obd_next_transno_waitq);
433 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
434 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
435 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
436 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
437 INIT_LIST_HEAD(&newdev->obd_evict_list);
438 INIT_LIST_HEAD(&newdev->obd_lwp_list);
440 llog_group_init(&newdev->obd_olg);
441 /* Detach drops this */
442 atomic_set(&newdev->obd_refcount, 1);
443 lu_ref_init(&newdev->obd_reference);
444 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
446 newdev->obd_conn_inprogress = 0;
448 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
450 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
451 newdev->obd_name, newdev);
459 * \param[in] obd obd_device to be freed
/*
 * Final teardown of an obd device.
 *
 * Asserts that the magic is intact, that the device is either unregistered
 * (obd_minor == -1) or still sits in its own obd_devs[] slot, that the
 * refcount has reached zero and that a type is attached.  When
 * obd_stopping is set, obd_cleanup() is run and a failure is logged.  The
 * device memory is then released and the reference on its type is dropped;
 * the type pointer is saved up front because it is needed after the device
 * has been freed.
 */
463 void class_free_dev(struct obd_device *obd)
465 struct obd_type *obd_type = obd->obd_type;
467 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
468 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
469 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
470 "obd %p != obd_devs[%d] %p\n",
471 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
472 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
473 "obd_refcount should be 0, not %d\n",
474 atomic_read(&obd->obd_refcount));
475 LASSERT(obd_type != NULL);
477 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
478 obd->obd_name, obd->obd_type->typ_name);
480 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
481 obd->obd_name, obd->obd_uuid.uuid);
482 if (obd->obd_stopping) {
485 /* If we're not stopping, we were never set up */
486 err = obd_cleanup(obd);
488 CERROR("Cleanup %s returned %d\n",
492 obd_device_free(obd);
494 class_put_type(obd_type);
498 * Unregister obd device.
500 * Free slot in obd_devs[] used by \a obd.
502 * \param[in] obd obd_device to be unregistered
/*
 * Release the obd_devs[] slot held by \a obd, if it has one
 * (obd_minor >= 0).  The slot is asserted to point back at \a obd and is
 * cleared while obd_dev_lock is held for writing.
 */
506 void class_unregister_device(struct obd_device *obd)
508 write_lock(&obd_dev_lock);
509 if (obd->obd_minor >= 0) {
510 LASSERT(obd_devs[obd->obd_minor] == obd);
511 obd_devs[obd->obd_minor] = NULL;
514 write_unlock(&obd_dev_lock);
518 * Register obd device.
520 * Find free slot in obd_devs[], fills it with \a new_obd.
522 * \param[in] new_obd obd_device to be registered
525 * \retval -EEXIST device with this name is registered
526 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Put \a new_obd into a free obd_devs[] slot.
 *
 * All slots are scanned under the obd_dev_lock write lock.  A device with
 * the same obd_name is a conflict: the visible code either drops the lock
 * and calls obd_zombie_barrier() (the `retried' flag suggests this is
 * attempted once before the scan is repeated -- TODO confirm), or reports
 * "already exists" and cancels any slot picked earlier in the scan.  The
 * first empty slot seen is remembered; after the scan it receives
 * \a new_obd and becomes its obd_minor.  If there is no free slot an error
 * naming MAX_OBD_DEVICES is logged.
 */
528 int class_register_device(struct obd_device *new_obd)
532 int new_obd_minor = 0;
533 bool minor_assign = false;
534 bool retried = false;
537 write_lock(&obd_dev_lock);
538 for (i = 0; i < class_devno_max(); i++) {
539 struct obd_device *obd = class_num2obd(i);
542 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
545 write_unlock(&obd_dev_lock);
547 /* the obd_device could be waited to be
548 * destroyed by the "obd_zombie_impexp_thread".
550 obd_zombie_barrier();
555 CERROR("%s: already exists, won't add\n",
557 /* in case we found a free slot before duplicate */
558 minor_assign = false;
562 if (!minor_assign && obd == NULL) {
569 new_obd->obd_minor = new_obd_minor;
570 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
571 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
572 obd_devs[new_obd_minor] = new_obd;
576 CERROR("%s: all %u/%u devices used, increase "
577 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
578 i, class_devno_max(), ret);
581 write_unlock(&obd_dev_lock);
/*
 * Scan obd_devs[] for a device whose obd_name equals \a name.
 *
 * No locking is done here; the callers in this file (class_name2dev(),
 * class_dev_by_str()) hold obd_dev_lock around the call.  A name match is
 * only acted upon when the device has obd_attached set.
 * NOTE(review): the return statements are not shown in this extract;
 * callers treat a negative result as "not found".
 */
586 static int class_name2dev_nolock(const char *name)
593 for (i = 0; i < class_devno_max(); i++) {
594 struct obd_device *obd = class_num2obd(i);
596 if (obd && strcmp(name, obd->obd_name) == 0) {
597 /* Make sure we finished attaching before we give
598 out any references */
599 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
600 if (obd->obd_attached) {
/*
 * Locked wrapper around class_name2dev_nolock(): performs the lookup of
 * \a name while holding obd_dev_lock for reading.
 */
610 int class_name2dev(const char *name)
617 read_lock(&obd_dev_lock);
618 i = class_name2dev_nolock(name);
619 read_unlock(&obd_dev_lock);
623 EXPORT_SYMBOL(class_name2dev);
/**
 * Look up an obd device by name.
 *
 * Resolves \a name to a slot number with class_name2dev() and returns the
 * device stored in that slot.  No reference is taken on the device.
 *
 * \param[in] name	obd device name
 *
 * \retval NULL		if no slot was found for \a name
 * \retval obd		the device found in ::obd_devs[]
 */
struct obd_device *class_name2obd(const char *name)
{
	int dev = class_name2dev(name);

	/* valid slot numbers are 0 .. class_devno_max() - 1, the same range
	 * every scan loop over obd_devs[] in this file uses */
	if (dev < 0 || dev >= class_devno_max())
		return NULL;
	return class_num2obd(dev);
}
EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] for a device whose obd_uuid equals \a uuid.
 *
 * No locking is done here; class_uuid2dev() and class_dev_by_str() hold
 * obd_dev_lock around the call.
 * NOTE(review): the return statements are not shown in this extract;
 * callers treat a negative result as "not found".
 */
635 int class_uuid2dev_nolock(struct obd_uuid *uuid)
639 for (i = 0; i < class_devno_max(); i++) {
640 struct obd_device *obd = class_num2obd(i);
642 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
643 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/*
 * Locked wrapper around class_uuid2dev_nolock(): performs the lookup of
 * \a uuid while holding obd_dev_lock for reading.
 */
651 int class_uuid2dev(struct obd_uuid *uuid)
655 read_lock(&obd_dev_lock);
656 i = class_uuid2dev_nolock(uuid);
657 read_unlock(&obd_dev_lock);
661 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up an obd device by UUID: resolves \a uuid to a slot number with
 * class_uuid2dev() and hands that number to class_num2obd().
 * NOTE(review): any check of the slot number between the two calls is not
 * shown in this extract.
 */
663 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
665 int dev = class_uuid2dev(uuid);
668 return class_num2obd(dev);
670 EXPORT_SYMBOL(class_uuid2obd);
673 * Get obd device from ::obd_devs[]
675 * \param num [in] array index
677 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
678 * otherwise return the obd device there.
680 struct obd_device *class_num2obd(int num)
682 struct obd_device *obd = NULL;
684 if (num < class_devno_max()) {
689 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
690 "%p obd_magic %08x != %08x\n",
691 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
692 LASSERTF(obd->obd_minor == num,
693 "%p obd_minor %0d != %0d\n",
694 obd, obd->obd_minor, num);
701 * Find obd in obd_devs[] by name or uuid.
703 * Increment obd's refcount if found.
705 * \param[in] str obd name or uuid
707 * \retval NULL if not found
708 * \retval target pointer to found obd_device
/*
 * \a str is first converted to an obd_uuid and looked up as a UUID; the
 * name lookup on the raw string is the second attempt.  Both lookups, the
 * slot dereference and the class_incref() (tagged "find") happen under one
 * read-side hold of obd_dev_lock, so the device cannot leave obd_devs[]
 * between being found and being referenced.
 */
710 struct obd_device *class_dev_by_str(const char *str)
712 struct obd_device *target = NULL;
713 struct obd_uuid tgtuuid;
716 obd_str2uuid(&tgtuuid, str);
718 read_lock(&obd_dev_lock);
719 rc = class_uuid2dev_nolock(&tgtuuid);
721 rc = class_name2dev_nolock(str);
724 target = class_num2obd(rc);
727 class_incref(target, "find", current);
728 read_unlock(&obd_dev_lock);
732 EXPORT_SYMBOL(class_dev_by_str);
735 * Get obd devices count. Device in any
737 * \retval obd device count
739 int get_devices_count(void)
741 int index, max_index = class_devno_max(), dev_count = 0;
743 read_lock(&obd_dev_lock);
744 for (index = 0; index <= max_index; index++) {
745 struct obd_device *obd = class_num2obd(index);
749 read_unlock(&obd_dev_lock);
753 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line (D_CONFIG) per device in obd_devs[]: slot number,
 * status, type name, device name, UUID and current refcount.  The status
 * is chosen by testing obd_stopping, then obd_set_up, then obd_attached, in
 * that order.  The table is walked under obd_dev_lock (read side).
 */
755 void class_obd_list(void)
760 read_lock(&obd_dev_lock);
761 for (i = 0; i < class_devno_max(); i++) {
762 struct obd_device *obd = class_num2obd(i);
766 if (obd->obd_stopping)
768 else if (obd->obd_set_up)
770 else if (obd->obd_attached)
774 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
775 i, status, obd->obd_type->typ_name,
776 obd->obd_name, obd->obd_uuid.uuid,
777 atomic_read(&obd->obd_refcount));
779 read_unlock(&obd_dev_lock);
783 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
784 specified, then only the client with that uuid is returned,
785 otherwise any client connected to the tgt is returned. */
/*
 * Implementation notes: the type test is a prefix match -- only
 * strlen(typ_name) characters of the device's type name are compared.  A
 * candidate must have u.cli.cl_target_uuid equal to tgt_uuid and, when
 * grp_uuid is given, obd_uuid equal to grp_uuid.  obd_dev_lock is dropped
 * on the match path; no class_incref() is visible there, so the caller
 * presumably has to keep the device alive by other means -- TODO confirm.
 */
786 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
787 const char * typ_name,
788 struct obd_uuid *grp_uuid)
792 read_lock(&obd_dev_lock);
793 for (i = 0; i < class_devno_max(); i++) {
794 struct obd_device *obd = class_num2obd(i);
798 if ((strncmp(obd->obd_type->typ_name, typ_name,
799 strlen(typ_name)) == 0)) {
800 if (obd_uuid_equals(tgt_uuid,
801 &obd->u.cli.cl_target_uuid) &&
802 ((grp_uuid)? obd_uuid_equals(grp_uuid,
803 &obd->obd_uuid) : 1)) {
804 read_unlock(&obd_dev_lock);
809 read_unlock(&obd_dev_lock);
813 EXPORT_SYMBOL(class_find_client_obd);
815 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
816 searching at *next, and if a device is found, the next index to look
817 at is saved in *next. If next is NULL, then the first matching device
818 will always be returned. */
/*
 * Implementation notes: a non-NULL \a next is only used as the starting
 * slot when *next lies in [0, class_devno_max()).  The scan runs under
 * obd_dev_lock (read side) and compares each device's obd_uuid with
 * grp_uuid; the lock is dropped on the match path as well as after an
 * unsuccessful scan.
 */
819 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
825 else if (*next >= 0 && *next < class_devno_max())
830 read_lock(&obd_dev_lock);
831 for (; i < class_devno_max(); i++) {
832 struct obd_device *obd = class_num2obd(i);
836 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
839 read_unlock(&obd_dev_lock);
843 read_unlock(&obd_dev_lock);
847 EXPORT_SYMBOL(class_devices_in_group);
850 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
851 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF to every relevant device of file system \a fsname.
 *
 * \param fsname	file system name; matched against the first
 *			\a namelen bytes of each device's obd_name
 * \param namelen	number of bytes of \a fsname to compare, must be > 0
 *
 * Devices that are not set up or are stopping are skipped, as are devices
 * whose type is not one of mdc, osc, osp, lwp, mdt, ost.  For each
 * remaining device a reference is taken and obd_dev_lock is dropped around
 * obd_set_info_async(); the lock is re-taken before the scan continues.
 */
853 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
855 struct obd_device *obd;
859 LASSERT(namelen > 0);
861 read_lock(&obd_dev_lock);
862 for (i = 0; i < class_devno_max(); i++) {
863 obd = class_num2obd(i);
865 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
868 /* only notify mdc, osc, osp, lwp, mdt, ost
869 * because only these have a -sptlrpc llog */
870 type = obd->obd_type->typ_name;
871 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
872 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
873 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
874 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
875 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
876 strcmp(type, LUSTRE_OST_NAME) != 0)
879 if (strncmp(obd->obd_name, fsname, namelen))
/* pin the device so that it survives while the lock is dropped */
882 class_incref(obd, __FUNCTION__, obd);
883 read_unlock(&obd_dev_lock);
884 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
885 sizeof(KEY_SPTLRPC_CONF),
886 KEY_SPTLRPC_CONF, 0, NULL, NULL);
888 class_decref(obd, __FUNCTION__, obd);
889 read_lock(&obd_dev_lock);
891 read_unlock(&obd_dev_lock);
894 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy obd_device_cachep if it exists and reset the pointer to NULL,
 * which makes a repeated call a no-op.
 */
896 void obd_cleanup_caches(void)
899 if (obd_device_cachep) {
900 kmem_cache_destroy(obd_device_cachep);
901 obd_device_cachep = NULL;
/*
 * Create the slab cache for struct obd_device ("ll_obd_dev_cache").  The
 * cache must not exist yet.  The usercopy region passed to
 * kmem_cache_create_usercopy() starts at offset 0 and spans the whole
 * structure.  On failure rc is set to -ENOMEM and obd_cleanup_caches() is
 * run.
 */
907 int obd_init_caches(void)
912 LASSERT(obd_device_cachep == NULL);
913 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
914 sizeof(struct obd_device),
915 0, 0, 0, sizeof(struct obd_device), NULL);
916 if (!obd_device_cachep)
917 GOTO(out, rc = -ENOMEM);
921 obd_cleanup_caches();
925 /* map connection to client */
/*
 * Resolve a connection handle to its export through
 * class_handle2object(conn->cookie).  Two cases are logged separately
 * before the lookup: a NULL handle, and a cookie of -1, which stands for
 * "assign a new connection".
 * NOTE(review): the return statements are not shown in this extract.
 */
926 struct obd_export *class_conn2export(struct lustre_handle *conn)
928 struct obd_export *export;
932 CDEBUG(D_CACHE, "looking for null handle\n");
936 if (conn->cookie == -1) { /* this means assign a new connection */
937 CDEBUG(D_CACHE, "want a new connection\n");
941 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
942 export = class_handle2object(conn->cookie, NULL);
945 EXPORT_SYMBOL(class_conn2export);
/*
 * NOTE(review): the body of this function is not shown in this extract.
 * Going by the signature alone it maps an export to an obd device,
 * presumably exp->exp_obd -- confirm against the full source.
 */
947 struct obd_device *class_exp2obd(struct obd_export *exp)
953 EXPORT_SYMBOL(class_exp2obd);
/*
 * Return the client import (u.cli.cl_import) of the obd device that owns
 * \a exp.  \a exp itself is dereferenced unconditionally.
 */
955 struct obd_import *class_exp2cliimp(struct obd_export *exp)
957 struct obd_device *obd = exp->exp_obd;
960 return obd->u.cli.cl_import;
962 EXPORT_SYMBOL(class_exp2cliimp);
964 /* Export management functions */
/*
 * Free an export whose refcount has dropped to zero.
 *
 * Drops the ptlrpc connection if there is one, asserts that the reply,
 * replay and high-priority RPC lists are empty, calls obd_destroy_export(),
 * gives back the obd reference tagged "export" (the self export never held
 * one) and finally frees the export through OBD_FREE_RCU().
 */
965 static void class_export_destroy(struct obd_export *exp)
967 struct obd_device *obd = exp->exp_obd;
970 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
971 LASSERT(obd != NULL);
973 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
974 exp->exp_client_uuid.uuid, obd->obd_name);
976 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
977 if (exp->exp_connection)
978 ptlrpc_put_connection_superhack(exp->exp_connection);
980 LASSERT(list_empty(&exp->exp_outstanding_replies));
981 LASSERT(list_empty(&exp->exp_uncommitted_replies));
982 LASSERT(list_empty(&exp->exp_req_replay_queue));
983 LASSERT(list_empty(&exp->exp_hp_rpcs));
984 obd_destroy_export(exp);
985 /* self export doesn't hold a reference to an obd, although it
986 * exists until freeing of the obd */
987 if (exp != obd->obd_self_export)
988 class_decref(obd, "export", exp);
990 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/*
 * hop_addref callback of export_handle_ops: the handle code passes the
 * object as an untyped pointer, which here is always an obd_export.
 */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
/*
 * Handle operations passed to class_handle_hash() for exports; taking a
 * reference through a handle ends up in class_export_get().
 */
999 static struct portals_handle_ops export_handle_ops = {
1000 .hop_addref = export_handle_addref,
/*
 * Take one more reference on \a exp (atomic increment of exp_refcount) and
 * log the new count at D_INFO.
 */
1004 struct obd_export *class_export_get(struct obd_export *exp)
1006 atomic_inc(&exp->exp_refcount);
1007 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1008 atomic_read(&exp->exp_refcount));
1011 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.
 *
 * When the last reference goes away the nid statistics reference is
 * released first.  The self export is then destroyed synchronously and its
 * obd device is freed with class_free_dev(); every other export must still
 * be linked on exp_obd_chain and is handed to obd_zombie_export_add().
 */
1013 void class_export_put(struct obd_export *exp)
1015 LASSERT(exp != NULL);
1016 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1017 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1018 atomic_read(&exp->exp_refcount) - 1);
1020 if (atomic_dec_and_test(&exp->exp_refcount)) {
1021 struct obd_device *obd = exp->exp_obd;
1023 CDEBUG(D_IOCTL, "final put %p/%s\n",
1024 exp, exp->exp_client_uuid.uuid);
1026 /* release nid stat reference */
1027 lprocfs_exp_cleanup(exp);
1029 if (exp == obd->obd_self_export) {
1030 /* self export should be destroyed without
1031 * zombie thread as it doesn't hold a
1032 * reference to obd and doesn't hold any
1034 class_export_destroy(exp);
1035 /* self export is destroyed, no class
1036 * references exist and it is safe to free
1038 class_free_dev(obd);
1040 LASSERT(!list_empty(&exp->exp_obd_chain));
1041 obd_zombie_export_add(exp);
1046 EXPORT_SYMBOL(class_export_put);
1048 static void obd_zombie_exp_cull(struct work_struct *ws)
1050 struct obd_export *export;
1052 export = container_of(ws, struct obd_export, exp_zombie_work);
1053 class_export_destroy(export);
1056 /* Creates a new export, adds it to the hash table, and returns a
1057 * pointer to it. The refcount is 2: one for the hash reference, and
1058 * one for the pointer returned by this function. */
/*
 * Allocate and initialize an export of \a obd for client \a cluuid.
 *
 * \param obd		device the export belongs to
 * \param cluuid	client UUID, copied into exp_client_uuid
 * \param is_self	distinguishes the self export (see
 *			class_new_export_self()); NOTE(review): the test on
 *			it is not shown in this extract, presumably it
 *			selects between linking the export onto the obd
 *			lists and merely initializing its list heads
 *
 * The export starts with two references and is entered into the handle
 * hash.  If \a cluuid differs from the obd's own UUID the export is also
 * added to obd_uuid_hash, where a duplicate is refused with -EALREADY.
 * obd_stopping is checked under obd_dev_lock both before and after the
 * hash insertion.  The statements from the last spin_unlock() onwards are
 * the error path: the export is unhashed, destroyed and freed.
 *
 * Returns ERR_PTR(-ENOMEM) if the export cannot be allocated; other error
 * codes visible here are -ENODEV, -EALREADY and -ESHUTDOWN.
 */
1059 struct obd_export *__class_new_export(struct obd_device *obd,
1060 struct obd_uuid *cluuid, bool is_self)
1062 struct obd_export *export;
1063 struct cfs_hash *hash = NULL;
1067 OBD_ALLOC_PTR(export);
1069 return ERR_PTR(-ENOMEM);
1071 export->exp_conn_cnt = 0;
1072 export->exp_lock_hash = NULL;
1073 export->exp_flock_hash = NULL;
1074 /* 2 = class_handle_hash + last */
1075 atomic_set(&export->exp_refcount, 2);
1076 atomic_set(&export->exp_rpc_count, 0);
1077 atomic_set(&export->exp_cb_count, 0);
1078 atomic_set(&export->exp_locks_count, 0);
1079 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1080 INIT_LIST_HEAD(&export->exp_locks_list);
1081 spin_lock_init(&export->exp_locks_list_guard);
1083 atomic_set(&export->exp_replay_count, 0);
1084 export->exp_obd = obd;
1085 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1086 spin_lock_init(&export->exp_uncommitted_replies_lock);
1087 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1088 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1089 INIT_LIST_HEAD_RCU(&export->exp_handle.h_link);
1090 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1091 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1092 class_handle_hash(&export->exp_handle, &export_handle_ops);
1093 export->exp_last_request_time = ktime_get_real_seconds();
1094 spin_lock_init(&export->exp_lock);
1095 spin_lock_init(&export->exp_rpc_lock);
1096 INIT_HLIST_NODE(&export->exp_uuid_hash);
1097 INIT_HLIST_NODE(&export->exp_nid_hash);
1098 INIT_HLIST_NODE(&export->exp_gen_hash);
1099 spin_lock_init(&export->exp_bl_list_lock);
1100 INIT_LIST_HEAD(&export->exp_bl_list);
1101 INIT_LIST_HEAD(&export->exp_stale_list);
1102 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1104 export->exp_sp_peer = LUSTRE_SP_ANY;
1105 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1106 export->exp_client_uuid = *cluuid;
1107 obd_init_export(export);
/* an export carrying the obd's own UUID is not entered into obd_uuid_hash */
1109 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1110 spin_lock(&obd->obd_dev_lock);
1111 /* shouldn't happen, but might race */
1112 if (obd->obd_stopping)
1113 GOTO(exit_unlock, rc = -ENODEV);
1115 hash = cfs_hash_getref(obd->obd_uuid_hash);
1117 GOTO(exit_unlock, rc = -ENODEV);
1118 spin_unlock(&obd->obd_dev_lock);
1120 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1122 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1123 obd->obd_name, cluuid->uuid, rc);
1124 GOTO(exit_err, rc = -EALREADY);
1128 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1129 spin_lock(&obd->obd_dev_lock);
/* the device may have started stopping while the lock was not held */
1130 if (obd->obd_stopping) {
1132 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1133 GOTO(exit_unlock, rc = -ESHUTDOWN);
1137 class_incref(obd, "export", export);
1138 list_add_tail(&export->exp_obd_chain_timed,
1139 &obd->obd_exports_timed);
1140 list_add(&export->exp_obd_chain, &obd->obd_exports);
1141 obd->obd_num_exports++;
1143 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1144 INIT_LIST_HEAD(&export->exp_obd_chain);
1146 spin_unlock(&obd->obd_dev_lock);
1148 cfs_hash_putref(hash);
1152 spin_unlock(&obd->obd_dev_lock);
1155 cfs_hash_putref(hash);
1156 class_handle_unhash(&export->exp_handle);
1157 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1158 obd_destroy_export(export);
1159 OBD_FREE_PTR(export);
/*
 * Create an ordinary (non-self) export of \a obd for client \a uuid.
 * Thin wrapper around __class_new_export(); the result, including any
 * ERR_PTR() value, is passed through unchanged.
 */
struct obd_export *class_new_export(struct obd_device *obd,
				    struct obd_uuid *uuid)
{
	const bool is_self = false;

	return __class_new_export(obd, uuid, is_self);
}
EXPORT_SYMBOL(class_new_export);
/*
 * Create the self export of \a obd.  Thin wrapper around
 * __class_new_export() with is_self set; the result, including any
 * ERR_PTR() value, is passed through unchanged.
 */
struct obd_export *class_new_export_self(struct obd_device *obd,
					 struct obd_uuid *uuid)
{
	const bool is_self = true;

	return __class_new_export(obd, uuid, is_self);
}
/*
 * Detach \a exp from its obd device.
 *
 * The export is removed from the handle hash first.  The self export only
 * has a reference dropped.  Any other export is, under obd_dev_lock,
 * removed from the uuid hash (and, with HAVE_SERVER_SUPPORT, from the
 * generation hash), moved to obd_unlinked_exports and taken off the timed
 * list, and obd_num_exports is decremented.  Afterwards
 * obd_stale_export_num is incremented and the export is passed to
 * obd_stale_export_put().
 */
1176 void class_unlink_export(struct obd_export *exp)
1178 class_handle_unhash(&exp->exp_handle);
1180 if (exp->exp_obd->obd_self_export == exp) {
1181 class_export_put(exp);
1185 spin_lock(&exp->exp_obd->obd_dev_lock);
1186 /* delete an uuid-export hashitem from hashtables */
1187 if (!hlist_unhashed(&exp->exp_uuid_hash))
1188 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1189 &exp->exp_client_uuid,
1190 &exp->exp_uuid_hash);
1192 #ifdef HAVE_SERVER_SUPPORT
1193 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1194 struct tg_export_data *ted = &exp->exp_target_data;
1195 struct cfs_hash *hash;
1197 /* Because obd_gen_hash will not be released until
1198 * class_cleanup(), so hash should never be NULL here */
1199 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1200 LASSERT(hash != NULL);
1201 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1202 &exp->exp_gen_hash);
1203 cfs_hash_putref(hash);
1205 #endif /* HAVE_SERVER_SUPPORT */
1207 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1208 list_del_init(&exp->exp_obd_chain_timed);
1209 exp->exp_obd->obd_num_exports--;
1210 spin_unlock(&exp->exp_obd->obd_dev_lock);
1211 atomic_inc(&obd_stale_export_num);
1213 /* A reference is kept by obd_stale_exports list */
1214 obd_stale_export_put(exp);
1216 EXPORT_SYMBOL(class_unlink_export);
1218 /* Import management functions */
/*
 * Release the resources of an import whose refcount has dropped to zero:
 * the current connection, every entry of imp_conn_list together with its
 * connection, and the "import" reference on the owning obd device.  The
 * security context (imp_sec) must already be gone at this point.
 */
1219 static void obd_zombie_import_free(struct obd_import *imp)
1223 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1224 imp->imp_obd->obd_name);
1226 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1228 ptlrpc_put_connection_superhack(imp->imp_connection);
1230 while (!list_empty(&imp->imp_conn_list)) {
1231 struct obd_import_conn *imp_conn;
1233 imp_conn = list_entry(imp->imp_conn_list.next,
1234 struct obd_import_conn, oic_item);
1235 list_del_init(&imp_conn->oic_item);
1236 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1237 OBD_FREE(imp_conn, sizeof(*imp_conn));
1240 LASSERT(imp->imp_sec == NULL);
1241 class_decref(imp->imp_obd, "import", imp);
/*
 * Take one more reference on \a import (atomic increment of imp_refcount)
 * and log the new count together with the owning obd's name at D_INFO.
 */
1246 struct obd_import *class_import_get(struct obd_import *import)
1248 atomic_inc(&import->imp_refcount);
1249 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1250 atomic_read(&import->imp_refcount),
1251 import->imp_obd->obd_name);
1254 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp.  When the last reference goes away the
 * import is not freed here but handed to obd_zombie_import_add().
 */
1256 void class_import_put(struct obd_import *imp)
1260 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1262 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1263 atomic_read(&imp->imp_refcount) - 1,
1264 imp->imp_obd->obd_name);
1266 if (atomic_dec_and_test(&imp->imp_refcount)) {
1267 CDEBUG(D_INFO, "final put import %p\n", imp);
1268 obd_zombie_import_add(imp);
1273 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the AT state of an import: the network latency estimate
 * starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates starts
 * at INITIAL_CONNECT_TIMEOUT.
 */
1275 static void init_imp_at(struct imp_at *at) {
1277 at_init(&at->iat_net_latency, 0, 0);
1278 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1279 /* max service estimates are tracked on the server side, so
1280 don't use the AT history here, just use the last reported
1281 val. (But keep hist for proc histogram, worst_ever) */
1282 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1287 static void obd_zombie_imp_cull(struct work_struct *ws)
1289 struct obd_import *import;
1291 import = container_of(ws, struct obd_import, imp_zombie_work);
1292 obd_zombie_import_free(import);
/*
 * Allocate and initialize an import for \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with two references and takes
 * an "import" reference on \a obd.  imp_sec_refpid is set to the pid of
 * the child reaper of the caller's pid namespace when that is available,
 * and to 1 otherwise.
 * NOTE(review): the allocation-failure handling and the final return are
 * not shown in this extract.
 */
1295 struct obd_import *class_new_import(struct obd_device *obd)
1297 struct obd_import *imp;
1298 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1300 OBD_ALLOC(imp, sizeof(*imp));
1304 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1305 INIT_LIST_HEAD(&imp->imp_replay_list);
1306 INIT_LIST_HEAD(&imp->imp_sending_list);
1307 INIT_LIST_HEAD(&imp->imp_delayed_list);
1308 INIT_LIST_HEAD(&imp->imp_committed_list);
1309 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1310 imp->imp_known_replied_xid = 0;
/* the replay cursor starts out at the head of the committed list */
1311 imp->imp_replay_cursor = &imp->imp_committed_list;
1312 spin_lock_init(&imp->imp_lock);
1313 imp->imp_last_success_conn = 0;
1314 imp->imp_state = LUSTRE_IMP_NEW;
1315 imp->imp_obd = class_incref(obd, "import", imp);
1316 mutex_init(&imp->imp_sec_mutex);
1317 init_waitqueue_head(&imp->imp_recovery_waitq);
1318 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1320 if (curr_pid_ns && curr_pid_ns->child_reaper)
1321 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1323 imp->imp_sec_refpid = 1;
1325 atomic_set(&imp->imp_refcount, 2);
1326 atomic_set(&imp->imp_unregistering, 0);
1327 atomic_set(&imp->imp_inflight, 0);
1328 atomic_set(&imp->imp_replay_inflight, 0);
1329 atomic_set(&imp->imp_inval_count, 0);
1330 INIT_LIST_HEAD(&imp->imp_conn_list);
1331 init_imp_at(&imp->imp_at);
1333 /* the default magic is V2, will be used in connect RPC, and
1334 * then adjusted according to the flags in request/reply. */
1335 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1339 EXPORT_SYMBOL(class_new_import);
1341 void class_destroy_import(struct obd_import *import)
1343 LASSERT(import != NULL);
1344 LASSERT(import != LP_POISON);
1346 spin_lock(&import->imp_lock);
1347 import->imp_generation++;
1348 spin_unlock(&import->imp_lock);
1349 class_import_put(import);
1351 EXPORT_SYMBOL(class_destroy_import);
1353 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1355 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1357 spin_lock(&exp->exp_locks_list_guard);
1359 LASSERT(lock->l_exp_refs_nr >= 0);
1361 if (lock->l_exp_refs_target != NULL &&
1362 lock->l_exp_refs_target != exp) {
1363 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1364 exp, lock, lock->l_exp_refs_target);
1366 if ((lock->l_exp_refs_nr ++) == 0) {
1367 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1368 lock->l_exp_refs_target = exp;
1370 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1371 lock, exp, lock->l_exp_refs_nr);
1372 spin_unlock(&exp->exp_locks_list_guard);
1374 EXPORT_SYMBOL(__class_export_add_lock_ref);
1376 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1378 spin_lock(&exp->exp_locks_list_guard);
1379 LASSERT(lock->l_exp_refs_nr > 0);
1380 if (lock->l_exp_refs_target != exp) {
1381 LCONSOLE_WARN("lock %p, "
1382 "mismatching export pointers: %p, %p\n",
1383 lock, lock->l_exp_refs_target, exp);
1385 if (-- lock->l_exp_refs_nr == 0) {
1386 list_del_init(&lock->l_exp_refs_link);
1387 lock->l_exp_refs_target = NULL;
1389 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1390 lock, exp, lock->l_exp_refs_nr);
1391 spin_unlock(&exp->exp_locks_list_guard);
1393 EXPORT_SYMBOL(__class_export_del_lock_ref);
1396 /* A connection defines an export context in which preallocation can
1397 be managed. This releases the export pointer reference, and returns
1398 the export handle, so the export refcount is 1 when this function
1400 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1401 struct obd_uuid *cluuid)
1403 struct obd_export *export;
1404 LASSERT(conn != NULL);
1405 LASSERT(obd != NULL);
1406 LASSERT(cluuid != NULL);
1409 export = class_new_export(obd, cluuid);
1411 RETURN(PTR_ERR(export));
1413 conn->cookie = export->exp_handle.h_cookie;
1414 class_export_put(export);
1416 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1417 cluuid->uuid, conn->cookie);
1420 EXPORT_SYMBOL(class_connect);
1422 /* if export is involved in recovery then clean up related things */
1423 static void class_export_recovery_cleanup(struct obd_export *exp)
1425 struct obd_device *obd = exp->exp_obd;
1427 spin_lock(&obd->obd_recovery_task_lock);
1428 if (obd->obd_recovering) {
1429 if (exp->exp_in_recovery) {
1430 spin_lock(&exp->exp_lock);
1431 exp->exp_in_recovery = 0;
1432 spin_unlock(&exp->exp_lock);
1433 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1434 atomic_dec(&obd->obd_connected_clients);
1437 /* if called during recovery then should update
1438 * obd_stale_clients counter,
1439 * lightweight exports are not counted */
1440 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1441 exp->exp_obd->obd_stale_clients++;
1443 spin_unlock(&obd->obd_recovery_task_lock);
1445 spin_lock(&exp->exp_lock);
1446 /** Cleanup req replay fields */
1447 if (exp->exp_req_replay_needed) {
1448 exp->exp_req_replay_needed = 0;
1450 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1451 atomic_dec(&obd->obd_req_replay_clients);
1454 /** Cleanup lock replay data */
1455 if (exp->exp_lock_replay_needed) {
1456 exp->exp_lock_replay_needed = 0;
1458 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1459 atomic_dec(&obd->obd_lock_replay_clients);
1461 spin_unlock(&exp->exp_lock);
1464 /* This function removes 1-3 references from the export:
1465 * 1 - for export pointer passed
1466 * and if disconnect really need
1467 * 2 - removing from hash
1468 * 3 - in client_unlink_export
1469 * The export pointer passed to this function can destroyed */
1470 int class_disconnect(struct obd_export *export)
1472 int already_disconnected;
1475 if (export == NULL) {
1476 CWARN("attempting to free NULL export %p\n", export);
1480 spin_lock(&export->exp_lock);
1481 already_disconnected = export->exp_disconnected;
1482 export->exp_disconnected = 1;
1483 /* We hold references of export for uuid hash
1484 * and nid_hash and export link at least. So
1485 * it is safe to call cfs_hash_del in there. */
1486 if (!hlist_unhashed(&export->exp_nid_hash))
1487 cfs_hash_del(export->exp_obd->obd_nid_hash,
1488 &export->exp_connection->c_peer.nid,
1489 &export->exp_nid_hash);
1490 spin_unlock(&export->exp_lock);
1492 /* class_cleanup(), abort_recovery(), and class_fail_export()
1493 * all end up in here, and if any of them race we shouldn't
1494 * call extra class_export_puts(). */
1495 if (already_disconnected) {
1496 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1497 GOTO(no_disconn, already_disconnected);
1500 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1501 export->exp_handle.h_cookie);
1503 class_export_recovery_cleanup(export);
1504 class_unlink_export(export);
1506 class_export_put(export);
1509 EXPORT_SYMBOL(class_disconnect);
1511 /* Return non-zero for a fully connected export */
1512 int class_connected_export(struct obd_export *exp)
1517 spin_lock(&exp->exp_lock);
1518 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1519 spin_unlock(&exp->exp_lock);
1523 EXPORT_SYMBOL(class_connected_export);
1525 static void class_disconnect_export_list(struct list_head *list,
1526 enum obd_option flags)
1529 struct obd_export *exp;
1532 /* It's possible that an export may disconnect itself, but
1533 * nothing else will be added to this list. */
1534 while (!list_empty(list)) {
1535 exp = list_entry(list->next, struct obd_export,
1537 /* need for safe call CDEBUG after obd_disconnect */
1538 class_export_get(exp);
1540 spin_lock(&exp->exp_lock);
1541 exp->exp_flags = flags;
1542 spin_unlock(&exp->exp_lock);
1544 if (obd_uuid_equals(&exp->exp_client_uuid,
1545 &exp->exp_obd->obd_uuid)) {
1547 "exp %p export uuid == obd uuid, don't discon\n",
1549 /* Need to delete this now so we don't end up pointing
1550 * to work_list later when this export is cleaned up. */
1551 list_del_init(&exp->exp_obd_chain);
1552 class_export_put(exp);
1556 class_export_get(exp);
1557 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1558 "last request at %lld\n",
1559 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1560 exp, exp->exp_last_request_time);
1561 /* release one export reference anyway */
1562 rc = obd_disconnect(exp);
1564 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1565 obd_export_nid2str(exp), exp, rc);
1566 class_export_put(exp);
1571 void class_disconnect_exports(struct obd_device *obd)
1573 struct list_head work_list;
1576 /* Move all of the exports from obd_exports to a work list, en masse. */
1577 INIT_LIST_HEAD(&work_list);
1578 spin_lock(&obd->obd_dev_lock);
1579 list_splice_init(&obd->obd_exports, &work_list);
1580 list_splice_init(&obd->obd_delayed_exports, &work_list);
1581 spin_unlock(&obd->obd_dev_lock);
1583 if (!list_empty(&work_list)) {
1584 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1585 "disconnecting them\n", obd->obd_minor, obd);
1586 class_disconnect_export_list(&work_list,
1587 exp_flags_from_obd(obd));
1589 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1590 obd->obd_minor, obd);
1593 EXPORT_SYMBOL(class_disconnect_exports);
1595 /* Remove exports that have not completed recovery.
1597 void class_disconnect_stale_exports(struct obd_device *obd,
1598 int (*test_export)(struct obd_export *))
1600 struct list_head work_list;
1601 struct obd_export *exp, *n;
1605 INIT_LIST_HEAD(&work_list);
1606 spin_lock(&obd->obd_dev_lock);
1607 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1609 /* don't count self-export as client */
1610 if (obd_uuid_equals(&exp->exp_client_uuid,
1611 &exp->exp_obd->obd_uuid))
1614 /* don't evict clients which have no slot in last_rcvd
1615 * (e.g. lightweight connection) */
1616 if (exp->exp_target_data.ted_lr_idx == -1)
1619 spin_lock(&exp->exp_lock);
1620 if (exp->exp_failed || test_export(exp)) {
1621 spin_unlock(&exp->exp_lock);
1624 exp->exp_failed = 1;
1625 spin_unlock(&exp->exp_lock);
1627 list_move(&exp->exp_obd_chain, &work_list);
1629 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1630 obd->obd_name, exp->exp_client_uuid.uuid,
1631 obd_export_nid2str(exp));
1632 print_export_data(exp, "EVICTING", 0, D_HA);
1634 spin_unlock(&obd->obd_dev_lock);
1637 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1638 obd->obd_name, evicted);
1640 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1641 OBD_OPT_ABORT_RECOV);
1644 EXPORT_SYMBOL(class_disconnect_stale_exports);
1646 void class_fail_export(struct obd_export *exp)
1648 int rc, already_failed;
1650 spin_lock(&exp->exp_lock);
1651 already_failed = exp->exp_failed;
1652 exp->exp_failed = 1;
1653 spin_unlock(&exp->exp_lock);
1655 if (already_failed) {
1656 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1657 exp, exp->exp_client_uuid.uuid);
1661 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1662 exp, exp->exp_client_uuid.uuid);
1664 if (obd_dump_on_timeout)
1665 libcfs_debug_dumplog();
1667 /* need for safe call CDEBUG after obd_disconnect */
1668 class_export_get(exp);
1670 /* Most callers into obd_disconnect are removing their own reference
1671 * (request, for example) in addition to the one from the hash table.
1672 * We don't have such a reference here, so make one. */
1673 class_export_get(exp);
1674 rc = obd_disconnect(exp);
1676 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1678 CDEBUG(D_HA, "disconnected export %p/%s\n",
1679 exp, exp->exp_client_uuid.uuid);
1680 class_export_put(exp);
1682 EXPORT_SYMBOL(class_fail_export);
1684 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1686 struct cfs_hash *nid_hash;
1687 struct obd_export *doomed_exp = NULL;
1688 int exports_evicted = 0;
1690 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1692 spin_lock(&obd->obd_dev_lock);
1693 /* umount has run already, so evict thread should leave
1694 * its task to umount thread now */
1695 if (obd->obd_stopping) {
1696 spin_unlock(&obd->obd_dev_lock);
1697 return exports_evicted;
1699 nid_hash = obd->obd_nid_hash;
1700 cfs_hash_getref(nid_hash);
1701 spin_unlock(&obd->obd_dev_lock);
1704 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1705 if (doomed_exp == NULL)
1708 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1709 "nid %s found, wanted nid %s, requested nid %s\n",
1710 obd_export_nid2str(doomed_exp),
1711 libcfs_nid2str(nid_key), nid);
1712 LASSERTF(doomed_exp != obd->obd_self_export,
1713 "self-export is hashed by NID?\n");
1715 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1716 "request\n", obd->obd_name,
1717 obd_uuid2str(&doomed_exp->exp_client_uuid),
1718 obd_export_nid2str(doomed_exp));
1719 class_fail_export(doomed_exp);
1720 class_export_put(doomed_exp);
1723 cfs_hash_putref(nid_hash);
1725 if (!exports_evicted)
1726 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1727 obd->obd_name, nid);
1728 return exports_evicted;
1730 EXPORT_SYMBOL(obd_export_evict_by_nid);
1732 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1734 struct cfs_hash *uuid_hash;
1735 struct obd_export *doomed_exp = NULL;
1736 struct obd_uuid doomed_uuid;
1737 int exports_evicted = 0;
1739 spin_lock(&obd->obd_dev_lock);
1740 if (obd->obd_stopping) {
1741 spin_unlock(&obd->obd_dev_lock);
1742 return exports_evicted;
1744 uuid_hash = obd->obd_uuid_hash;
1745 cfs_hash_getref(uuid_hash);
1746 spin_unlock(&obd->obd_dev_lock);
1748 obd_str2uuid(&doomed_uuid, uuid);
1749 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1750 CERROR("%s: can't evict myself\n", obd->obd_name);
1751 cfs_hash_putref(uuid_hash);
1752 return exports_evicted;
1755 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1757 if (doomed_exp == NULL) {
1758 CERROR("%s: can't disconnect %s: no exports found\n",
1759 obd->obd_name, uuid);
1761 CWARN("%s: evicting %s at adminstrative request\n",
1762 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1763 class_fail_export(doomed_exp);
1764 class_export_put(doomed_exp);
1767 cfs_hash_putref(uuid_hash);
1769 return exports_evicted;
1772 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional hook; print_export_data() calls it for an export when its
 * @locks argument is non-zero and the hook is set.
 */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
1777 static void print_export_data(struct obd_export *exp, const char *status,
1778 int locks, int debug_level)
1780 struct ptlrpc_reply_state *rs;
1781 struct ptlrpc_reply_state *first_reply = NULL;
1784 spin_lock(&exp->exp_lock);
1785 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1791 spin_unlock(&exp->exp_lock);
1793 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1794 "%p %s %llu stale:%d\n",
1795 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1796 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1797 atomic_read(&exp->exp_rpc_count),
1798 atomic_read(&exp->exp_cb_count),
1799 atomic_read(&exp->exp_locks_count),
1800 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1801 nreplies, first_reply, nreplies > 3 ? "..." : "",
1802 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1803 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1804 if (locks && class_export_dump_hook != NULL)
1805 class_export_dump_hook(exp);
1809 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1811 struct obd_export *exp;
1813 spin_lock(&obd->obd_dev_lock);
1814 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1815 print_export_data(exp, "ACTIVE", locks, debug_level);
1816 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1817 print_export_data(exp, "UNLINKED", locks, debug_level);
1818 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1819 print_export_data(exp, "DELAYED", locks, debug_level);
1820 spin_unlock(&obd->obd_dev_lock);
1823 void obd_exports_barrier(struct obd_device *obd)
1826 LASSERT(list_empty(&obd->obd_exports));
1827 spin_lock(&obd->obd_dev_lock);
1828 while (!list_empty(&obd->obd_unlinked_exports)) {
1829 spin_unlock(&obd->obd_dev_lock);
1830 set_current_state(TASK_UNINTERRUPTIBLE);
1831 schedule_timeout(cfs_time_seconds(waited));
1832 if (waited > 5 && is_power_of_2(waited)) {
1833 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1834 "more than %d seconds. "
1835 "The obd refcount = %d. Is it stuck?\n",
1836 obd->obd_name, waited,
1837 atomic_read(&obd->obd_refcount));
1838 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1841 spin_lock(&obd->obd_dev_lock);
1843 spin_unlock(&obd->obd_dev_lock);
1845 EXPORT_SYMBOL(obd_exports_barrier);
1848 * Add export to the obd_zombe thread and notify it.
1850 static void obd_zombie_export_add(struct obd_export *exp) {
1851 atomic_dec(&obd_stale_export_num);
1852 spin_lock(&exp->exp_obd->obd_dev_lock);
1853 LASSERT(!list_empty(&exp->exp_obd_chain));
1854 list_del_init(&exp->exp_obd_chain);
1855 spin_unlock(&exp->exp_obd->obd_dev_lock);
1857 queue_work(zombie_wq, &exp->exp_zombie_work);
1861 * Add import to the obd_zombe thread and notify it.
1863 static void obd_zombie_import_add(struct obd_import *imp) {
1864 LASSERT(imp->imp_sec == NULL);
1866 queue_work(zombie_wq, &imp->imp_zombie_work);
1870 * wait when obd_zombie import/export queues become empty
1872 void obd_zombie_barrier(void)
1874 flush_workqueue(zombie_wq);
1876 EXPORT_SYMBOL(obd_zombie_barrier);
1879 struct obd_export *obd_stale_export_get(void)
1881 struct obd_export *exp = NULL;
1884 spin_lock(&obd_stale_export_lock);
1885 if (!list_empty(&obd_stale_exports)) {
1886 exp = list_entry(obd_stale_exports.next,
1887 struct obd_export, exp_stale_list);
1888 list_del_init(&exp->exp_stale_list);
1890 spin_unlock(&obd_stale_export_lock);
1893 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1894 atomic_read(&obd_stale_export_num));
1898 EXPORT_SYMBOL(obd_stale_export_get);
1900 void obd_stale_export_put(struct obd_export *exp)
1904 LASSERT(list_empty(&exp->exp_stale_list));
1905 if (exp->exp_lock_hash &&
1906 atomic_read(&exp->exp_lock_hash->hs_count)) {
1907 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1908 atomic_read(&obd_stale_export_num));
1910 spin_lock_bh(&exp->exp_bl_list_lock);
1911 spin_lock(&obd_stale_export_lock);
1912 /* Add to the tail if there is no blocked locks,
1913 * to the head otherwise. */
1914 if (list_empty(&exp->exp_bl_list))
1915 list_add_tail(&exp->exp_stale_list,
1916 &obd_stale_exports);
1918 list_add(&exp->exp_stale_list,
1919 &obd_stale_exports);
1921 spin_unlock(&obd_stale_export_lock);
1922 spin_unlock_bh(&exp->exp_bl_list_lock);
1924 class_export_put(exp);
1928 EXPORT_SYMBOL(obd_stale_export_put);
1931 * Adjust the position of the export in the stale list,
1932 * i.e. move to the head of the list if is needed.
1934 void obd_stale_export_adjust(struct obd_export *exp)
1936 LASSERT(exp != NULL);
1937 spin_lock_bh(&exp->exp_bl_list_lock);
1938 spin_lock(&obd_stale_export_lock);
1940 if (!list_empty(&exp->exp_stale_list) &&
1941 !list_empty(&exp->exp_bl_list))
1942 list_move(&exp->exp_stale_list, &obd_stale_exports);
1944 spin_unlock(&obd_stale_export_lock);
1945 spin_unlock_bh(&exp->exp_bl_list_lock);
1947 EXPORT_SYMBOL(obd_stale_export_adjust);
1950 * start destroy zombie import/export thread
1952 int obd_zombie_impexp_init(void)
1954 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
1962 * stop destroy zombie import/export thread
1964 void obd_zombie_impexp_stop(void)
1966 destroy_workqueue(zombie_wq);
1967 LASSERT(list_empty(&obd_stale_exports));
1970 /***** Kernel-userspace comm helpers *******/
1972 /* Get length of entire message, including header */
1973 int kuc_len(int payload_len)
1975 return sizeof(struct kuc_hdr) + payload_len;
1977 EXPORT_SYMBOL(kuc_len);
1979 /* Get a pointer to kuc header, given a ptr to the payload
1980 * @param p Pointer to payload area
1981 * @returns Pointer to kuc header
1983 struct kuc_hdr * kuc_ptr(void *p)
1985 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1986 LASSERT(lh->kuc_magic == KUC_MAGIC);
1989 EXPORT_SYMBOL(kuc_ptr);
1991 /* Alloc space for a message, and fill in header
1992 * @return Pointer to payload area
1994 void *kuc_alloc(int payload_len, int transport, int type)
1997 int len = kuc_len(payload_len);
2001 return ERR_PTR(-ENOMEM);
2003 lh->kuc_magic = KUC_MAGIC;
2004 lh->kuc_transport = transport;
2005 lh->kuc_msgtype = type;
2006 lh->kuc_msglen = len;
2008 return (void *)(lh + 1);
2010 EXPORT_SYMBOL(kuc_alloc);
/* Free a message; takes a pointer to the payload area and the payload
 * length (presumably the one passed to kuc_alloc() - TODO confirm). */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr;

	hdr = kuc_ptr(p);
	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
2020 struct obd_request_slot_waiter {
2021 struct list_head orsw_entry;
2022 wait_queue_head_t orsw_waitq;
2026 static bool obd_request_slot_avail(struct client_obd *cli,
2027 struct obd_request_slot_waiter *orsw)
2031 spin_lock(&cli->cl_loi_list_lock);
2032 avail = !!list_empty(&orsw->orsw_entry);
2033 spin_unlock(&cli->cl_loi_list_lock);
2039 * For network flow control, the RPC sponsor needs to acquire a credit
2040 * before sending the RPC. The credits count for a connection is defined
2041 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2042 * the subsequent RPC sponsors need to wait until others released their
2043 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2045 int obd_get_request_slot(struct client_obd *cli)
2047 struct obd_request_slot_waiter orsw;
2048 struct l_wait_info lwi;
2051 spin_lock(&cli->cl_loi_list_lock);
2052 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2053 cli->cl_rpcs_in_flight++;
2054 spin_unlock(&cli->cl_loi_list_lock);
2058 init_waitqueue_head(&orsw.orsw_waitq);
2059 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2060 orsw.orsw_signaled = false;
2061 spin_unlock(&cli->cl_loi_list_lock);
2063 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2064 rc = l_wait_event(orsw.orsw_waitq,
2065 obd_request_slot_avail(cli, &orsw) ||
2069 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2070 * freed but other (such as obd_put_request_slot) is using it. */
2071 spin_lock(&cli->cl_loi_list_lock);
2073 if (!orsw.orsw_signaled) {
2074 if (list_empty(&orsw.orsw_entry))
2075 cli->cl_rpcs_in_flight--;
2077 list_del(&orsw.orsw_entry);
2081 if (orsw.orsw_signaled) {
2082 LASSERT(list_empty(&orsw.orsw_entry));
2086 spin_unlock(&cli->cl_loi_list_lock);
2090 EXPORT_SYMBOL(obd_get_request_slot);
2092 void obd_put_request_slot(struct client_obd *cli)
2094 struct obd_request_slot_waiter *orsw;
2096 spin_lock(&cli->cl_loi_list_lock);
2097 cli->cl_rpcs_in_flight--;
2099 /* If there is free slot, wakeup the first waiter. */
2100 if (!list_empty(&cli->cl_flight_waiters) &&
2101 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2102 orsw = list_entry(cli->cl_flight_waiters.next,
2103 struct obd_request_slot_waiter, orsw_entry);
2104 list_del_init(&orsw->orsw_entry);
2105 cli->cl_rpcs_in_flight++;
2106 wake_up(&orsw->orsw_waitq);
2108 spin_unlock(&cli->cl_loi_list_lock);
2110 EXPORT_SYMBOL(obd_put_request_slot);
2112 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2114 return cli->cl_max_rpcs_in_flight;
2116 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2118 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2120 struct obd_request_slot_waiter *orsw;
2127 if (max > OBD_MAX_RIF_MAX || max < 1)
2130 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2131 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2132 /* adjust max_mod_rpcs_in_flight to ensure it is always
2133 * strictly lower that max_rpcs_in_flight */
2135 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2136 "because it must be higher than "
2137 "max_mod_rpcs_in_flight value",
2138 cli->cl_import->imp_obd->obd_name);
2141 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2142 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2148 spin_lock(&cli->cl_loi_list_lock);
2149 old = cli->cl_max_rpcs_in_flight;
2150 cli->cl_max_rpcs_in_flight = max;
2151 client_adjust_max_dirty(cli);
2155 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2156 for (i = 0; i < diff; i++) {
2157 if (list_empty(&cli->cl_flight_waiters))
2160 orsw = list_entry(cli->cl_flight_waiters.next,
2161 struct obd_request_slot_waiter, orsw_entry);
2162 list_del_init(&orsw->orsw_entry);
2163 cli->cl_rpcs_in_flight++;
2164 wake_up(&orsw->orsw_waitq);
2166 spin_unlock(&cli->cl_loi_list_lock);
2170 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2172 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2174 return cli->cl_max_mod_rpcs_in_flight;
2176 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2178 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2180 struct obd_connect_data *ocd;
2184 if (max > OBD_MAX_RIF_MAX || max < 1)
2187 /* cannot exceed or equal max_rpcs_in_flight */
2188 if (max >= cli->cl_max_rpcs_in_flight) {
2189 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2190 "higher or equal to max_rpcs_in_flight value (%u)\n",
2191 cli->cl_import->imp_obd->obd_name,
2192 max, cli->cl_max_rpcs_in_flight);
2196 /* cannot exceed max modify RPCs in flight supported by the server */
2197 ocd = &cli->cl_import->imp_connect_data;
2198 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2199 maxmodrpcs = ocd->ocd_maxmodrpcs;
2202 if (max > maxmodrpcs) {
2203 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2204 "higher than max_mod_rpcs_per_client value (%hu) "
2205 "returned by the server at connection\n",
2206 cli->cl_import->imp_obd->obd_name,
2211 spin_lock(&cli->cl_mod_rpcs_lock);
2213 prev = cli->cl_max_mod_rpcs_in_flight;
2214 cli->cl_max_mod_rpcs_in_flight = max;
2216 /* wakeup waiters if limit has been increased */
2217 if (cli->cl_max_mod_rpcs_in_flight > prev)
2218 wake_up(&cli->cl_mod_rpcs_waitq);
2220 spin_unlock(&cli->cl_mod_rpcs_lock);
2224 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2226 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2227 struct seq_file *seq)
2229 unsigned long mod_tot = 0, mod_cum;
2230 struct timespec64 now;
2233 ktime_get_real_ts64(&now);
2235 spin_lock(&cli->cl_mod_rpcs_lock);
2237 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2238 (s64)now.tv_sec, now.tv_nsec);
2239 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2240 cli->cl_mod_rpcs_in_flight);
2242 seq_printf(seq, "\n\t\t\tmodify\n");
2243 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2245 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2248 for (i = 0; i < OBD_HIST_MAX; i++) {
2249 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2251 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2252 i, mod, pct(mod, mod_tot),
2253 pct(mod_cum, mod_tot));
2254 if (mod_cum == mod_tot)
2258 spin_unlock(&cli->cl_mod_rpcs_lock);
2262 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2264 /* The number of modify RPCs sent in parallel is limited
2265 * because the server has a finite number of slots per client to
2266 * store request result and ensure reply reconstruction when needed.
2267 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2268 * that takes into account server limit and cl_max_rpcs_in_flight
2270 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2271 * one close request is allowed above the maximum.
2273 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2278 /* A slot is available if
2279 * - number of modify RPCs in flight is less than the max
2280 * - it's a close RPC and no other close request is in flight
2282 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2283 (close_req && cli->cl_close_rpcs_in_flight == 0);
2288 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2293 spin_lock(&cli->cl_mod_rpcs_lock);
2294 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2295 spin_unlock(&cli->cl_mod_rpcs_lock);
2299 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2302 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2303 it->it_op == IT_READDIR ||
2304 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2309 /* Get a modify RPC slot from the obd client @cli according
2310 * to the kind of operation @opc that is going to be sent
2311 * and the intent @it of the operation if it applies.
2312 * If the maximum number of modify RPCs in flight is reached
2313 * the thread is put to sleep.
2314 * Returns the tag to be set in the request message. Tag 0
2315 * is reserved for non-modifying requests.
2317 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2318 struct lookup_intent *it)
2320 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2321 bool close_req = false;
2324 /* read-only metadata RPCs don't consume a slot on MDT
2325 * for reply reconstruction
2327 if (obd_skip_mod_rpc_slot(it))
2330 if (opc == MDS_CLOSE)
2334 spin_lock(&cli->cl_mod_rpcs_lock);
2335 max = cli->cl_max_mod_rpcs_in_flight;
2336 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2337 /* there is a slot available */
2338 cli->cl_mod_rpcs_in_flight++;
2340 cli->cl_close_rpcs_in_flight++;
2341 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2342 cli->cl_mod_rpcs_in_flight);
2343 /* find a free tag */
2344 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2346 LASSERT(i < OBD_MAX_RIF_MAX);
2347 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2348 spin_unlock(&cli->cl_mod_rpcs_lock);
2349 /* tag 0 is reserved for non-modify RPCs */
2352 spin_unlock(&cli->cl_mod_rpcs_lock);
2354 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2355 "opc %u, max %hu\n",
2356 cli->cl_import->imp_obd->obd_name, opc, max);
2358 l_wait_event_exclusive(cli->cl_mod_rpcs_waitq,
2359 obd_mod_rpc_slot_avail(cli, close_req),
2363 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2365 /* Put a modify RPC slot from the obd client @cli according
2366 * to the kind of operation @opc that has been sent and the
2367 * intent @it of the operation if it applies.
2369 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2370 struct lookup_intent *it, __u16 tag)
2372 bool close_req = false;
2374 if (obd_skip_mod_rpc_slot(it))
2377 if (opc == MDS_CLOSE)
2380 spin_lock(&cli->cl_mod_rpcs_lock);
2381 cli->cl_mod_rpcs_in_flight--;
2383 cli->cl_close_rpcs_in_flight--;
2384 /* release the tag in the bitmap */
2385 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2386 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2387 spin_unlock(&cli->cl_mod_rpcs_lock);
2388 wake_up(&cli->cl_mod_rpcs_waitq);
2390 EXPORT_SYMBOL(obd_put_mod_rpc_slot);