4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/workqueue.h>
42 #include <lustre_compat.h>
43 #include <obd_class.h>
44 #include <lustre_log.h>
45 #include <lprocfs_status.h>
46 #include <lustre_disk.h>
47 #include <lustre_kernelcomm.h>
/* Serializes access to the obd_types list below. */
49 static DEFINE_SPINLOCK(obd_types_lock);
/* Registered device types, linked through obd_type::typ_chain. */
50 static LIST_HEAD(obd_types);
/* Reader/writer lock taken around lookups in, and updates of, obd_devs[]. */
51 DEFINE_RWLOCK(obd_dev_lock);
/* Device table; a registered device is stored at index obd_minor. */
52 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache backing struct obd_device allocations. */
54 static struct kmem_cache *obd_device_cachep;
/* Slab cache for struct obdo; created by obd_init_caches(). */
55 struct kmem_cache *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
/* NOTE(review): not referenced in the visible part of this file; presumably
 * the workqueue that runs the zombie export/import work items - confirm. */
58 static struct workqueue_struct *zombie_wq;
/* Forward declarations of helpers defined later in the file. */
60 static void obd_zombie_export_add(struct obd_export *exp);
61 static void obd_zombie_import_add(struct obd_import *imp);
62 static void print_export_data(struct obd_export *exp,
63 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping.  obd_stale_export_num is incremented by
 * class_unlink_export(); NOTE(review): users of the list and its lock are
 * outside the visible part of the file. */
65 static LIST_HEAD(obd_stale_exports);
66 static DEFINE_SPINLOCK(obd_stale_export_lock);
67 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Hook called by class_export_destroy() and class_import_destroy() to drop
 * a ptlrpc_connection.  NOTE(review): it is assigned elsewhere - confirm
 * where it is set. */
69 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
70 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
73 * support functions: we could use inter-module communication, but this
74 * is more portable to other OS's
76 static struct obd_device *obd_device_alloc(void)
78 struct obd_device *obd;
80 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
82 obd->obd_magic = OBD_DEVICE_MAGIC;
87 static void obd_device_free(struct obd_device *obd)
90 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
91 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
92 if (obd->obd_namespace != NULL) {
93 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
94 obd, obd->obd_namespace, obd->obd_force);
97 lu_ref_fini(&obd->obd_reference);
98 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
101 struct obd_type *class_search_type(const char *name)
103 struct list_head *tmp;
104 struct obd_type *type;
106 spin_lock(&obd_types_lock);
107 list_for_each(tmp, &obd_types) {
108 type = list_entry(tmp, struct obd_type, typ_chain);
109 if (strcmp(type->typ_name, name) == 0) {
110 spin_unlock(&obd_types_lock);
114 spin_unlock(&obd_types_lock);
117 EXPORT_SYMBOL(class_search_type);
/*
 * Look up the obd type registered as \a name and account for a new user.
 *
 * The type is first searched with class_search_type().  When
 * HAVE_MODULE_LOADING_SUPPORT is defined, the visible code derives a module
 * name from the type name (LUSTRE_LWP_NAME maps to LUSTRE_OSP_NAME, names
 * starting with LUSTRE_MDS_NAME map to LUSTRE_MDT_NAME), calls
 * request_module() and repeats the search; a failed load is reported on the
 * console.  Under the type's obd_type_lock a module reference is requested
 * with try_module_get(); its return value is not checked in the visible
 * code.
 *
 * NOTE(review): several short lines of this function (conditions, braces,
 * the "obdfilter" replacement, the refcount update and the return) are not
 * visible in this view, so the exact control flow must be confirmed against
 * the complete file.
 */
119 struct obd_type *class_get_type(const char *name)
121 struct obd_type *type = class_search_type(name);
123 #ifdef HAVE_MODULE_LOADING_SUPPORT
125 const char *modname = name;
127 if (strcmp(modname, "obdfilter") == 0)
130 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
131 modname = LUSTRE_OSP_NAME;
133 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
134 modname = LUSTRE_MDT_NAME;
136 if (!request_module("%s", modname)) {
137 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
138 type = class_search_type(name);
140 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
146 spin_lock(&type->obd_type_lock);
148 try_module_get(type->typ_dt_ops->o_owner);
149 spin_unlock(&type->obd_type_lock);
154 void class_put_type(struct obd_type *type)
157 spin_lock(&type->obd_type_lock);
159 module_put(type->typ_dt_ops->o_owner);
160 spin_unlock(&type->obd_type_lock);
/* kobject release callback: free the kobject allocated by
 * class_setup_tunables(). */
static void class_sysfs_release(struct kobject *kobj)
{
	OBD_FREE_PTR(kobj);
}
168 static struct kobj_type class_ktype = {
169 .sysfs_ops = &lustre_sysfs_ops,
170 .release = class_sysfs_release,
/*
 * Create the sysfs kobject named \a name inside lustre_kset.
 *
 * With HAVE_SERVER_SUPPORT the kset is first searched for an existing
 * object of that name.  Otherwise a kobject is allocated, initialised with
 * class_ktype and added below lustre_kset.
 *
 * Returns ERR_PTR(-ENOMEM) when the allocation fails.
 *
 * NOTE(review): the lines that test the results of kset_find_obj(),
 * OBD_ALLOC() and kobject_add(), and the final return, are not visible in
 * this view; confirm the error handling against the complete file.
 */
173 struct kobject *class_setup_tunables(const char *name)
175 struct kobject *kobj;
178 #ifdef HAVE_SERVER_SUPPORT
179 kobj = kset_find_obj(lustre_kset, name);
183 OBD_ALLOC(kobj, sizeof(*kobj));
185 return ERR_PTR(-ENOMEM);
187 kobj->kset = lustre_kset;
188 kobject_init(kobj, &class_ktype);
189 rc = kobject_add(kobj, &lustre_kset->kobj, "%s", name);
196 EXPORT_SYMBOL(class_setup_tunables);
/*
 * Register a new obd device type.
 *
 * CLASS_MAX_NAME bounds the accepted length of a type name (asserted with
 * strnlen() below).
 *
 * Visible steps: refuse a name that is already registered; allocate the
 * obd_type together with private copies of the dt/md operation tables and
 * of the name; register the procfs root (CONFIG_PROC_FS) and the debugfs
 * entry; create the sysfs tunables kobject; run lu_device_type_init() on
 * \a ldt; finally link the type on obd_types under obd_types_lock.
 * With HAVE_SERVER_SUPPORT an already existing debugfs entry of the same
 * name is detected with d_lookup() and flagged through typ_sym_filter so
 * that it is not removed by this type.
 *
 * The tail of the function is the failure path: it removes the debugfs and
 * procfs entries created so far and frees the name, both operation tables
 * and the type itself.
 *
 * NOTE(review): many short lines (conditions, labels, gotos, returns) are
 * not visible in this view; the exact return codes and which steps are
 * conditional on \a enable_proc, \a md_ops and \a ldt must be confirmed
 * against the complete file.
 */
198 #define CLASS_MAX_NAME 1024
200 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
201 bool enable_proc, struct lprocfs_vars *vars,
202 const char *name, struct lu_device_type *ldt)
204 struct obd_type *type;
205 #ifdef HAVE_SERVER_SUPPORT
207 #endif /* HAVE_SERVER_SUPPORT */
212 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
214 if (class_search_type(name)) {
215 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
220 OBD_ALLOC(type, sizeof(*type));
224 OBD_ALLOC_PTR(type->typ_dt_ops);
225 OBD_ALLOC_PTR(type->typ_md_ops);
226 OBD_ALLOC(type->typ_name, strlen(name) + 1);
228 if (type->typ_dt_ops == NULL ||
229 type->typ_md_ops == NULL ||
230 type->typ_name == NULL)
233 *(type->typ_dt_ops) = *dt_ops;
234 /* md_ops is optional */
236 *(type->typ_md_ops) = *md_ops;
237 strcpy(type->typ_name, name);
238 spin_lock_init(&type->obd_type_lock);
240 #ifdef CONFIG_PROC_FS
242 type->typ_procroot = lprocfs_register(type->typ_name,
245 if (IS_ERR(type->typ_procroot)) {
246 rc = PTR_ERR(type->typ_procroot);
247 type->typ_procroot = NULL;
252 #ifdef HAVE_SERVER_SUPPORT
254 dname.len = strlen(dname.name);
255 dname.hash = ll_full_name_hash(debugfs_lustre_root, dname.name,
257 type->typ_debugfs_entry = d_lookup(debugfs_lustre_root, &dname);
258 if (type->typ_debugfs_entry) {
259 dput(type->typ_debugfs_entry);
260 type->typ_sym_filter = true;
263 #endif /* HAVE_SERVER_SUPPORT */
265 type->typ_debugfs_entry = ldebugfs_register(type->typ_name,
268 if (IS_ERR_OR_NULL(type->typ_debugfs_entry)) {
269 rc = type->typ_debugfs_entry ? PTR_ERR(type->typ_debugfs_entry)
271 type->typ_debugfs_entry = NULL;
274 #ifdef HAVE_SERVER_SUPPORT
277 type->typ_kobj = class_setup_tunables(type->typ_name);
278 if (IS_ERR(type->typ_kobj))
279 GOTO(failed, rc = PTR_ERR(type->typ_kobj));
283 rc = lu_device_type_init(ldt);
285 kobject_put(type->typ_kobj);
290 spin_lock(&obd_types_lock);
291 list_add(&type->typ_chain, &obd_types);
292 spin_unlock(&obd_types_lock);
297 #ifdef HAVE_SERVER_SUPPORT
298 if (type->typ_sym_filter)
299 type->typ_debugfs_entry = NULL;
301 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
302 ldebugfs_remove(&type->typ_debugfs_entry);
303 if (type->typ_name != NULL) {
304 #ifdef CONFIG_PROC_FS
305 if (type->typ_procroot != NULL)
306 remove_proc_subtree(type->typ_name, proc_lustre_root);
308 OBD_FREE(type->typ_name, strlen(name) + 1);
310 if (type->typ_md_ops != NULL)
311 OBD_FREE_PTR(type->typ_md_ops);
312 if (type->typ_dt_ops != NULL)
313 OBD_FREE_PTR(type->typ_dt_ops);
314 OBD_FREE(type, sizeof(*type));
317 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name and release its resources.
 *
 * A type that still has users (typ_refcnt != 0) is reported; on that path
 * only the two operation tables are freed and the name is kept for
 * debugging.  Otherwise the sysfs kobject reference is dropped, the procfs
 * and debugfs entries are removed, lu_device_type_fini() is called, the
 * type is unlinked from obd_types under obd_types_lock, and the name,
 * operation tables and the type itself are freed.
 *
 * NOTE(review): on the busy path typ_dt_ops/typ_md_ops are freed without
 * being reset, while the type stays on obd_types; class_put_type()
 * dereferences typ_dt_ops, so a later put would touch freed memory -
 * confirm and consider a fix.
 * NOTE(review): short lines (the NULL test after the lookup, the return
 * statements, the guard around lu_device_type_fini()) are not visible in
 * this view.
 */
319 int class_unregister_type(const char *name)
321 struct obd_type *type = class_search_type(name);
325 CERROR("unknown obd type\n");
329 if (type->typ_refcnt) {
330 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
331 /* This is a bad situation, let's make the best of it */
332 /* Remove ops, but leave the name for debugging */
333 OBD_FREE_PTR(type->typ_dt_ops);
334 OBD_FREE_PTR(type->typ_md_ops);
338 kobject_put(type->typ_kobj);
340 /* we do not use type->typ_procroot as for compatibility purposes
341 * other modules can share names (i.e. lod can use lov entry). so
342 * we can't reference pointer as it can get invalided when another
343 * module removes the entry */
344 #ifdef CONFIG_PROC_FS
345 if (type->typ_procroot != NULL)
346 remove_proc_subtree(type->typ_name, proc_lustre_root);
347 if (type->typ_procsym != NULL)
348 lprocfs_remove(&type->typ_procsym);
350 #ifdef HAVE_SERVER_SUPPORT
351 if (type->typ_sym_filter)
352 type->typ_debugfs_entry = NULL;
354 if (!IS_ERR_OR_NULL(type->typ_debugfs_entry))
355 ldebugfs_remove(&type->typ_debugfs_entry);
358 lu_device_type_fini(type->typ_lu);
360 spin_lock(&obd_types_lock);
361 list_del(&type->typ_chain);
362 spin_unlock(&obd_types_lock);
363 OBD_FREE(type->typ_name, strlen(name) + 1);
364 if (type->typ_dt_ops != NULL)
365 OBD_FREE_PTR(type->typ_dt_ops);
366 if (type->typ_md_ops != NULL)
367 OBD_FREE_PTR(type->typ_md_ops);
368 OBD_FREE(type, sizeof(*type));
370 } /* class_unregister_type */
371 EXPORT_SYMBOL(class_unregister_type);
374 * Create a new obd device.
376 * Allocate the new obd_device and initialize it.
378 * \param[in] type_name obd device type string.
379 * \param[in] name obd device name.
380 * \param[in] uuid obd device UUID
382 * \retval newdev pointer to created obd_device
383 * \retval ERR_PTR(errno) on error
385 struct obd_device *class_newdev(const char *type_name, const char *name,
388 struct obd_device *newdev;
389 struct obd_type *type = NULL;
392 if (strlen(name) >= MAX_OBD_NAME) {
393 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
394 RETURN(ERR_PTR(-EINVAL));
397 type = class_get_type(type_name);
399 CERROR("OBD: unknown type: %s\n", type_name);
400 RETURN(ERR_PTR(-ENODEV));
403 newdev = obd_device_alloc();
404 if (newdev == NULL) {
405 class_put_type(type);
406 RETURN(ERR_PTR(-ENOMEM));
408 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
409 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
410 newdev->obd_type = type;
411 newdev->obd_minor = -1;
413 rwlock_init(&newdev->obd_pool_lock);
414 newdev->obd_pool_limit = 0;
415 newdev->obd_pool_slv = 0;
417 INIT_LIST_HEAD(&newdev->obd_exports);
418 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
419 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
420 INIT_LIST_HEAD(&newdev->obd_exports_timed);
421 INIT_LIST_HEAD(&newdev->obd_nid_stats);
422 spin_lock_init(&newdev->obd_nid_lock);
423 spin_lock_init(&newdev->obd_dev_lock);
424 mutex_init(&newdev->obd_dev_mutex);
425 spin_lock_init(&newdev->obd_osfs_lock);
426 /* newdev->obd_osfs_age must be set to a value in the distant
427 * past to guarantee a fresh statfs is fetched on mount. */
428 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
430 /* XXX belongs in setup not attach */
431 init_rwsem(&newdev->obd_observer_link_sem);
433 spin_lock_init(&newdev->obd_recovery_task_lock);
434 init_waitqueue_head(&newdev->obd_next_transno_waitq);
435 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
436 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
437 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
438 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
439 INIT_LIST_HEAD(&newdev->obd_evict_list);
440 INIT_LIST_HEAD(&newdev->obd_lwp_list);
442 llog_group_init(&newdev->obd_olg);
443 /* Detach drops this */
444 atomic_set(&newdev->obd_refcount, 1);
445 lu_ref_init(&newdev->obd_reference);
446 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
448 newdev->obd_conn_inprogress = 0;
450 strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
452 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
453 newdev->obd_name, newdev);
461 * \param[in] obd obd_device to be freed
465 void class_free_dev(struct obd_device *obd)
467 struct obd_type *obd_type = obd->obd_type;
469 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
470 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
471 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
472 "obd %p != obd_devs[%d] %p\n",
473 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
474 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
475 "obd_refcount should be 0, not %d\n",
476 atomic_read(&obd->obd_refcount));
477 LASSERT(obd_type != NULL);
479 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
480 obd->obd_name, obd->obd_type->typ_name);
482 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
483 obd->obd_name, obd->obd_uuid.uuid);
484 if (obd->obd_stopping) {
487 /* If we're not stopping, we were never set up */
488 err = obd_cleanup(obd);
490 CERROR("Cleanup %s returned %d\n",
494 obd_device_free(obd);
496 class_put_type(obd_type);
500 * Unregister obd device.
502 * Free slot in obd_dev[] used by \a obd.
504 * \param[in] new_obd obd_device to be unregistered
508 void class_unregister_device(struct obd_device *obd)
510 write_lock(&obd_dev_lock);
511 if (obd->obd_minor >= 0) {
512 LASSERT(obd_devs[obd->obd_minor] == obd);
513 obd_devs[obd->obd_minor] = NULL;
516 write_unlock(&obd_dev_lock);
520 * Register obd device.
522 * Find free slot in obd_devs[], fills it with \a new_obd.
524 * \param[in] new_obd obd_device to be registered
527 * \retval -EEXIST device with this name is registered
528 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Register \a new_obd in obd_devs[].
 *
 * Under obd_dev_lock (write) every slot is scanned: a device with the same
 * name makes the registration fail ("already exists"), and the first free
 * slot seen is remembered as the minor to assign.  When a same-name device
 * is found the visible code can drop the lock, call obd_zombie_barrier() to
 * wait for a device pending destruction, and retry (see the "retried"
 * flag).  On success obd_minor is set and the slot filled; when no slot is
 * free an error naming MAX_OBD_DEVICES is logged.
 *
 * NOTE(review): the lines carrying the retry condition, the gotos/labels,
 * the assignments to ret and the return are not visible in this view;
 * confirm the exact retry and error-code logic against the complete file.
 */
530 int class_register_device(struct obd_device *new_obd)
534 int new_obd_minor = 0;
535 bool minor_assign = false;
536 bool retried = false;
539 write_lock(&obd_dev_lock);
540 for (i = 0; i < class_devno_max(); i++) {
541 struct obd_device *obd = class_num2obd(i);
544 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
547 write_unlock(&obd_dev_lock);
549 /* the obd_device could be waited to be
550 * destroyed by the "obd_zombie_impexp_thread".
552 obd_zombie_barrier();
557 CERROR("%s: already exists, won't add\n",
559 /* in case we found a free slot before duplicate */
560 minor_assign = false;
564 if (!minor_assign && obd == NULL) {
571 new_obd->obd_minor = new_obd_minor;
572 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
573 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
574 obd_devs[new_obd_minor] = new_obd;
578 CERROR("%s: all %u/%u devices used, increase "
579 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
580 i, class_devno_max(), ret);
583 write_unlock(&obd_dev_lock);
588 static int class_name2dev_nolock(const char *name)
595 for (i = 0; i < class_devno_max(); i++) {
596 struct obd_device *obd = class_num2obd(i);
598 if (obd && strcmp(name, obd->obd_name) == 0) {
599 /* Make sure we finished attaching before we give
600 out any references */
601 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
602 if (obd->obd_attached) {
612 int class_name2dev(const char *name)
619 read_lock(&obd_dev_lock);
620 i = class_name2dev_nolock(name);
621 read_unlock(&obd_dev_lock);
625 EXPORT_SYMBOL(class_name2dev);
627 struct obd_device *class_name2obd(const char *name)
629 int dev = class_name2dev(name);
631 if (dev < 0 || dev > class_devno_max())
633 return class_num2obd(dev);
635 EXPORT_SYMBOL(class_name2obd);
637 int class_uuid2dev_nolock(struct obd_uuid *uuid)
641 for (i = 0; i < class_devno_max(); i++) {
642 struct obd_device *obd = class_num2obd(i);
644 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
645 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
653 int class_uuid2dev(struct obd_uuid *uuid)
657 read_lock(&obd_dev_lock);
658 i = class_uuid2dev_nolock(uuid);
659 read_unlock(&obd_dev_lock);
663 EXPORT_SYMBOL(class_uuid2dev);
665 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
667 int dev = class_uuid2dev(uuid);
670 return class_num2obd(dev);
672 EXPORT_SYMBOL(class_uuid2obd);
675 * Get obd device from ::obd_devs[]
677 * \param num [in] array index
679 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
680 * otherwise return the obd device there.
682 struct obd_device *class_num2obd(int num)
684 struct obd_device *obd = NULL;
686 if (num < class_devno_max()) {
691 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
692 "%p obd_magic %08x != %08x\n",
693 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
694 LASSERTF(obd->obd_minor == num,
695 "%p obd_minor %0d != %0d\n",
696 obd, obd->obd_minor, num);
703 * Find obd in obd_dev[] by name or uuid.
705 * Increment obd's refcount if found.
707 * \param[in] str obd name or uuid
709 * \retval NULL if not found
710 * \retval target pointer to found obd_device
712 struct obd_device *class_dev_by_str(const char *str)
714 struct obd_device *target = NULL;
715 struct obd_uuid tgtuuid;
718 obd_str2uuid(&tgtuuid, str);
720 read_lock(&obd_dev_lock);
721 rc = class_uuid2dev_nolock(&tgtuuid);
723 rc = class_name2dev_nolock(str);
726 target = class_num2obd(rc);
729 class_incref(target, "find", current);
730 read_unlock(&obd_dev_lock);
734 EXPORT_SYMBOL(class_dev_by_str);
737 * Get obd devices count. Device in any
739 * \retval obd device count
741 int get_devices_count(void)
743 int index, max_index = class_devno_max(), dev_count = 0;
745 read_lock(&obd_dev_lock);
746 for (index = 0; index <= max_index; index++) {
747 struct obd_device *obd = class_num2obd(index);
751 read_unlock(&obd_dev_lock);
755 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per registered device: index, status, type name,
 * device name, UUID and current obd_refcount.  The status is chosen from
 * the obd_stopping, obd_set_up and obd_attached flags, tested in that
 * order.  The table is walked under obd_dev_lock (read).
 *
 * NOTE(review): the lines that assign the status strings and skip empty
 * slots are not visible in this view.
 */
757 void class_obd_list(void)
762 read_lock(&obd_dev_lock);
763 for (i = 0; i < class_devno_max(); i++) {
764 struct obd_device *obd = class_num2obd(i);
768 if (obd->obd_stopping)
770 else if (obd->obd_set_up)
772 else if (obd->obd_attached)
776 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
777 i, status, obd->obd_type->typ_name,
778 obd->obd_name, obd->obd_uuid.uuid,
779 atomic_read(&obd->obd_refcount));
781 read_unlock(&obd_dev_lock);
785 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
786 specified, then only the client with that uuid is returned,
787 otherwise any client connected to the tgt is returned. */
/*
 * Scan obd_devs[] under obd_dev_lock (read) for a device whose type name
 * starts with \a typ_name (strncmp over strlen(typ_name) characters), whose
 * u.cli.cl_target_uuid equals \a tgt_uuid and, when \a grp_uuid is not
 * NULL, whose own obd_uuid equals \a grp_uuid.  The lock is released before
 * returning on a match.
 *
 * NOTE(review): the return statements and the check that skips empty slots
 * are not visible in this view; no reference appears to be taken on the
 * device in the visible lines - confirm.
 */
788 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
789 const char * typ_name,
790 struct obd_uuid *grp_uuid)
794 read_lock(&obd_dev_lock);
795 for (i = 0; i < class_devno_max(); i++) {
796 struct obd_device *obd = class_num2obd(i);
800 if ((strncmp(obd->obd_type->typ_name, typ_name,
801 strlen(typ_name)) == 0)) {
802 if (obd_uuid_equals(tgt_uuid,
803 &obd->u.cli.cl_target_uuid) &&
804 ((grp_uuid)? obd_uuid_equals(grp_uuid,
805 &obd->obd_uuid) : 1)) {
806 read_unlock(&obd_dev_lock);
811 read_unlock(&obd_dev_lock);
815 EXPORT_SYMBOL(class_find_client_obd);
817 /* Iterate the obd_device list looking devices have grp_uuid. Start
818 searching at *next, and if a device is found, the next index to look
819 at is saved in *next. If next is NULL, then the first matching device
820 will always be returned. */
/*
 * Scan obd_devs[] under obd_dev_lock (read), starting at an index derived
 * from \a next, for a device whose obd_uuid equals \a grp_uuid.  A *next
 * value is only accepted as start index when it lies in
 * [0, class_devno_max()).
 *
 * NOTE(review): the handling of a NULL or out-of-range \a next, the update
 * of *next on a match and the return statements are on lines that are not
 * visible in this view.
 */
821 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
827 else if (*next >= 0 && *next < class_devno_max())
832 read_lock(&obd_dev_lock);
833 for (; i < class_devno_max(); i++) {
834 struct obd_device *obd = class_num2obd(i);
838 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
841 read_unlock(&obd_dev_lock);
845 read_unlock(&obd_dev_lock);
849 EXPORT_SYMBOL(class_devices_in_group);
852 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
853 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to every device that
 * is set up, not stopping, of type mdc/osc/osp/lwp/mdt/ost, and whose name
 * begins with the first \a namelen characters of \a fsname.
 *
 * obd_dev_lock is dropped around the obd_set_info_async() call; the device
 * is pinned with class_incref()/class_decref() for that window and the lock
 * is re-taken before the scan continues.
 *
 * NOTE(review): how the per-device result rc2 is folded into the return
 * value is on lines that are not visible in this view.
 */
855 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
857 struct obd_device *obd;
861 LASSERT(namelen > 0);
863 read_lock(&obd_dev_lock);
864 for (i = 0; i < class_devno_max(); i++) {
865 obd = class_num2obd(i);
867 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
870 /* only notify mdc, osc, osp, lwp, mdt, ost
871 * because only these have a -sptlrpc llog */
872 type = obd->obd_type->typ_name;
873 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
874 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
875 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
876 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
877 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
878 strcmp(type, LUSTRE_OST_NAME) != 0)
881 if (strncmp(obd->obd_name, fsname, namelen))
884 class_incref(obd, __FUNCTION__, obd);
885 read_unlock(&obd_dev_lock);
886 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
887 sizeof(KEY_SPTLRPC_CONF),
888 KEY_SPTLRPC_CONF, 0, NULL, NULL);
890 class_decref(obd, __FUNCTION__, obd);
891 read_lock(&obd_dev_lock);
893 read_unlock(&obd_dev_lock);
896 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches().  The obd_device
 * cache pointer is reset to NULL after destruction so the function can be
 * called on a partially initialised state.
 *
 * NOTE(review): the guard and the pointer reset for obdo_cachep are on
 * lines that are not visible in this view.
 */
898 void obd_cleanup_caches(void)
901 if (obd_device_cachep) {
902 kmem_cache_destroy(obd_device_cachep);
903 obd_device_cachep = NULL;
906 kmem_cache_destroy(obdo_cachep);
/*
 * Create the "ll_obd_dev_cache" and "ll_obdo_cache" slab caches.  Both
 * cache pointers are asserted to be NULL beforehand.  On an allocation
 * failure the function jumps to its cleanup path with rc = -ENOMEM, where
 * obd_cleanup_caches() undoes whatever was created.
 *
 * NOTE(review): the success return and the remaining kmem_cache_create()
 * arguments are on lines that are not visible in this view.
 */
913 int obd_init_caches(void)
918 LASSERT(obd_device_cachep == NULL);
919 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
920 sizeof(struct obd_device),
922 if (!obd_device_cachep)
923 GOTO(out, rc = -ENOMEM);
925 LASSERT(obdo_cachep == NULL);
926 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
929 GOTO(out, rc = -ENOMEM);
933 obd_cleanup_caches();
937 /* map connection to client */
938 struct obd_export *class_conn2export(struct lustre_handle *conn)
940 struct obd_export *export;
944 CDEBUG(D_CACHE, "looking for null handle\n");
948 if (conn->cookie == -1) { /* this means assign a new connection */
949 CDEBUG(D_CACHE, "want a new connection\n");
953 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
954 export = class_handle2object(conn->cookie, NULL);
957 EXPORT_SYMBOL(class_conn2export);
959 struct obd_device *class_exp2obd(struct obd_export *exp)
965 EXPORT_SYMBOL(class_exp2obd);
967 struct obd_device *class_conn2obd(struct lustre_handle *conn)
969 struct obd_export *export;
970 export = class_conn2export(conn);
972 struct obd_device *obd = export->exp_obd;
973 class_export_put(export);
979 struct obd_import *class_exp2cliimp(struct obd_export *exp)
981 struct obd_device *obd = exp->exp_obd;
984 return obd->u.cli.cl_import;
986 EXPORT_SYMBOL(class_exp2cliimp);
988 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
990 struct obd_device *obd = class_conn2obd(conn);
993 return obd->u.cli.cl_import;
996 /* Export management functions */
/*
 * Final teardown of an export whose refcount has reached zero (asserted).
 *
 * Releases the ptlrpc connection if there is one, asserts that the reply,
 * replay and high-priority RPC lists are empty, calls obd_destroy_export(),
 * drops the device reference held by the export (the self export holds
 * none) and frees the export through OBD_FREE_RCU().
 */
997 static void class_export_destroy(struct obd_export *exp)
999 struct obd_device *obd = exp->exp_obd;
1002 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
1003 LASSERT(obd != NULL);
1005 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
1006 exp->exp_client_uuid.uuid, obd->obd_name);
1008 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
1009 if (exp->exp_connection)
1010 ptlrpc_put_connection_superhack(exp->exp_connection);
1012 LASSERT(list_empty(&exp->exp_outstanding_replies));
1013 LASSERT(list_empty(&exp->exp_uncommitted_replies));
1014 LASSERT(list_empty(&exp->exp_req_replay_queue));
1015 LASSERT(list_empty(&exp->exp_hp_rpcs));
1016 obd_destroy_export(exp);
1017 /* self export doesn't hold a reference to an obd, although it
1018 * exists until freeing of the obd */
1019 if (exp != obd->obd_self_export)
1020 class_decref(obd, "export", exp);
1022 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* portals_handle_ops::hop_addref callback for exports: take a reference on
 * the export behind the handle. */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
/* Handle operations passed to class_handle_hash() for export handles.
 * NOTE(review): any further initializers and the closing brace are on
 * lines that are not visible in this view. */
1031 static struct portals_handle_ops export_handle_ops = {
1032 .hop_addref = export_handle_addref,
1036 struct obd_export *class_export_get(struct obd_export *exp)
1038 atomic_inc(&exp->exp_refcount);
1039 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
1040 atomic_read(&exp->exp_refcount));
1043 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on an export.
 *
 * When the last reference goes away the nid-stat reference is released
 * with lprocfs_exp_cleanup() and the export is disposed of: the device's
 * self export is destroyed synchronously and the device itself is then
 * freed with class_free_dev(); any other export must already be on a
 * device export list (asserted) and is handed to obd_zombie_export_add()
 * for deferred destruction.
 */
1045 void class_export_put(struct obd_export *exp)
1047 LASSERT(exp != NULL);
1048 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1049 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1050 atomic_read(&exp->exp_refcount) - 1);
1052 if (atomic_dec_and_test(&exp->exp_refcount)) {
1053 struct obd_device *obd = exp->exp_obd;
1055 CDEBUG(D_IOCTL, "final put %p/%s\n",
1056 exp, exp->exp_client_uuid.uuid);
1058 /* release nid stat refererence */
1059 lprocfs_exp_cleanup(exp);
1061 if (exp == obd->obd_self_export) {
1062 /* self export should be destroyed without
1063 * zombie thread as it doesn't hold a
1064 * reference to obd and doesn't hold any
1066 class_export_destroy(exp);
1067 /* self export is destroyed, no class
1068 * references exist and it is safe to free
1070 class_free_dev(obd);
1072 LASSERT(!list_empty(&exp->exp_obd_chain));
1073 obd_zombie_export_add(exp);
1078 EXPORT_SYMBOL(class_export_put);
1080 static void obd_zombie_exp_cull(struct work_struct *ws)
1082 struct obd_export *export;
1084 export = container_of(ws, struct obd_export, exp_zombie_work);
1085 class_export_destroy(export);
1088 /* Creates a new export, adds it to the hash table, and returns a
1089 * pointer to it. The refcount is 2: one for the hash reference, and
1090 * one for the pointer returned by this function. */
/*
 * Allocate and initialise a new export of \a obd for client \a cluuid.
 *
 * The export starts with a refcount of 2 and is entered in the handle hash
 * with export_handle_ops.  Unless \a cluuid equals the device's own UUID it
 * is also added to obd_uuid_hash with cfs_hash_add_unique(); a duplicate is
 * refused with -EALREADY.  A device that is stopping is refused with
 * -ENODEV before hashing or -ESHUTDOWN after it.
 *
 * On success the visible code either takes a device reference and links
 * the export on obd_exports and obd_exports_timed (incrementing
 * obd_num_exports), or merely initialises the two list heads.
 * NOTE(review): the condition selecting between these two branches is not
 * visible in this view; presumably it is \a is_self - confirm.
 *
 * The error path drops the hash reference, unhashes the handle, calls
 * obd_destroy_export() and frees the export.
 *
 * Returns ERR_PTR(-ENOMEM) when the allocation fails.  NOTE(review): the
 * remaining return statements and labels are not visible in this view.
 */
1091 struct obd_export *__class_new_export(struct obd_device *obd,
1092 struct obd_uuid *cluuid, bool is_self)
1094 struct obd_export *export;
1095 struct cfs_hash *hash = NULL;
1099 OBD_ALLOC_PTR(export);
1101 return ERR_PTR(-ENOMEM);
1103 export->exp_conn_cnt = 0;
1104 export->exp_lock_hash = NULL;
1105 export->exp_flock_hash = NULL;
1106 /* 2 = class_handle_hash + last */
1107 atomic_set(&export->exp_refcount, 2);
1108 atomic_set(&export->exp_rpc_count, 0);
1109 atomic_set(&export->exp_cb_count, 0);
1110 atomic_set(&export->exp_locks_count, 0);
1111 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1112 INIT_LIST_HEAD(&export->exp_locks_list);
1113 spin_lock_init(&export->exp_locks_list_guard);
1115 atomic_set(&export->exp_replay_count, 0);
1116 export->exp_obd = obd;
1117 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1118 spin_lock_init(&export->exp_uncommitted_replies_lock);
1119 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1120 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1121 INIT_LIST_HEAD(&export->exp_handle.h_link);
1122 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1123 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1124 class_handle_hash(&export->exp_handle, &export_handle_ops);
1125 export->exp_last_request_time = ktime_get_real_seconds();
1126 spin_lock_init(&export->exp_lock);
1127 spin_lock_init(&export->exp_rpc_lock);
1128 INIT_HLIST_NODE(&export->exp_uuid_hash);
1129 INIT_HLIST_NODE(&export->exp_nid_hash);
1130 INIT_HLIST_NODE(&export->exp_gen_hash);
1131 spin_lock_init(&export->exp_bl_list_lock);
1132 INIT_LIST_HEAD(&export->exp_bl_list);
1133 INIT_LIST_HEAD(&export->exp_stale_list);
1134 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1136 export->exp_sp_peer = LUSTRE_SP_ANY;
1137 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1138 export->exp_client_uuid = *cluuid;
1139 obd_init_export(export);
1141 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1142 spin_lock(&obd->obd_dev_lock);
1143 /* shouldn't happen, but might race */
1144 if (obd->obd_stopping)
1145 GOTO(exit_unlock, rc = -ENODEV);
1147 hash = cfs_hash_getref(obd->obd_uuid_hash);
1149 GOTO(exit_unlock, rc = -ENODEV);
1150 spin_unlock(&obd->obd_dev_lock);
1152 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1154 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1155 obd->obd_name, cluuid->uuid, rc);
1156 GOTO(exit_err, rc = -EALREADY);
1160 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1161 spin_lock(&obd->obd_dev_lock);
1162 if (obd->obd_stopping) {
1164 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1165 GOTO(exit_unlock, rc = -ESHUTDOWN);
1169 class_incref(obd, "export", export);
1170 list_add_tail(&export->exp_obd_chain_timed,
1171 &obd->obd_exports_timed);
1172 list_add(&export->exp_obd_chain, &obd->obd_exports);
1173 obd->obd_num_exports++;
1175 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1176 INIT_LIST_HEAD(&export->exp_obd_chain);
1178 spin_unlock(&obd->obd_dev_lock);
1180 cfs_hash_putref(hash);
1184 spin_unlock(&obd->obd_dev_lock);
1187 cfs_hash_putref(hash);
1188 class_handle_unhash(&export->exp_handle);
1189 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1190 obd_destroy_export(export);
1191 OBD_FREE_PTR(export);
1195 struct obd_export *class_new_export(struct obd_device *obd,
1196 struct obd_uuid *uuid)
1198 return __class_new_export(obd, uuid, false);
1200 EXPORT_SYMBOL(class_new_export);
1202 struct obd_export *class_new_export_self(struct obd_device *obd,
1203 struct obd_uuid *uuid)
1205 return __class_new_export(obd, uuid, true);
/*
 * Detach an export from its device.
 *
 * The handle is unhashed first.  The device's self export only has its
 * reference dropped.  Any other export is, under obd_dev_lock, removed
 * from the UUID hash (and, with HAVE_SERVER_SUPPORT, from the generation
 * hash), moved to obd_unlinked_exports, taken off the timed list and
 * uncounted from obd_num_exports.  obd_stale_export_num is then
 * incremented and the export is passed to obd_stale_export_put().
 *
 * NOTE(review): the early return after the self-export put is on a line
 * that is not visible in this view.
 */
1208 void class_unlink_export(struct obd_export *exp)
1210 class_handle_unhash(&exp->exp_handle);
1212 if (exp->exp_obd->obd_self_export == exp) {
1213 class_export_put(exp);
1217 spin_lock(&exp->exp_obd->obd_dev_lock);
1218 /* delete an uuid-export hashitem from hashtables */
1219 if (!hlist_unhashed(&exp->exp_uuid_hash))
1220 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1221 &exp->exp_client_uuid,
1222 &exp->exp_uuid_hash);
1224 #ifdef HAVE_SERVER_SUPPORT
1225 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1226 struct tg_export_data *ted = &exp->exp_target_data;
1227 struct cfs_hash *hash;
1229 /* Because obd_gen_hash will not be released until
1230 * class_cleanup(), so hash should never be NULL here */
1231 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1232 LASSERT(hash != NULL);
1233 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1234 &exp->exp_gen_hash);
1235 cfs_hash_putref(hash);
1237 #endif /* HAVE_SERVER_SUPPORT */
1239 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1240 list_del_init(&exp->exp_obd_chain_timed);
1241 exp->exp_obd->obd_num_exports--;
1242 spin_unlock(&exp->exp_obd->obd_dev_lock);
1243 atomic_inc(&obd_stale_export_num);
1245 /* A reference is kept by obd_stale_exports list */
1246 obd_stale_export_put(exp);
1248 EXPORT_SYMBOL(class_unlink_export);
1250 /* Import management functions */
/*
 * Final teardown of an import whose refcount has reached zero (asserted).
 *
 * Releases the import's ptlrpc connection and every connection on
 * imp_conn_list (freeing each list entry), asserts that no security
 * context is attached, drops the device reference taken by
 * class_new_import() and frees the import through OBD_FREE_RCU().
 */
1251 static void class_import_destroy(struct obd_import *imp)
1255 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1256 imp->imp_obd->obd_name);
1258 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1260 ptlrpc_put_connection_superhack(imp->imp_connection);
1262 while (!list_empty(&imp->imp_conn_list)) {
1263 struct obd_import_conn *imp_conn;
1265 imp_conn = list_entry(imp->imp_conn_list.next,
1266 struct obd_import_conn, oic_item);
1267 list_del_init(&imp_conn->oic_item);
1268 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1269 OBD_FREE(imp_conn, sizeof(*imp_conn));
1272 LASSERT(imp->imp_sec == NULL);
1273 class_decref(imp->imp_obd, "import", imp);
1274 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* portals_handle_ops::hop_addref callback for imports: take a reference on
 * the import behind the handle. */
static void import_handle_addref(void *import)
{
	struct obd_import *imp = import;

	class_import_get(imp);
}
/* Handle operations passed to class_handle_hash() for import handles.
 * NOTE(review): any further initializers and the closing brace are on
 * lines that are not visible in this view. */
1283 static struct portals_handle_ops import_handle_ops = {
1284 .hop_addref = import_handle_addref,
1288 struct obd_import *class_import_get(struct obd_import *import)
1290 atomic_inc(&import->imp_refcount);
1291 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1292 atomic_read(&import->imp_refcount),
1293 import->imp_obd->obd_name);
1296 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on an import.  When the last reference goes away the
 * import is handed to obd_zombie_import_add() for deferred destruction.
 *
 * NOTE(review): the trailing LASSERT reads imp->imp_refcount after the
 * final put may already have queued the import for destruction; whether
 * the import can be freed before that read depends on
 * obd_zombie_import_add(), which is not visible here - confirm.
 */
1298 void class_import_put(struct obd_import *imp)
1302 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1304 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1305 atomic_read(&imp->imp_refcount) - 1,
1306 imp->imp_obd->obd_name);
1308 if (atomic_dec_and_test(&imp->imp_refcount)) {
1309 CDEBUG(D_INFO, "final put import %p\n", imp);
1310 obd_zombie_import_add(imp);
1313 /* catch possible import put race */
1314 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1317 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.
 *
 * NOTE(review): the last argument of the per-portal at_init() call is on a
 * line that is not visible in this view.
 */
1319 static void init_imp_at(struct imp_at *at) {
1321 at_init(&at->iat_net_latency, 0, 0);
1322 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1323 /* max service estimates are tracked on the server side, so
1324 don't use the AT history here, just use the last reported
1325 val. (But keep hist for proc histogram, worst_ever) */
1326 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1331 static void obd_zombie_imp_cull(struct work_struct *ws)
1333 struct obd_import *import;
1335 import = container_of(ws, struct obd_import, imp_zombie_work);
1336 class_import_destroy(import);
/*
 * Allocate and initialise a new import for \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, holds a
 * device reference taken with class_incref(), is entered in the handle
 * hash with import_handle_ops, and defaults to LUSTRE_MSG_MAGIC_V2.
 * imp_sec_refpid is taken from the child reaper of the caller's pid
 * namespace when there is one; the visible code also assigns 1.
 *
 * NOTE(review): the allocation-failure check, the "else" in front of the
 * fallback pid assignment and the return statement are on lines that are
 * not visible in this view.
 */
1339 struct obd_import *class_new_import(struct obd_device *obd)
1341 struct obd_import *imp;
1342 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1344 OBD_ALLOC(imp, sizeof(*imp));
1348 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1349 INIT_LIST_HEAD(&imp->imp_replay_list);
1350 INIT_LIST_HEAD(&imp->imp_sending_list);
1351 INIT_LIST_HEAD(&imp->imp_delayed_list);
1352 INIT_LIST_HEAD(&imp->imp_committed_list);
1353 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1354 imp->imp_known_replied_xid = 0;
1355 imp->imp_replay_cursor = &imp->imp_committed_list;
1356 spin_lock_init(&imp->imp_lock);
1357 imp->imp_last_success_conn = 0;
1358 imp->imp_state = LUSTRE_IMP_NEW;
1359 imp->imp_obd = class_incref(obd, "import", imp);
1360 mutex_init(&imp->imp_sec_mutex);
1361 init_waitqueue_head(&imp->imp_recovery_waitq);
1362 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1364 if (curr_pid_ns->child_reaper)
1365 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1367 imp->imp_sec_refpid = 1;
1369 atomic_set(&imp->imp_refcount, 2);
1370 atomic_set(&imp->imp_unregistering, 0);
1371 atomic_set(&imp->imp_inflight, 0);
1372 atomic_set(&imp->imp_replay_inflight, 0);
1373 atomic_set(&imp->imp_inval_count, 0);
1374 INIT_LIST_HEAD(&imp->imp_conn_list);
1375 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1376 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1377 init_imp_at(&imp->imp_at);
1379 /* the default magic is V2, will be used in connect RPC, and
1380 * then adjusted according to the flags in request/reply. */
1381 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1385 EXPORT_SYMBOL(class_new_import);
1387 void class_destroy_import(struct obd_import *import)
1389 LASSERT(import != NULL);
1390 LASSERT(import != LP_POISON);
1392 class_handle_unhash(&import->imp_handle);
1394 spin_lock(&import->imp_lock);
1395 import->imp_generation++;
1396 spin_unlock(&import->imp_lock);
1397 class_import_put(import);
1399 EXPORT_SYMBOL(class_destroy_import);
#if LUSTRE_TRACKS_LOCK_EXP_REFS

/*
 * Record that @lock holds one more reference on @exp.
 *
 * On the first reference the lock is linked onto exp->exp_locks_list and
 * remembers @exp as its target.  A warning is printed if the lock already
 * points at a different export.
 */
void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr >= 0);

	if (lock->l_exp_refs_target != NULL &&
	    lock->l_exp_refs_target != exp)
		LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
			      exp, lock, lock->l_exp_refs_target);

	if (lock->l_exp_refs_nr++ == 0) {
		list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
		lock->l_exp_refs_target = exp;
	}
	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);

	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_add_lock_ref);

/*
 * Drop one reference that @lock holds on @exp.
 *
 * When the count reaches zero the lock is unlinked from the export's lock
 * list and its target pointer is cleared.  A warning is printed if the
 * lock's recorded target is not @exp.
 */
void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
{
	spin_lock(&exp->exp_locks_list_guard);

	LASSERT(lock->l_exp_refs_nr > 0);
	if (lock->l_exp_refs_target != exp)
		LCONSOLE_WARN("lock %p, "
			      "mismatching export pointers: %p, %p\n",
			      lock, lock->l_exp_refs_target, exp);

	lock->l_exp_refs_nr--;
	if (lock->l_exp_refs_nr == 0) {
		list_del_init(&lock->l_exp_refs_link);
		lock->l_exp_refs_target = NULL;
	}
	CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
	       lock, exp, lock->l_exp_refs_nr);

	spin_unlock(&exp->exp_locks_list_guard);
}
EXPORT_SYMBOL(__class_export_del_lock_ref);
#endif
1444 /* A connection defines an export context in which preallocation can
1445 be managed. This releases the export pointer reference, and returns
1446 the export handle, so the export refcount is 1 when this function
1448 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1449 struct obd_uuid *cluuid)
1451 struct obd_export *export;
1452 LASSERT(conn != NULL);
1453 LASSERT(obd != NULL);
1454 LASSERT(cluuid != NULL);
1457 export = class_new_export(obd, cluuid);
1459 RETURN(PTR_ERR(export));
1461 conn->cookie = export->exp_handle.h_cookie;
1462 class_export_put(export);
1464 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1465 cluuid->uuid, conn->cookie);
1468 EXPORT_SYMBOL(class_connect);
1470 /* if export is involved in recovery then clean up related things */
1471 static void class_export_recovery_cleanup(struct obd_export *exp)
1473 struct obd_device *obd = exp->exp_obd;
1475 spin_lock(&obd->obd_recovery_task_lock);
1476 if (obd->obd_recovering) {
1477 if (exp->exp_in_recovery) {
1478 spin_lock(&exp->exp_lock);
1479 exp->exp_in_recovery = 0;
1480 spin_unlock(&exp->exp_lock);
1481 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1482 atomic_dec(&obd->obd_connected_clients);
1485 /* if called during recovery then should update
1486 * obd_stale_clients counter,
1487 * lightweight exports are not counted */
1488 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1489 exp->exp_obd->obd_stale_clients++;
1491 spin_unlock(&obd->obd_recovery_task_lock);
1493 spin_lock(&exp->exp_lock);
1494 /** Cleanup req replay fields */
1495 if (exp->exp_req_replay_needed) {
1496 exp->exp_req_replay_needed = 0;
1498 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1499 atomic_dec(&obd->obd_req_replay_clients);
1502 /** Cleanup lock replay data */
1503 if (exp->exp_lock_replay_needed) {
1504 exp->exp_lock_replay_needed = 0;
1506 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1507 atomic_dec(&obd->obd_lock_replay_clients);
1509 spin_unlock(&exp->exp_lock);
1512 /* This function removes 1-3 references from the export:
1513 * 1 - for export pointer passed
1514 * and if disconnect really need
1515 * 2 - removing from hash
1516 * 3 - in client_unlink_export
1517 * The export pointer passed to this function can destroyed */
1518 int class_disconnect(struct obd_export *export)
1520 int already_disconnected;
1523 if (export == NULL) {
1524 CWARN("attempting to free NULL export %p\n", export);
1528 spin_lock(&export->exp_lock);
1529 already_disconnected = export->exp_disconnected;
1530 export->exp_disconnected = 1;
1531 /* We hold references of export for uuid hash
1532 * and nid_hash and export link at least. So
1533 * it is safe to call cfs_hash_del in there. */
1534 if (!hlist_unhashed(&export->exp_nid_hash))
1535 cfs_hash_del(export->exp_obd->obd_nid_hash,
1536 &export->exp_connection->c_peer.nid,
1537 &export->exp_nid_hash);
1538 spin_unlock(&export->exp_lock);
1540 /* class_cleanup(), abort_recovery(), and class_fail_export()
1541 * all end up in here, and if any of them race we shouldn't
1542 * call extra class_export_puts(). */
1543 if (already_disconnected) {
1544 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1545 GOTO(no_disconn, already_disconnected);
1548 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1549 export->exp_handle.h_cookie);
1551 class_export_recovery_cleanup(export);
1552 class_unlink_export(export);
1554 class_export_put(export);
1557 EXPORT_SYMBOL(class_disconnect);
1559 /* Return non-zero for a fully connected export */
1560 int class_connected_export(struct obd_export *exp)
1565 spin_lock(&exp->exp_lock);
1566 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1567 spin_unlock(&exp->exp_lock);
1571 EXPORT_SYMBOL(class_connected_export);
1573 static void class_disconnect_export_list(struct list_head *list,
1574 enum obd_option flags)
1577 struct obd_export *exp;
1580 /* It's possible that an export may disconnect itself, but
1581 * nothing else will be added to this list. */
1582 while (!list_empty(list)) {
1583 exp = list_entry(list->next, struct obd_export,
1585 /* need for safe call CDEBUG after obd_disconnect */
1586 class_export_get(exp);
1588 spin_lock(&exp->exp_lock);
1589 exp->exp_flags = flags;
1590 spin_unlock(&exp->exp_lock);
1592 if (obd_uuid_equals(&exp->exp_client_uuid,
1593 &exp->exp_obd->obd_uuid)) {
1595 "exp %p export uuid == obd uuid, don't discon\n",
1597 /* Need to delete this now so we don't end up pointing
1598 * to work_list later when this export is cleaned up. */
1599 list_del_init(&exp->exp_obd_chain);
1600 class_export_put(exp);
1604 class_export_get(exp);
1605 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1606 "last request at %lld\n",
1607 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1608 exp, exp->exp_last_request_time);
1609 /* release one export reference anyway */
1610 rc = obd_disconnect(exp);
1612 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1613 obd_export_nid2str(exp), exp, rc);
1614 class_export_put(exp);
1619 void class_disconnect_exports(struct obd_device *obd)
1621 struct list_head work_list;
1624 /* Move all of the exports from obd_exports to a work list, en masse. */
1625 INIT_LIST_HEAD(&work_list);
1626 spin_lock(&obd->obd_dev_lock);
1627 list_splice_init(&obd->obd_exports, &work_list);
1628 list_splice_init(&obd->obd_delayed_exports, &work_list);
1629 spin_unlock(&obd->obd_dev_lock);
1631 if (!list_empty(&work_list)) {
1632 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1633 "disconnecting them\n", obd->obd_minor, obd);
1634 class_disconnect_export_list(&work_list,
1635 exp_flags_from_obd(obd));
1637 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1638 obd->obd_minor, obd);
1641 EXPORT_SYMBOL(class_disconnect_exports);
1643 /* Remove exports that have not completed recovery.
1645 void class_disconnect_stale_exports(struct obd_device *obd,
1646 int (*test_export)(struct obd_export *))
1648 struct list_head work_list;
1649 struct obd_export *exp, *n;
1653 INIT_LIST_HEAD(&work_list);
1654 spin_lock(&obd->obd_dev_lock);
1655 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1657 /* don't count self-export as client */
1658 if (obd_uuid_equals(&exp->exp_client_uuid,
1659 &exp->exp_obd->obd_uuid))
1662 /* don't evict clients which have no slot in last_rcvd
1663 * (e.g. lightweight connection) */
1664 if (exp->exp_target_data.ted_lr_idx == -1)
1667 spin_lock(&exp->exp_lock);
1668 if (exp->exp_failed || test_export(exp)) {
1669 spin_unlock(&exp->exp_lock);
1672 exp->exp_failed = 1;
1673 spin_unlock(&exp->exp_lock);
1675 list_move(&exp->exp_obd_chain, &work_list);
1677 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1678 obd->obd_name, exp->exp_client_uuid.uuid,
1679 obd_export_nid2str(exp));
1680 print_export_data(exp, "EVICTING", 0, D_HA);
1682 spin_unlock(&obd->obd_dev_lock);
1685 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1686 obd->obd_name, evicted);
1688 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1689 OBD_OPT_ABORT_RECOV);
1692 EXPORT_SYMBOL(class_disconnect_stale_exports);
1694 void class_fail_export(struct obd_export *exp)
1696 int rc, already_failed;
1698 spin_lock(&exp->exp_lock);
1699 already_failed = exp->exp_failed;
1700 exp->exp_failed = 1;
1701 spin_unlock(&exp->exp_lock);
1703 if (already_failed) {
1704 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1705 exp, exp->exp_client_uuid.uuid);
1709 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1710 exp, exp->exp_client_uuid.uuid);
1712 if (obd_dump_on_timeout)
1713 libcfs_debug_dumplog();
1715 /* need for safe call CDEBUG after obd_disconnect */
1716 class_export_get(exp);
1718 /* Most callers into obd_disconnect are removing their own reference
1719 * (request, for example) in addition to the one from the hash table.
1720 * We don't have such a reference here, so make one. */
1721 class_export_get(exp);
1722 rc = obd_disconnect(exp);
1724 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1726 CDEBUG(D_HA, "disconnected export %p/%s\n",
1727 exp, exp->exp_client_uuid.uuid);
1728 class_export_put(exp);
1730 EXPORT_SYMBOL(class_fail_export);
1732 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1734 struct cfs_hash *nid_hash;
1735 struct obd_export *doomed_exp = NULL;
1736 int exports_evicted = 0;
1738 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1740 spin_lock(&obd->obd_dev_lock);
1741 /* umount has run already, so evict thread should leave
1742 * its task to umount thread now */
1743 if (obd->obd_stopping) {
1744 spin_unlock(&obd->obd_dev_lock);
1745 return exports_evicted;
1747 nid_hash = obd->obd_nid_hash;
1748 cfs_hash_getref(nid_hash);
1749 spin_unlock(&obd->obd_dev_lock);
1752 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1753 if (doomed_exp == NULL)
1756 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1757 "nid %s found, wanted nid %s, requested nid %s\n",
1758 obd_export_nid2str(doomed_exp),
1759 libcfs_nid2str(nid_key), nid);
1760 LASSERTF(doomed_exp != obd->obd_self_export,
1761 "self-export is hashed by NID?\n");
1763 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1764 "request\n", obd->obd_name,
1765 obd_uuid2str(&doomed_exp->exp_client_uuid),
1766 obd_export_nid2str(doomed_exp));
1767 class_fail_export(doomed_exp);
1768 class_export_put(doomed_exp);
1771 cfs_hash_putref(nid_hash);
1773 if (!exports_evicted)
1774 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1775 obd->obd_name, nid);
1776 return exports_evicted;
1778 EXPORT_SYMBOL(obd_export_evict_by_nid);
1780 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1782 struct cfs_hash *uuid_hash;
1783 struct obd_export *doomed_exp = NULL;
1784 struct obd_uuid doomed_uuid;
1785 int exports_evicted = 0;
1787 spin_lock(&obd->obd_dev_lock);
1788 if (obd->obd_stopping) {
1789 spin_unlock(&obd->obd_dev_lock);
1790 return exports_evicted;
1792 uuid_hash = obd->obd_uuid_hash;
1793 cfs_hash_getref(uuid_hash);
1794 spin_unlock(&obd->obd_dev_lock);
1796 obd_str2uuid(&doomed_uuid, uuid);
1797 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1798 CERROR("%s: can't evict myself\n", obd->obd_name);
1799 cfs_hash_putref(uuid_hash);
1800 return exports_evicted;
1803 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1805 if (doomed_exp == NULL) {
1806 CERROR("%s: can't disconnect %s: no exports found\n",
1807 obd->obd_name, uuid);
1809 CWARN("%s: evicting %s at adminstrative request\n",
1810 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1811 class_fail_export(doomed_exp);
1812 class_export_put(doomed_exp);
1815 cfs_hash_putref(uuid_hash);
1817 return exports_evicted;
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() when locks are dumped. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1825 static void print_export_data(struct obd_export *exp, const char *status,
1826 int locks, int debug_level)
1828 struct ptlrpc_reply_state *rs;
1829 struct ptlrpc_reply_state *first_reply = NULL;
1832 spin_lock(&exp->exp_lock);
1833 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1839 spin_unlock(&exp->exp_lock);
1841 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1842 "%p %s %llu stale:%d\n",
1843 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1844 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1845 atomic_read(&exp->exp_rpc_count),
1846 atomic_read(&exp->exp_cb_count),
1847 atomic_read(&exp->exp_locks_count),
1848 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1849 nreplies, first_reply, nreplies > 3 ? "..." : "",
1850 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1851 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1852 if (locks && class_export_dump_hook != NULL)
1853 class_export_dump_hook(exp);
1857 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1859 struct obd_export *exp;
1861 spin_lock(&obd->obd_dev_lock);
1862 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1863 print_export_data(exp, "ACTIVE", locks, debug_level);
1864 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1865 print_export_data(exp, "UNLINKED", locks, debug_level);
1866 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1867 print_export_data(exp, "DELAYED", locks, debug_level);
1868 spin_unlock(&obd->obd_dev_lock);
1871 void obd_exports_barrier(struct obd_device *obd)
1874 LASSERT(list_empty(&obd->obd_exports));
1875 spin_lock(&obd->obd_dev_lock);
1876 while (!list_empty(&obd->obd_unlinked_exports)) {
1877 spin_unlock(&obd->obd_dev_lock);
1878 set_current_state(TASK_UNINTERRUPTIBLE);
1879 schedule_timeout(cfs_time_seconds(waited));
1880 if (waited > 5 && is_power_of_2(waited)) {
1881 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1882 "more than %d seconds. "
1883 "The obd refcount = %d. Is it stuck?\n",
1884 obd->obd_name, waited,
1885 atomic_read(&obd->obd_refcount));
1886 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1889 spin_lock(&obd->obd_dev_lock);
1891 spin_unlock(&obd->obd_dev_lock);
1893 EXPORT_SYMBOL(obd_exports_barrier);
1896 * Add export to the obd_zombe thread and notify it.
1898 static void obd_zombie_export_add(struct obd_export *exp) {
1899 atomic_dec(&obd_stale_export_num);
1900 spin_lock(&exp->exp_obd->obd_dev_lock);
1901 LASSERT(!list_empty(&exp->exp_obd_chain));
1902 list_del_init(&exp->exp_obd_chain);
1903 spin_unlock(&exp->exp_obd->obd_dev_lock);
1905 queue_work(zombie_wq, &exp->exp_zombie_work);
1909 * Add import to the obd_zombe thread and notify it.
1911 static void obd_zombie_import_add(struct obd_import *imp) {
1912 LASSERT(imp->imp_sec == NULL);
1914 queue_work(zombie_wq, &imp->imp_zombie_work);
1918 * wait when obd_zombie import/export queues become empty
1920 void obd_zombie_barrier(void)
1922 flush_workqueue(zombie_wq);
1924 EXPORT_SYMBOL(obd_zombie_barrier);
1927 struct obd_export *obd_stale_export_get(void)
1929 struct obd_export *exp = NULL;
1932 spin_lock(&obd_stale_export_lock);
1933 if (!list_empty(&obd_stale_exports)) {
1934 exp = list_entry(obd_stale_exports.next,
1935 struct obd_export, exp_stale_list);
1936 list_del_init(&exp->exp_stale_list);
1938 spin_unlock(&obd_stale_export_lock);
1941 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1942 atomic_read(&obd_stale_export_num));
1946 EXPORT_SYMBOL(obd_stale_export_get);
1948 void obd_stale_export_put(struct obd_export *exp)
1952 LASSERT(list_empty(&exp->exp_stale_list));
1953 if (exp->exp_lock_hash &&
1954 atomic_read(&exp->exp_lock_hash->hs_count)) {
1955 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1956 atomic_read(&obd_stale_export_num));
1958 spin_lock_bh(&exp->exp_bl_list_lock);
1959 spin_lock(&obd_stale_export_lock);
1960 /* Add to the tail if there is no blocked locks,
1961 * to the head otherwise. */
1962 if (list_empty(&exp->exp_bl_list))
1963 list_add_tail(&exp->exp_stale_list,
1964 &obd_stale_exports);
1966 list_add(&exp->exp_stale_list,
1967 &obd_stale_exports);
1969 spin_unlock(&obd_stale_export_lock);
1970 spin_unlock_bh(&exp->exp_bl_list_lock);
1972 class_export_put(exp);
1976 EXPORT_SYMBOL(obd_stale_export_put);
1979 * Adjust the position of the export in the stale list,
1980 * i.e. move to the head of the list if is needed.
1982 void obd_stale_export_adjust(struct obd_export *exp)
1984 LASSERT(exp != NULL);
1985 spin_lock_bh(&exp->exp_bl_list_lock);
1986 spin_lock(&obd_stale_export_lock);
1988 if (!list_empty(&exp->exp_stale_list) &&
1989 !list_empty(&exp->exp_bl_list))
1990 list_move(&exp->exp_stale_list, &obd_stale_exports);
1992 spin_unlock(&obd_stale_export_lock);
1993 spin_unlock_bh(&exp->exp_bl_list_lock);
1995 EXPORT_SYMBOL(obd_stale_export_adjust);
1998 * start destroy zombie import/export thread
2000 int obd_zombie_impexp_init(void)
2002 zombie_wq = alloc_workqueue("obd_zombid", 0, 0);
2010 * stop destroy zombie import/export thread
2012 void obd_zombie_impexp_stop(void)
2014 destroy_workqueue(zombie_wq);
2015 LASSERT(list_empty(&obd_stale_exports));
2018 /***** Kernel-userspace comm helpers *******/
2020 /* Get length of entire message, including header */
2021 int kuc_len(int payload_len)
2023 return sizeof(struct kuc_hdr) + payload_len;
2025 EXPORT_SYMBOL(kuc_len);
2027 /* Get a pointer to kuc header, given a ptr to the payload
2028 * @param p Pointer to payload area
2029 * @returns Pointer to kuc header
2031 struct kuc_hdr * kuc_ptr(void *p)
2033 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2034 LASSERT(lh->kuc_magic == KUC_MAGIC);
2037 EXPORT_SYMBOL(kuc_ptr);
2039 /* Alloc space for a message, and fill in header
2040 * @return Pointer to payload area
2042 void *kuc_alloc(int payload_len, int transport, int type)
2045 int len = kuc_len(payload_len);
2049 return ERR_PTR(-ENOMEM);
2051 lh->kuc_magic = KUC_MAGIC;
2052 lh->kuc_transport = transport;
2053 lh->kuc_msgtype = type;
2054 lh->kuc_msglen = len;
2056 return (void *)(lh + 1);
2058 EXPORT_SYMBOL(kuc_alloc);
/*
 * Free a message allocated with kuc_alloc().  Takes a pointer to the
 * payload area and the payload length that was used for the allocation.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
2066 EXPORT_SYMBOL(kuc_free);
2068 struct obd_request_slot_waiter {
2069 struct list_head orsw_entry;
2070 wait_queue_head_t orsw_waitq;
2074 static bool obd_request_slot_avail(struct client_obd *cli,
2075 struct obd_request_slot_waiter *orsw)
2079 spin_lock(&cli->cl_loi_list_lock);
2080 avail = !!list_empty(&orsw->orsw_entry);
2081 spin_unlock(&cli->cl_loi_list_lock);
2087 * For network flow control, the RPC sponsor needs to acquire a credit
2088 * before sending the RPC. The credits count for a connection is defined
2089 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2090 * the subsequent RPC sponsors need to wait until others released their
2091 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2093 int obd_get_request_slot(struct client_obd *cli)
2095 struct obd_request_slot_waiter orsw;
2096 struct l_wait_info lwi;
2099 spin_lock(&cli->cl_loi_list_lock);
2100 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2101 cli->cl_rpcs_in_flight++;
2102 spin_unlock(&cli->cl_loi_list_lock);
2106 init_waitqueue_head(&orsw.orsw_waitq);
2107 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2108 orsw.orsw_signaled = false;
2109 spin_unlock(&cli->cl_loi_list_lock);
2111 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2112 rc = l_wait_event(orsw.orsw_waitq,
2113 obd_request_slot_avail(cli, &orsw) ||
2117 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2118 * freed but other (such as obd_put_request_slot) is using it. */
2119 spin_lock(&cli->cl_loi_list_lock);
2121 if (!orsw.orsw_signaled) {
2122 if (list_empty(&orsw.orsw_entry))
2123 cli->cl_rpcs_in_flight--;
2125 list_del(&orsw.orsw_entry);
2129 if (orsw.orsw_signaled) {
2130 LASSERT(list_empty(&orsw.orsw_entry));
2134 spin_unlock(&cli->cl_loi_list_lock);
2138 EXPORT_SYMBOL(obd_get_request_slot);
2140 void obd_put_request_slot(struct client_obd *cli)
2142 struct obd_request_slot_waiter *orsw;
2144 spin_lock(&cli->cl_loi_list_lock);
2145 cli->cl_rpcs_in_flight--;
2147 /* If there is free slot, wakeup the first waiter. */
2148 if (!list_empty(&cli->cl_flight_waiters) &&
2149 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2150 orsw = list_entry(cli->cl_flight_waiters.next,
2151 struct obd_request_slot_waiter, orsw_entry);
2152 list_del_init(&orsw->orsw_entry);
2153 cli->cl_rpcs_in_flight++;
2154 wake_up(&orsw->orsw_waitq);
2156 spin_unlock(&cli->cl_loi_list_lock);
2158 EXPORT_SYMBOL(obd_put_request_slot);
2160 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2162 return cli->cl_max_rpcs_in_flight;
2164 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2166 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2168 struct obd_request_slot_waiter *orsw;
2175 if (max > OBD_MAX_RIF_MAX || max < 1)
2178 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2179 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2180 /* adjust max_mod_rpcs_in_flight to ensure it is always
2181 * strictly lower that max_rpcs_in_flight */
2183 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2184 "because it must be higher than "
2185 "max_mod_rpcs_in_flight value",
2186 cli->cl_import->imp_obd->obd_name);
2189 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2190 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2196 spin_lock(&cli->cl_loi_list_lock);
2197 old = cli->cl_max_rpcs_in_flight;
2198 cli->cl_max_rpcs_in_flight = max;
2199 client_adjust_max_dirty(cli);
2203 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2204 for (i = 0; i < diff; i++) {
2205 if (list_empty(&cli->cl_flight_waiters))
2208 orsw = list_entry(cli->cl_flight_waiters.next,
2209 struct obd_request_slot_waiter, orsw_entry);
2210 list_del_init(&orsw->orsw_entry);
2211 cli->cl_rpcs_in_flight++;
2212 wake_up(&orsw->orsw_waitq);
2214 spin_unlock(&cli->cl_loi_list_lock);
2218 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2220 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2222 return cli->cl_max_mod_rpcs_in_flight;
2224 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2226 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2228 struct obd_connect_data *ocd;
2232 if (max > OBD_MAX_RIF_MAX || max < 1)
2235 /* cannot exceed or equal max_rpcs_in_flight */
2236 if (max >= cli->cl_max_rpcs_in_flight) {
2237 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2238 "higher or equal to max_rpcs_in_flight value (%u)\n",
2239 cli->cl_import->imp_obd->obd_name,
2240 max, cli->cl_max_rpcs_in_flight);
2244 /* cannot exceed max modify RPCs in flight supported by the server */
2245 ocd = &cli->cl_import->imp_connect_data;
2246 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2247 maxmodrpcs = ocd->ocd_maxmodrpcs;
2250 if (max > maxmodrpcs) {
2251 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2252 "higher than max_mod_rpcs_per_client value (%hu) "
2253 "returned by the server at connection\n",
2254 cli->cl_import->imp_obd->obd_name,
2259 spin_lock(&cli->cl_mod_rpcs_lock);
2261 prev = cli->cl_max_mod_rpcs_in_flight;
2262 cli->cl_max_mod_rpcs_in_flight = max;
2264 /* wakeup waiters if limit has been increased */
2265 if (cli->cl_max_mod_rpcs_in_flight > prev)
2266 wake_up(&cli->cl_mod_rpcs_waitq);
2268 spin_unlock(&cli->cl_mod_rpcs_lock);
2272 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/*
 * Integer percentage of a out of b; evaluates to 0 when b is 0.
 * Every argument and the whole expansion are parenthesised so that
 * expression arguments expand as intended.  Note that b is evaluated
 * twice, so it must not have side effects.
 */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
2276 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2277 struct seq_file *seq)
2279 unsigned long mod_tot = 0, mod_cum;
2280 struct timespec64 now;
2283 ktime_get_real_ts64(&now);
2285 spin_lock(&cli->cl_mod_rpcs_lock);
2287 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2288 (s64)now.tv_sec, now.tv_nsec);
2289 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2290 cli->cl_mod_rpcs_in_flight);
2292 seq_printf(seq, "\n\t\t\tmodify\n");
2293 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2295 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2298 for (i = 0; i < OBD_HIST_MAX; i++) {
2299 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2301 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2302 i, mod, pct(mod, mod_tot),
2303 pct(mod_cum, mod_tot));
2304 if (mod_cum == mod_tot)
2308 spin_unlock(&cli->cl_mod_rpcs_lock);
2312 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2316 /* The number of modify RPCs sent in parallel is limited
2317 * because the server has a finite number of slots per client to
2318 * store request result and ensure reply reconstruction when needed.
2319 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2320 * that takes into account server limit and cl_max_rpcs_in_flight
2322 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2323 * one close request is allowed above the maximum.
2325 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2330 /* A slot is available if
2331 * - number of modify RPCs in flight is less than the max
2332 * - it's a close RPC and no other close request is in flight
2334 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2335 (close_req && cli->cl_close_rpcs_in_flight == 0);
2340 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2345 spin_lock(&cli->cl_mod_rpcs_lock);
2346 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2347 spin_unlock(&cli->cl_mod_rpcs_lock);
2351 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2354 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2355 it->it_op == IT_READDIR ||
2356 (it->it_op == IT_LAYOUT && !(it->it_flags & MDS_FMODE_WRITE))))
2361 /* Get a modify RPC slot from the obd client @cli according
2362 * to the kind of operation @opc that is going to be sent
2363 * and the intent @it of the operation if it applies.
2364 * If the maximum number of modify RPCs in flight is reached
2365 * the thread is put to sleep.
2366 * Returns the tag to be set in the request message. Tag 0
2367 * is reserved for non-modifying requests.
2369 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2370 struct lookup_intent *it)
2372 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2373 bool close_req = false;
2376 /* read-only metadata RPCs don't consume a slot on MDT
2377 * for reply reconstruction
2379 if (obd_skip_mod_rpc_slot(it))
2382 if (opc == MDS_CLOSE)
2386 spin_lock(&cli->cl_mod_rpcs_lock);
2387 max = cli->cl_max_mod_rpcs_in_flight;
2388 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2389 /* there is a slot available */
2390 cli->cl_mod_rpcs_in_flight++;
2392 cli->cl_close_rpcs_in_flight++;
2393 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2394 cli->cl_mod_rpcs_in_flight);
2395 /* find a free tag */
2396 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2398 LASSERT(i < OBD_MAX_RIF_MAX);
2399 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2400 spin_unlock(&cli->cl_mod_rpcs_lock);
2401 /* tag 0 is reserved for non-modify RPCs */
2404 spin_unlock(&cli->cl_mod_rpcs_lock);
2406 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2407 "opc %u, max %hu\n",
2408 cli->cl_import->imp_obd->obd_name, opc, max);
2410 l_wait_event(cli->cl_mod_rpcs_waitq,
2411 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2414 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2416 /* Put a modify RPC slot from the obd client @cli according
2417 * to the kind of operation @opc that has been sent and the
2418 * intent @it of the operation if it applies.
2420 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2421 struct lookup_intent *it, __u16 tag)
2423 bool close_req = false;
2425 if (obd_skip_mod_rpc_slot(it))
2428 if (opc == MDS_CLOSE)
2431 spin_lock(&cli->cl_mod_rpcs_lock);
2432 cli->cl_mod_rpcs_in_flight--;
2434 cli->cl_close_rpcs_in_flight--;
2435 /* release the tag in the bitmap */
2436 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2437 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2438 spin_unlock(&cli->cl_mod_rpcs_lock);
2439 wake_up(&cli->cl_mod_rpcs_waitq);
2441 EXPORT_SYMBOL(obd_put_mod_rpc_slot);