4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
/* Registered obd types: the list and the spinlock that is taken around every
 * walk or update of it in this file. */
48 static DEFINE_SPINLOCK(obd_types_lock);
49 static LIST_HEAD(obd_types);
/* obd_devs[] holds the registered devices, indexed by obd_minor; slots are
 * filled and cleared with obd_dev_lock held for writing. */
50 DEFINE_RWLOCK(obd_dev_lock);
51 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
53 static struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 static struct kmem_cache *import_cachep;
/* "Zombie" import/export lists and their lock.
 * NOTE(review): presumably these queue objects for deferred destruction; the
 * code that drains them is not in this part of the file. */
58 static LIST_HEAD(obd_zombie_imports);
59 static LIST_HEAD(obd_zombie_exports);
60 static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
/* Forward declarations of helpers defined later in the file. */
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp,
66 const char *status, int locks, int debug_level);
/* Stale-export list, its lock and its counter; class_unlink_export()
 * increments obd_stale_export_num. */
68 static LIST_HEAD(obd_stale_exports);
69 static DEFINE_SPINLOCK(obd_stale_export_lock);
70 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Hook called by class_export_destroy() and class_import_destroy() to drop a
 * ptlrpc connection.  It is only declared here, so it has to be assigned
 * elsewhere before either of those runs. */
72 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
73 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
76 * support functions: we could use inter-module communication, but this
77 * is more portable to other OS's
/*
 * Allocate a struct obd_device from obd_device_cachep with GFP_NOFS and stamp
 * it with OBD_DEVICE_MAGIC.
 * NOTE(review): neither a NULL check nor the return statement is visible in
 * this copy; class_newdev() compares the result against NULL, so presumably
 * both exist in the complete file - confirm.
 */
79 static struct obd_device *obd_device_alloc(void)
81 struct obd_device *obd;
83 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
85 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts the magic, logs an error if a namespace is still attached,
 * finalizes the obd_reference lu_ref and then frees the slab object.
 */
90 static void obd_device_free(struct obd_device *obd)
93 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
94 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a leftover namespace is only reported here, not released */
95 if (obd->obd_namespace != NULL) {
96 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
97 obd, obd->obd_namespace, obd->obd_force);
100 lu_ref_fini(&obd->obd_reference);
101 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Find a registered obd type by exact name.
 * Walks obd_types under obd_types_lock, comparing typ_name with strcmp(), and
 * drops the lock as soon as a match is found.
 * NOTE(review): the return statements are not visible in this copy; callers
 * in this file treat the result as "matching type or NULL" - confirm.
 */
104 struct obd_type *class_search_type(const char *name)
106 struct list_head *tmp;
107 struct obd_type *type;
109 spin_lock(&obd_types_lock);
110 list_for_each(tmp, &obd_types) {
111 type = list_entry(tmp, struct obd_type, typ_chain);
112 if (strcmp(type->typ_name, name) == 0) {
113 spin_unlock(&obd_types_lock);
117 spin_unlock(&obd_types_lock);
120 EXPORT_SYMBOL(class_search_type);
/*
 * Look up an obd type by name and pin the module that provides it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the lookup can fall back to
 * request_module().  The module name is remapped first: LUSTRE_LWP_NAME is
 * loaded as LUSTRE_OSP_NAME, names starting with LUSTRE_MDS_NAME as
 * LUSTRE_MDT_NAME, and "obdfilter" is special-cased (its replacement is not
 * visible in this copy).  After a successful load the type is searched again.
 *
 * The module reference is taken with try_module_get() on
 * typ_dt_ops->o_owner while holding type->obd_type_lock; class_put_type()
 * is the counterpart.
 * NOTE(review): the try_module_get() result is not checked.
 */
122 struct obd_type *class_get_type(const char *name)
124 struct obd_type *type = class_search_type(name);
126 #ifdef HAVE_MODULE_LOADING_SUPPORT
128 const char *modname = name;
130 if (strcmp(modname, "obdfilter") == 0)
133 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
134 modname = LUSTRE_OSP_NAME;
136 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
137 modname = LUSTRE_MDT_NAME;
/* request_module() returns 0 on success */
139 if (!request_module("%s", modname)) {
140 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
141 type = class_search_type(name);
143 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
149 spin_lock(&type->obd_type_lock);
151 try_module_get(type->typ_dt_ops->o_owner);
152 spin_unlock(&type->obd_type_lock);
/*
 * Release the module reference taken by class_get_type(): module_put() on
 * typ_dt_ops->o_owner, done under type->obd_type_lock.
 */
157 void class_put_type(struct obd_type *type)
160 spin_lock(&type->obd_type_lock);
162 module_put(type->typ_dt_ops->o_owner);
163 spin_unlock(&type->obd_type_lock);
/*
 * kobject release callback for an obd_type (installed through class_ktype).
 * Completes typ_kobj_unregister, which class_unregister_type() waits on after
 * its kobject_put().
 */
166 static void class_sysfs_release(struct kobject *kobj)
168 struct obd_type *type = container_of(kobj, struct obd_type,
171 complete(&type->typ_kobj_unregister);
/* kobj_type passed to kobject_init_and_add() in class_register_type(). */
174 static struct kobj_type class_ktype = {
175 .sysfs_ops = &lustre_sysfs_ops,
176 .release = class_sysfs_release,
/* Upper bound on the type name length asserted by class_register_type(). */
179 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type.
 *
 * Allocates the obd_type together with private copies of \a dt_ops,
 * \a md_ops (optional) and \a name, creates the procfs directory (under
 * CONFIG_PROC_FS) and the sysfs kobject in lustre_kset, runs
 * lu_device_type_init() on \a ldt and finally links the type into obd_types
 * under obd_types_lock.
 *
 * The tail of the function is the failure path: it removes the proc subtree
 * and frees whatever had been allocated.
 *
 * NOTE(review): the conditions guarding several steps (use of
 * \a enable_proc / \a vars, the NULL tests on \a md_ops and \a ldt) and the
 * return statements are not visible in this copy - confirm against the
 * complete file.
 */
181 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
182 bool enable_proc, struct lprocfs_vars *vars,
183 const char *name, struct lu_device_type *ldt)
185 struct obd_type *type;
190 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* refuse to register the same name twice */
192 if (class_search_type(name)) {
193 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
198 OBD_ALLOC(type, sizeof(*type));
/* the three allocations are checked together just below */
202 OBD_ALLOC_PTR(type->typ_dt_ops);
203 OBD_ALLOC_PTR(type->typ_md_ops);
204 OBD_ALLOC(type->typ_name, strlen(name) + 1);
206 if (type->typ_dt_ops == NULL ||
207 type->typ_md_ops == NULL ||
208 type->typ_name == NULL)
211 *(type->typ_dt_ops) = *dt_ops;
212 /* md_ops is optional */
214 *(type->typ_md_ops) = *md_ops;
215 strcpy(type->typ_name, name);
216 spin_lock_init(&type->obd_type_lock);
218 #ifdef CONFIG_PROC_FS
220 type->typ_procroot = lprocfs_register(type->typ_name,
/* reset the pointer so the failure path does not see an ERR_PTR */
223 if (IS_ERR(type->typ_procroot)) {
224 rc = PTR_ERR(type->typ_procroot);
225 type->typ_procroot = NULL;
230 type->typ_kobj.kset = lustre_kset;
231 init_completion(&type->typ_kobj_unregister);
232 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
233 &lustre_kset->kobj, "%s", type->typ_name);
239 rc = lu_device_type_init(ldt);
241 kobject_put(&type->typ_kobj);
/* publish the fully initialized type */
246 spin_lock(&obd_types_lock);
247 list_add(&type->typ_chain, &obd_types);
248 spin_unlock(&obd_types_lock);
/* failure path: undo in reverse order of setup */
253 if (type->typ_name != NULL) {
254 #ifdef CONFIG_PROC_FS
255 if (type->typ_procroot != NULL)
256 remove_proc_subtree(type->typ_name, proc_lustre_root);
258 OBD_FREE(type->typ_name, strlen(name) + 1);
260 if (type->typ_md_ops != NULL)
261 OBD_FREE_PTR(type->typ_md_ops);
262 if (type->typ_dt_ops != NULL)
263 OBD_FREE_PTR(type->typ_dt_ops);
264 OBD_FREE(type, sizeof(*type));
267 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * If the type still has a non-zero typ_refcnt, an error is logged and only
 * its ops tables are freed (the name is kept for debugging, per the comments
 * in that branch).  Otherwise the kobject is dropped and its release waited
 * for, the procfs entries are removed, lu_device_type_fini() is called, the
 * type is unlinked from obd_types under obd_types_lock and all of its memory
 * is freed.
 *
 * NOTE(review): the return statements and the NULL tests on \a type and
 * typ_lu are not visible in this copy - confirm against the complete file.
 */
269 int class_unregister_type(const char *name)
271 struct obd_type *type = class_search_type(name);
275 CERROR("unknown obd type\n");
279 if (type->typ_refcnt) {
280 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
281 /* This is a bad situation, let's make the best of it */
282 /* Remove ops, but leave the name for debugging */
283 OBD_FREE_PTR(type->typ_dt_ops);
284 OBD_FREE_PTR(type->typ_md_ops);
/* class_sysfs_release() signals typ_kobj_unregister once the kobject is gone */
288 kobject_put(&type->typ_kobj);
289 wait_for_completion(&type->typ_kobj_unregister);
291 /* we do not use type->typ_procroot as for compatibility purposes
292 * other modules can share names (i.e. lod can use lov entry). so
293 * we can't reference pointer as it can get invalidated when another
294 * module removes the entry */
295 #ifdef CONFIG_PROC_FS
296 if (type->typ_procroot != NULL)
297 remove_proc_subtree(type->typ_name, proc_lustre_root);
298 if (type->typ_procsym != NULL)
299 lprocfs_remove(&type->typ_procsym);
302 lu_device_type_fini(type->typ_lu);
304 spin_lock(&obd_types_lock);
305 list_del(&type->typ_chain);
306 spin_unlock(&obd_types_lock);
/* sized from \a name, which matched typ_name in class_search_type() */
307 OBD_FREE(type->typ_name, strlen(name) + 1);
308 if (type->typ_dt_ops != NULL)
309 OBD_FREE_PTR(type->typ_dt_ops);
310 if (type->typ_md_ops != NULL)
311 OBD_FREE_PTR(type->typ_md_ops);
312 OBD_FREE(type, sizeof(*type));
314 } /* class_unregister_type */
315 EXPORT_SYMBOL(class_unregister_type);
318 * Create a new obd device.
320 * Allocate the new obd_device and initialize it.
322 * \param[in] type_name obd device type string.
323 * \param[in] name obd device name.
324 * \param[in] uuid obd device UUID
326 * \retval newdev pointer to created obd_device
327 * \retval ERR_PTR(errno) on error
329 struct obd_device *class_newdev(const char *type_name, const char *name,
332 struct obd_device *newdev;
333 struct obd_type *type = NULL;
336 if (strlen(name) >= MAX_OBD_NAME) {
337 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
338 RETURN(ERR_PTR(-EINVAL));
341 type = class_get_type(type_name);
343 CERROR("OBD: unknown type: %s\n", type_name);
344 RETURN(ERR_PTR(-ENODEV));
347 newdev = obd_device_alloc();
348 if (newdev == NULL) {
349 class_put_type(type);
350 RETURN(ERR_PTR(-ENOMEM));
352 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
353 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
354 newdev->obd_type = type;
355 newdev->obd_minor = -1;
357 rwlock_init(&newdev->obd_pool_lock);
358 newdev->obd_pool_limit = 0;
359 newdev->obd_pool_slv = 0;
361 INIT_LIST_HEAD(&newdev->obd_exports);
362 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
363 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
364 INIT_LIST_HEAD(&newdev->obd_exports_timed);
365 INIT_LIST_HEAD(&newdev->obd_nid_stats);
366 spin_lock_init(&newdev->obd_nid_lock);
367 spin_lock_init(&newdev->obd_dev_lock);
368 mutex_init(&newdev->obd_dev_mutex);
369 spin_lock_init(&newdev->obd_osfs_lock);
370 /* newdev->obd_osfs_age must be set to a value in the distant
371 * past to guarantee a fresh statfs is fetched on mount. */
372 newdev->obd_osfs_age = cfs_time_shift_64(-1000);
374 /* XXX belongs in setup not attach */
375 init_rwsem(&newdev->obd_observer_link_sem);
377 init_timer(&newdev->obd_recovery_timer);
378 spin_lock_init(&newdev->obd_recovery_task_lock);
379 init_waitqueue_head(&newdev->obd_next_transno_waitq);
380 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
381 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
382 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
383 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
384 INIT_LIST_HEAD(&newdev->obd_evict_list);
385 INIT_LIST_HEAD(&newdev->obd_lwp_list);
387 llog_group_init(&newdev->obd_olg);
388 /* Detach drops this */
389 atomic_set(&newdev->obd_refcount, 1);
390 lu_ref_init(&newdev->obd_reference);
391 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
393 newdev->obd_conn_inprogress = 0;
395 strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
397 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
398 newdev->obd_name, newdev);
406 * \param[in] obd obd_device to be freed
/*
 * Final teardown of an obd device whose refcount has reached zero.
 *
 * Asserts the magic, that the device is either unregistered (obd_minor == -1)
 * or still the occupant of its obd_devs[] slot, and that obd_refcount is 0.
 * A device that is stopping is cleaned up with obd_cleanup() (failure is only
 * logged).  The device memory is then freed and the type reference taken at
 * creation time is dropped.
 */
410 void class_free_dev(struct obd_device *obd)
/* saved now: obd is freed before the type reference is dropped */
412 struct obd_type *obd_type = obd->obd_type;
414 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
415 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
416 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
417 "obd %p != obd_devs[%d] %p\n",
418 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
419 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
420 "obd_refcount should be 0, not %d\n",
421 atomic_read(&obd->obd_refcount));
422 LASSERT(obd_type != NULL);
424 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
425 obd->obd_name, obd->obd_type->typ_name);
427 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
428 obd->obd_name, obd->obd_uuid.uuid);
429 if (obd->obd_stopping) {
432 /* If we're not stopping, we were never set up */
433 err = obd_cleanup(obd);
435 CERROR("Cleanup %s returned %d\n",
439 obd_device_free(obd);
441 class_put_type(obd_type);
445 * Unregister obd device.
447 * Free slot in obd_dev[] used by \a obd.
449 * \param[in] new_obd obd_device to be unregistered
/*
 * Clear the obd_devs[] slot occupied by \a obd, under the obd_dev_lock write
 * lock.  A device that was never registered (negative obd_minor) is skipped.
 */
453 void class_unregister_device(struct obd_device *obd)
455 write_lock(&obd_dev_lock);
456 if (obd->obd_minor >= 0) {
457 LASSERT(obd_devs[obd->obd_minor] == obd);
458 obd_devs[obd->obd_minor] = NULL;
461 write_unlock(&obd_dev_lock);
465 * Register obd device.
467 * Find free slot in obd_devs[], fills it with \a new_obd.
469 * \param[in] new_obd obd_device to be registered
472 * \retval -EEXIST device with this name is registered
473 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Place \a new_obd into obd_devs[].
 *
 * Scans the whole table under the obd_dev_lock write lock, looking both for
 * an existing device with the same obd_name and for a free slot.  A name
 * clash is reported with CERROR and cancels any slot already chosen;
 * otherwise the device is stored in the chosen slot and its obd_minor is set.
 * Running out of slots is reported with the "all %u/%u devices used" message.
 *
 * NOTE(review): the assignments of the return code and of new_obd_minor are
 * not visible in this copy - confirm against the complete file.
 */
475 int class_register_device(struct obd_device *new_obd)
479 int new_obd_minor = 0;
480 bool minor_assign = false;
482 write_lock(&obd_dev_lock);
483 for (i = 0; i < class_devno_max(); i++) {
484 struct obd_device *obd = class_num2obd(i);
487 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
488 CERROR("%s: already exists, won't add\n",
490 /* in case we found a free slot before duplicate */
491 minor_assign = false;
/* remember only the first free slot */
495 if (!minor_assign && obd == NULL) {
502 new_obd->obd_minor = new_obd_minor;
503 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
504 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
505 obd_devs[new_obd_minor] = new_obd;
509 CERROR("%s: all %u/%u devices used, increase "
510 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
511 i, class_devno_max(), ret);
514 write_unlock(&obd_dev_lock);
/*
 * Search obd_devs[] for a device whose obd_name equals \a name.
 * Takes no lock itself; class_name2dev() and class_dev_by_str() call it with
 * obd_dev_lock read-held.
 * NOTE(review): the return values are not visible in this copy; callers
 * treat a negative result as "not found" - confirm.
 */
519 static int class_name2dev_nolock(const char *name)
526 for (i = 0; i < class_devno_max(); i++) {
527 struct obd_device *obd = class_num2obd(i);
529 if (obd && strcmp(name, obd->obd_name) == 0) {
530 /* Make sure we finished attaching before we give
531 out any references */
532 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
533 if (obd->obd_attached) {
/*
 * Locked wrapper around class_name2dev_nolock(): performs the lookup with
 * obd_dev_lock held for reading.
 * NOTE(review): the return statement is not visible in this copy;
 * class_name2obd() treats a negative result as "not found".
 */
543 int class_name2dev(const char *name)
550 read_lock(&obd_dev_lock);
551 i = class_name2dev_nolock(name);
552 read_unlock(&obd_dev_lock);
556 EXPORT_SYMBOL(class_name2dev);
558 struct obd_device *class_name2obd(const char *name)
560 int dev = class_name2dev(name);
562 if (dev < 0 || dev > class_devno_max())
564 return class_num2obd(dev);
566 EXPORT_SYMBOL(class_name2obd);
/*
 * Search obd_devs[] for a device whose obd_uuid equals \a uuid
 * (obd_uuid_equals()).  Takes no lock itself; class_uuid2dev() and
 * class_dev_by_str() call it with obd_dev_lock read-held.
 * NOTE(review): the return values are not visible in this copy; callers
 * treat a negative result as "not found" - confirm.
 */
568 int class_uuid2dev_nolock(struct obd_uuid *uuid)
572 for (i = 0; i < class_devno_max(); i++) {
573 struct obd_device *obd = class_num2obd(i);
575 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
576 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/*
 * Locked wrapper around class_uuid2dev_nolock(): performs the lookup with
 * obd_dev_lock held for reading.
 */
584 int class_uuid2dev(struct obd_uuid *uuid)
588 read_lock(&obd_dev_lock);
589 i = class_uuid2dev_nolock(uuid);
590 read_unlock(&obd_dev_lock);
594 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up an obd device by UUID: resolve the index with class_uuid2dev() and
 * fetch the device with class_num2obd().
 * NOTE(review): any check of \a dev between the two calls is not visible in
 * this copy.
 */
596 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
598 int dev = class_uuid2dev(uuid);
601 return class_num2obd(dev);
603 EXPORT_SYMBOL(class_uuid2obd);
606 * Get obd device from ::obd_devs[]
608 * \param num [in] array index
610 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
611 * otherwise return the obd device there.
613 struct obd_device *class_num2obd(int num)
615 struct obd_device *obd = NULL;
617 if (num < class_devno_max()) {
622 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
623 "%p obd_magic %08x != %08x\n",
624 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
625 LASSERTF(obd->obd_minor == num,
626 "%p obd_minor %0d != %0d\n",
627 obd, obd->obd_minor, num);
634 * Find obd in obd_dev[] by name or uuid.
636 * Increment obd's refcount if found.
638 * \param[in] str obd name or uuid
640 * \retval NULL if not found
641 * \retval target pointer to found obd_device
/*
 * Find a device by \a str, which is tried first as a UUID and then as a
 * device name, all under one obd_dev_lock read section.  A device that is
 * found gets a reference via class_incref(target, "find", current) before the
 * lock is dropped.
 * NOTE(review): the conditions between the visible calls and the return
 * statement are not visible in this copy.
 */
643 struct obd_device *class_dev_by_str(const char *str)
645 struct obd_device *target = NULL;
646 struct obd_uuid tgtuuid;
649 obd_str2uuid(&tgtuuid, str);
651 read_lock(&obd_dev_lock);
652 rc = class_uuid2dev_nolock(&tgtuuid);
654 rc = class_name2dev_nolock(str);
657 target = class_num2obd(rc);
660 class_incref(target, "find", current);
661 read_unlock(&obd_dev_lock);
665 EXPORT_SYMBOL(class_dev_by_str);
668 * Get obd devices count. Device in any
670 * \retval obd device count
672 int get_devices_count(void)
674 int index, max_index = class_devno_max(), dev_count = 0;
676 read_lock(&obd_dev_lock);
677 for (index = 0; index <= max_index; index++) {
678 struct obd_device *obd = class_num2obd(index);
682 read_unlock(&obd_dev_lock);
686 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in obd_devs[] (index, status, type name,
 * device name, uuid, refcount), holding obd_dev_lock for reading.
 * The status is chosen from obd_stopping, obd_set_up and obd_attached, tested
 * in that order; the status strings themselves are not visible in this copy.
 */
688 void class_obd_list(void)
693 read_lock(&obd_dev_lock);
694 for (i = 0; i < class_devno_max(); i++) {
695 struct obd_device *obd = class_num2obd(i);
699 if (obd->obd_stopping)
701 else if (obd->obd_set_up)
703 else if (obd->obd_attached)
707 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
708 i, status, obd->obd_type->typ_name,
709 obd->obd_name, obd->obd_uuid.uuid,
710 atomic_read(&obd->obd_refcount));
712 read_unlock(&obd_dev_lock);
/*
 * Scan obd_devs[] under the obd_dev_lock read lock for a client device of
 * type \a typ_name whose u.cli.cl_target_uuid equals \a tgt_uuid.
 * The type test is a prefix match: only strlen(typ_name) characters are
 * compared.  The read lock is dropped before a match is handed back.
 * NOTE(review): no reference is visibly taken on the matching device here.
 */
716 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
717 specified, then only the client with that uuid is returned,
718 otherwise any client connected to the tgt is returned. */
719 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
720 const char * typ_name,
721 struct obd_uuid *grp_uuid)
725 read_lock(&obd_dev_lock);
726 for (i = 0; i < class_devno_max(); i++) {
727 struct obd_device *obd = class_num2obd(i);
731 if ((strncmp(obd->obd_type->typ_name, typ_name,
732 strlen(typ_name)) == 0)) {
733 if (obd_uuid_equals(tgt_uuid,
734 &obd->u.cli.cl_target_uuid) &&
735 ((grp_uuid)? obd_uuid_equals(grp_uuid,
736 &obd->obd_uuid) : 1)) {
737 read_unlock(&obd_dev_lock);
742 read_unlock(&obd_dev_lock);
746 EXPORT_SYMBOL(class_find_client_obd);
/*
 * Resumable scan of obd_devs[] for devices whose obd_uuid equals
 * \a grp_uuid, done under the obd_dev_lock read lock.  A starting index taken
 * from *next is used only when it lies in [0, class_devno_max()).
 * NOTE(review): the handling of a NULL or out-of-range \a next and the
 * update of *next are not visible in this copy.
 */
748 /* Iterate the obd_device list looking devices have grp_uuid. Start
749 searching at *next, and if a device is found, the next index to look
750 at is saved in *next. If next is NULL, then the first matching device
751 will always be returned. */
752 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
758 else if (*next >= 0 && *next < class_devno_max())
763 read_lock(&obd_dev_lock);
764 for (; i < class_devno_max(); i++) {
765 struct obd_device *obd = class_num2obd(i);
769 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
772 read_unlock(&obd_dev_lock);
776 read_unlock(&obd_dev_lock);
780 EXPORT_SYMBOL(class_devices_in_group);
783 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
784 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to every device that
 * is set up, not stopping, of type mdc/osc/osp/lwp/mdt/ost, and whose
 * obd_name starts with the first \a namelen bytes of \a fsname.
 *
 * obd_dev_lock is dropped around each obd_set_info_async() call; the device
 * is pinned with class_incref()/class_decref() for that window and the lock
 * is re-taken before the scan continues.
 * NOTE(review): how the per-device result rc2 is folded into the return
 * value is not visible in this copy.
 */
786 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
788 struct obd_device *obd;
792 LASSERT(namelen > 0);
794 read_lock(&obd_dev_lock);
795 for (i = 0; i < class_devno_max(); i++) {
796 obd = class_num2obd(i);
798 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
801 /* only notify mdc, osc, osp, lwp, mdt, ost
802 * because only these have a -sptlrpc llog */
803 type = obd->obd_type->typ_name;
804 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
805 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
806 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
807 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
808 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
809 strcmp(type, LUSTRE_OST_NAME) != 0)
/* non-zero strncmp(): the device name does not start with fsname */
812 if (strncmp(obd->obd_name, fsname, namelen))
/* hold a reference so the device survives while the lock is dropped */
815 class_incref(obd, __FUNCTION__, obd);
816 read_unlock(&obd_dev_lock);
817 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
818 sizeof(KEY_SPTLRPC_CONF),
819 KEY_SPTLRPC_CONF, 0, NULL, NULL);
821 class_decref(obd, __FUNCTION__, obd);
822 read_lock(&obd_dev_lock);
824 read_unlock(&obd_dev_lock);
827 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches() and reset the cache
 * pointers.  obd_init_caches() also uses this as its failure path, so it has
 * to cope with caches that were never created.
 */
829 void obd_cleanup_caches(void)
832 if (obd_device_cachep) {
833 kmem_cache_destroy(obd_device_cachep);
834 obd_device_cachep = NULL;
837 kmem_cache_destroy(obdo_cachep);
841 kmem_cache_destroy(import_cachep);
842 import_cachep = NULL;
/*
 * Create the three slab caches used by this file: "ll_obd_dev_cache",
 * "ll_obdo_cache" and "ll_import_cache".  Each cache pointer is asserted to
 * be NULL beforehand.  A failed creation jumps to the common exit with
 * -ENOMEM, where obd_cleanup_caches() releases what was created.
 */
848 int obd_init_caches(void)
853 LASSERT(obd_device_cachep == NULL);
854 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
855 sizeof(struct obd_device),
857 if (!obd_device_cachep)
858 GOTO(out, rc = -ENOMEM);
860 LASSERT(obdo_cachep == NULL);
861 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
864 GOTO(out, rc = -ENOMEM);
866 LASSERT(import_cachep == NULL);
867 import_cachep = kmem_cache_create("ll_import_cache",
868 sizeof(struct obd_import),
871 GOTO(out, rc = -ENOMEM);
875 obd_cleanup_caches();
/*
 * Translate a connection handle into its export by looking the cookie up
 * with class_handle2object().  Two cases are logged before the lookup: a
 * null handle, and a cookie of -1 (a request for a new connection).
 * NOTE(review): what those two cases return is not visible in this copy.
 * class_conn2obd() drops an export reference after a successful lookup, so
 * the lookup presumably returns a referenced export - confirm.
 */
879 /* map connection to client */
880 struct obd_export *class_conn2export(struct lustre_handle *conn)
882 struct obd_export *export;
886 CDEBUG(D_CACHE, "looking for null handle\n");
890 if (conn->cookie == -1) { /* this means assign a new connection */
891 CDEBUG(D_CACHE, "want a new connection\n");
895 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
896 export = class_handle2object(conn->cookie, NULL);
899 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): the function body is not visible in this copy; presumably it
 * returns exp->exp_obd - confirm against the complete file.
 */
901 struct obd_device *class_exp2obd(struct obd_export *exp)
907 EXPORT_SYMBOL(class_exp2obd);
/*
 * Map a connection handle to the obd device of its export.
 * The export obtained from class_conn2export() is released again with
 * class_export_put() once exp_obd has been read.
 * NOTE(review): no reference is visibly taken on the device itself.
 */
909 struct obd_device *class_conn2obd(struct lustre_handle *conn)
911 struct obd_export *export;
912 export = class_conn2export(conn);
914 struct obd_device *obd = export->exp_obd;
915 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the device behind \a exp.
 * \a exp itself is dereferenced unconditionally.
 */
921 struct obd_import *class_exp2cliimp(struct obd_export *exp)
923 struct obd_device *obd = exp->exp_obd;
926 return obd->u.cli.cl_import;
928 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device that
 * class_conn2obd() resolves \a conn to.
 */
930 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
932 struct obd_device *obd = class_conn2obd(conn);
935 return obd->u.cli.cl_import;
938 /* Export management functions */
/*
 * Free an export whose refcount has dropped to zero.
 * Releases the ptlrpc connection (if any), asserts that the reply/replay/RPC
 * lists are empty, calls obd_destroy_export(), drops the "export" reference
 * on the device for every export except the self export, and frees the
 * memory through OBD_FREE_RCU().
 */
939 static void class_export_destroy(struct obd_export *exp)
941 struct obd_device *obd = exp->exp_obd;
944 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
945 LASSERT(obd != NULL);
947 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
948 exp->exp_client_uuid.uuid, obd->obd_name);
950 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
951 if (exp->exp_connection)
952 ptlrpc_put_connection_superhack(exp->exp_connection);
954 LASSERT(list_empty(&exp->exp_outstanding_replies));
955 LASSERT(list_empty(&exp->exp_uncommitted_replies));
956 LASSERT(list_empty(&exp->exp_req_replay_queue));
957 LASSERT(list_empty(&exp->exp_hp_rpcs));
958 obd_destroy_export(exp);
959 /* self export doesn't hold a reference to an obd, although it
960 * exists until freeing of the obd */
961 if (exp != obd->obd_self_export)
962 class_decref(obd, "export", exp);
964 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback of export_handle_ops: takes an export reference. */
968 static void export_handle_addref(void *export)
970 class_export_get(export);
/* Handle operations passed to class_handle_hash() for exports in
 * __class_new_export(). */
973 static struct portals_handle_ops export_handle_ops = {
974 .hop_addref = export_handle_addref,
/*
 * Take a reference on \a exp (atomic increment of exp_refcount) and log the
 * new count.
 * NOTE(review): the return statement is not visible in this copy; presumably
 * \a exp is returned.
 */
978 struct obd_export *class_export_get(struct obd_export *exp)
980 atomic_inc(&exp->exp_refcount);
981 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
982 atomic_read(&exp->exp_refcount));
985 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp.
 *
 * On the final put, the per-export procfs state is cleaned up and then:
 *  - the device's self export is destroyed synchronously with
 *    class_export_destroy(), followed by class_free_dev() on the device;
 *  - any other export must be on an exp_obd_chain list (asserted) and is
 *    passed to obd_zombie_export_add().
 */
987 void class_export_put(struct obd_export *exp)
989 LASSERT(exp != NULL);
990 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
991 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
992 atomic_read(&exp->exp_refcount) - 1);
994 if (atomic_dec_and_test(&exp->exp_refcount)) {
995 struct obd_device *obd = exp->exp_obd;
997 CDEBUG(D_IOCTL, "final put %p/%s\n",
998 exp, exp->exp_client_uuid.uuid);
1000 /* release nid stat reference */
1001 lprocfs_exp_cleanup(exp);
1003 if (exp == obd->obd_self_export) {
1004 /* self export should be destroyed without
1005 * zombie thread as it doesn't hold a
1006 * reference to obd and doesn't hold any
1008 class_export_destroy(exp);
1009 /* self export is destroyed, no class
1010 * references exist and it is safe to free
1012 class_free_dev(obd);
1014 LASSERT(!list_empty(&exp->exp_obd_chain));
1015 obd_zombie_export_add(exp);
1020 EXPORT_SYMBOL(class_export_put);
1021 /* Creates a new export, adds it to the hash table, and returns a
1022 * pointer to it. The refcount is 2: one for the hash reference, and
1023 * one for the pointer returned by this function. */
/*
 * Allocate and initialize an export of \a obd for client \a cluuid.
 *
 * The export starts with a refcount of 2 and is entered into the handle
 * hash.  When \a cluuid differs from the device's own uuid, the export is
 * also added to obd_uuid_hash with cfs_hash_add_unique(); a duplicate is
 * refused with -EALREADY.  obd_stopping is checked under obd_dev_lock both
 * before taking the hash reference (-ENODEV) and again before linking
 * (-ESHUTDOWN, after removing the uuid-hash entry).
 *
 * Two linking variants are visible at the end: one takes an "export"
 * reference on the device and adds the export to obd_exports and
 * obd_exports_timed, the other only initializes the two list heads.
 * NOTE(review): the condition choosing between them is not visible in this
 * copy - presumably \a is_self; confirm.
 *
 * The tail is the failure path: drop the hash reference, unhash the handle,
 * call obd_destroy_export() and free the export.  An allocation failure
 * returns ERR_PTR(-ENOMEM).
 */
1024 struct obd_export *__class_new_export(struct obd_device *obd,
1025 struct obd_uuid *cluuid, bool is_self)
1027 struct obd_export *export;
1028 struct cfs_hash *hash = NULL;
1032 OBD_ALLOC_PTR(export);
1034 return ERR_PTR(-ENOMEM);
1036 export->exp_conn_cnt = 0;
1037 export->exp_lock_hash = NULL;
1038 export->exp_flock_hash = NULL;
1039 /* 2 = class_handle_hash + last */
1040 atomic_set(&export->exp_refcount, 2);
1041 atomic_set(&export->exp_rpc_count, 0);
1042 atomic_set(&export->exp_cb_count, 0);
1043 atomic_set(&export->exp_locks_count, 0);
1044 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1045 INIT_LIST_HEAD(&export->exp_locks_list);
1046 spin_lock_init(&export->exp_locks_list_guard);
1048 atomic_set(&export->exp_replay_count, 0);
1049 export->exp_obd = obd;
1050 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1051 spin_lock_init(&export->exp_uncommitted_replies_lock);
1052 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1053 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1054 INIT_LIST_HEAD(&export->exp_handle.h_link);
1055 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1056 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1057 class_handle_hash(&export->exp_handle, &export_handle_ops);
1058 export->exp_last_request_time = ktime_get_real_seconds();
1059 spin_lock_init(&export->exp_lock);
1060 spin_lock_init(&export->exp_rpc_lock);
1061 INIT_HLIST_NODE(&export->exp_uuid_hash);
1062 INIT_HLIST_NODE(&export->exp_nid_hash);
1063 INIT_HLIST_NODE(&export->exp_gen_hash);
1064 spin_lock_init(&export->exp_bl_list_lock);
1065 INIT_LIST_HEAD(&export->exp_bl_list);
1066 INIT_LIST_HEAD(&export->exp_stale_list);
1068 export->exp_sp_peer = LUSTRE_SP_ANY;
1069 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1070 export->exp_client_uuid = *cluuid;
1071 obd_init_export(export);
1073 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1074 spin_lock(&obd->obd_dev_lock);
1075 /* shouldn't happen, but might race */
1076 if (obd->obd_stopping)
1077 GOTO(exit_unlock, rc = -ENODEV);
1079 hash = cfs_hash_getref(obd->obd_uuid_hash);
1081 GOTO(exit_unlock, rc = -ENODEV);
1082 spin_unlock(&obd->obd_dev_lock);
1084 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1086 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1087 obd->obd_name, cluuid->uuid, rc);
1088 GOTO(exit_err, rc = -EALREADY);
1092 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1093 spin_lock(&obd->obd_dev_lock);
1094 if (obd->obd_stopping) {
1096 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1097 GOTO(exit_unlock, rc = -ESHUTDOWN);
1101 class_incref(obd, "export", export);
1102 list_add_tail(&export->exp_obd_chain_timed,
1103 &obd->obd_exports_timed);
1104 list_add(&export->exp_obd_chain, &obd->obd_exports);
1105 obd->obd_num_exports++;
1107 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1108 INIT_LIST_HEAD(&export->exp_obd_chain);
1110 spin_unlock(&obd->obd_dev_lock);
1112 cfs_hash_putref(hash);
1116 spin_unlock(&obd->obd_dev_lock);
1119 cfs_hash_putref(hash);
1120 class_handle_unhash(&export->exp_handle);
1121 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1122 obd_destroy_export(export);
1123 OBD_FREE_PTR(export);
/* Create an ordinary export: __class_new_export() with is_self == false. */
1127 struct obd_export *class_new_export(struct obd_device *obd,
1128 struct obd_uuid *uuid)
1130 return __class_new_export(obd, uuid, false);
1132 EXPORT_SYMBOL(class_new_export);
/* Create a device's self export: __class_new_export() with is_self == true. */
1134 struct obd_export *class_new_export_self(struct obd_device *obd,
1135 struct obd_uuid *uuid)
1137 return __class_new_export(obd, uuid, true);
/*
 * Disconnect \a exp from its device.
 *
 * The handle is unhashed first.  The self export is then simply released
 * with class_export_put().  Any other export is, under obd_dev_lock, removed
 * from the uuid hash (and, with HAVE_SERVER_SUPPORT, from the generation
 * hash), moved to obd_unlinked_exports, taken off the timed list and counted
 * out of obd_num_exports.  Finally obd_stale_export_num is incremented and
 * the export is passed to obd_stale_export_put().
 */
1140 void class_unlink_export(struct obd_export *exp)
1142 class_handle_unhash(&exp->exp_handle);
1144 if (exp->exp_obd->obd_self_export == exp) {
1145 class_export_put(exp);
1149 spin_lock(&exp->exp_obd->obd_dev_lock);
1150 /* delete an uuid-export hashitem from hashtables */
1151 if (!hlist_unhashed(&exp->exp_uuid_hash))
1152 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1153 &exp->exp_client_uuid,
1154 &exp->exp_uuid_hash);
1156 #ifdef HAVE_SERVER_SUPPORT
1157 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1158 struct tg_export_data *ted = &exp->exp_target_data;
1159 struct cfs_hash *hash;
1161 /* Because obd_gen_hash will not be released until
1162 * class_cleanup(), so hash should never be NULL here */
1163 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1164 LASSERT(hash != NULL);
1165 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1166 &exp->exp_gen_hash);
1167 cfs_hash_putref(hash);
1169 #endif /* HAVE_SERVER_SUPPORT */
1171 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1172 list_del_init(&exp->exp_obd_chain_timed);
1173 exp->exp_obd->obd_num_exports--;
1174 spin_unlock(&exp->exp_obd->obd_dev_lock);
1175 atomic_inc(&obd_stale_export_num);
1177 /* A reference is kept by obd_stale_exports list */
1178 obd_stale_export_put(exp);
1180 EXPORT_SYMBOL(class_unlink_export);
1182 /* Import management functions */
/*
 * Free an import whose refcount has dropped to zero.
 * Releases the import's current connection and every connection on
 * imp_conn_list (freeing the list entries), asserts that no security context
 * is attached, drops the "import" reference on the device and frees the
 * memory through OBD_FREE_RCU().
 */
1183 static void class_import_destroy(struct obd_import *imp)
1187 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1188 imp->imp_obd->obd_name);
1190 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1192 ptlrpc_put_connection_superhack(imp->imp_connection);
/* pop entries off the head until the list is empty */
1194 while (!list_empty(&imp->imp_conn_list)) {
1195 struct obd_import_conn *imp_conn;
1197 imp_conn = list_entry(imp->imp_conn_list.next,
1198 struct obd_import_conn, oic_item);
1199 list_del_init(&imp_conn->oic_item);
1200 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1201 OBD_FREE(imp_conn, sizeof(*imp_conn));
1204 LASSERT(imp->imp_sec == NULL);
1205 class_decref(imp->imp_obd, "import", imp);
1206 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* hop_addref callback of import_handle_ops: takes an import reference. */
1210 static void import_handle_addref(void *import)
1212 class_import_get(import);
/* Handle operations passed to class_handle_hash() for imports in
 * class_new_import(). */
1215 static struct portals_handle_ops import_handle_ops = {
1216 .hop_addref = import_handle_addref,
/*
 * Take a reference on \a import (atomic increment of imp_refcount) and log
 * the new count.
 * NOTE(review): the return statement is not visible in this copy; presumably
 * \a import is returned.
 */
1220 struct obd_import *class_import_get(struct obd_import *import)
1222 atomic_inc(&import->imp_refcount);
1223 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1224 atomic_read(&import->imp_refcount),
1225 import->imp_obd->obd_name);
1228 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a imp.  The import must not already be on a zombie
 * list (asserted).  On the final put it is passed to
 * obd_zombie_import_add().
 */
1230 void class_import_put(struct obd_import *imp)
1234 LASSERT(list_empty(&imp->imp_zombie_chain));
1235 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1237 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1238 atomic_read(&imp->imp_refcount) - 1,
1239 imp->imp_obd->obd_name);
1241 if (atomic_dec_and_test(&imp->imp_refcount)) {
1242 CDEBUG(D_INFO, "final put import %p\n", imp);
1243 obd_zombie_import_add(imp);
1246 /* catch possible import put race */
1247 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1250 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize an import's adaptive-timeout state: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service estimates
 * starts at INITIAL_CONNECT_TIMEOUT.
 */
1252 static void init_imp_at(struct imp_at *at) {
1254 at_init(&at->iat_net_latency, 0, 0);
1255 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1256 /* max service estimates are tracked on the server side, so
1257 don't use the AT history here, just use the last reported
1258 val. (But keep hist for proc histogram, worst_ever) */
1259 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize an import for \a obd.
 *
 * The import takes an "import" reference on the device, starts in state
 * LUSTRE_IMP_NEW with a refcount of 2, is entered into the handle hash and
 * gets its adaptive timeouts initialized.  imp_sec_refpid is set to the pid
 * of the current pid namespace's child reaper, or to 1 if there is none.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this copy.
 */
1264 struct obd_import *class_new_import(struct obd_device *obd)
1266 struct obd_import *imp;
1267 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1269 OBD_ALLOC(imp, sizeof(*imp));
1273 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1274 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1275 INIT_LIST_HEAD(&imp->imp_replay_list);
1276 INIT_LIST_HEAD(&imp->imp_sending_list);
1277 INIT_LIST_HEAD(&imp->imp_delayed_list);
1278 INIT_LIST_HEAD(&imp->imp_committed_list);
1279 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1280 imp->imp_known_replied_xid = 0;
1281 imp->imp_replay_cursor = &imp->imp_committed_list;
1282 spin_lock_init(&imp->imp_lock);
1283 imp->imp_last_success_conn = 0;
1284 imp->imp_state = LUSTRE_IMP_NEW;
/* dropped again in class_import_destroy() */
1285 imp->imp_obd = class_incref(obd, "import", imp);
1286 mutex_init(&imp->imp_sec_mutex);
1287 init_waitqueue_head(&imp->imp_recovery_waitq);
1289 if (curr_pid_ns->child_reaper)
1290 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1292 imp->imp_sec_refpid = 1;
1294 atomic_set(&imp->imp_refcount, 2);
1295 atomic_set(&imp->imp_unregistering, 0);
1296 atomic_set(&imp->imp_inflight, 0);
1297 atomic_set(&imp->imp_replay_inflight, 0);
1298 atomic_set(&imp->imp_inval_count, 0);
1299 INIT_LIST_HEAD(&imp->imp_conn_list);
1300 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1301 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1302 init_imp_at(&imp->imp_at);
1304 /* the default magic is V2, will be used in connect RPC, and
1305 * then adjusted according to the flags in request/reply. */
1306 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1310 EXPORT_SYMBOL(class_new_import);
/*
 * Begin tearing down \a import: remove it from the handle hash, bump
 * imp_generation under imp_lock and drop one reference with
 * class_import_put().
 */
1312 void class_destroy_import(struct obd_import *import)
1314 LASSERT(import != NULL);
1315 LASSERT(import != LP_POISON);
1317 class_handle_unhash(&import->imp_handle);
1319 spin_lock(&import->imp_lock);
1320 import->imp_generation++;
1321 spin_unlock(&import->imp_lock);
1322 class_import_put(import);
1324 EXPORT_SYMBOL(class_destroy_import);
// __class_export_add_lock_ref() - account one more reference from @lock to
// @exp.  Only built when LUSTRE_TRACKS_LOCK_EXP_REFS is non-zero.
//
// Everything happens under exp->exp_locks_list_guard.  A console warning is
// printed when @lock is already tracked against a different export.  On the
// 0 -> 1 transition of l_exp_refs_nr the lock is linked into
// exp->exp_locks_list and @exp is recorded as its target.
1326 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1328 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1330 spin_lock(&exp->exp_locks_list_guard);
1332 LASSERT(lock->l_exp_refs_nr >= 0);
1334 if (lock->l_exp_refs_target != NULL &&
1335 lock->l_exp_refs_target != exp) {
1336 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1337 exp, lock, lock->l_exp_refs_target);
// Post-increment: the test sees the count before this reference was added.
1339 if ((lock->l_exp_refs_nr ++) == 0) {
1340 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1341 lock->l_exp_refs_target = exp;
1343 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1344 lock, exp, lock->l_exp_refs_nr);
1345 spin_unlock(&exp->exp_locks_list_guard);
1347 EXPORT_SYMBOL(__class_export_add_lock_ref);
// __class_export_del_lock_ref() - drop one reference from @lock to @exp.
//
// Counterpart of __class_export_add_lock_ref(), also run under
// exp->exp_locks_list_guard.  Asserts the count is positive, warns on the
// console when @lock is tracked against a different export, and on the
// 1 -> 0 transition unlinks @lock from the export's list and clears its
// recorded target.
1349 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1351 spin_lock(&exp->exp_locks_list_guard);
1352 LASSERT(lock->l_exp_refs_nr > 0);
1353 if (lock->l_exp_refs_target != exp) {
1354 LCONSOLE_WARN("lock %p, "
1355 "mismatching export pointers: %p, %p\n",
1356 lock, lock->l_exp_refs_target, exp);
// Pre-decrement: the test sees the count after this reference was removed.
1358 if (-- lock->l_exp_refs_nr == 0) {
1359 list_del_init(&lock->l_exp_refs_link);
1360 lock->l_exp_refs_target = NULL;
1362 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1363 lock, exp, lock->l_exp_refs_nr);
1364 spin_unlock(&exp->exp_locks_list_guard);
1366 EXPORT_SYMBOL(__class_export_del_lock_ref);
// class_connect() - create an export on @obd for client @cluuid and hand
// its handle cookie back through @conn.
//
// All three arguments are asserted non-NULL.  The export comes from
// class_new_export(); its handle cookie is copied into conn->cookie and the
// local reference is dropped again with class_export_put().
// PTR_ERR(export) is returned on the error path.
//
// NOTE(review): the check guarding that error return (presumably IS_ERR)
// and the success return value are not visible in this listing.
1369 /* A connection defines an export context in which preallocation can
1370 be managed. This releases the export pointer reference, and returns
1371 the export handle, so the export refcount is 1 when this function
1373 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1374 struct obd_uuid *cluuid)
1376 struct obd_export *export;
1377 LASSERT(conn != NULL);
1378 LASSERT(obd != NULL);
1379 LASSERT(cluuid != NULL);
1382 export = class_new_export(obd, cluuid);
1384 RETURN(PTR_ERR(export));
1386 conn->cookie = export->exp_handle.h_cookie;
1387 class_export_put(export);
1389 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1390 cluuid->uuid, conn->cookie);
1393 EXPORT_SYMBOL(class_connect);
// class_export_recovery_cleanup() - undo the recovery accounting of @exp.
//
// First part, under obd_recovery_task_lock and conditional on
// obd_recovering: an export flagged exp_in_recovery has that flag cleared
// (under exp_lock) and obd_connected_clients decremented;
// obd_stale_clients is incremented for exports that did not connect with
// OBD_CONNECT_LIGHTWEIGHT.
// Second part, under exp_lock: pending request-replay and lock-replay flags
// are cleared and the matching per-device client counters decremented.
//
// NOTE(review): closing braces are not visible in this listing, so the
// exact nesting of the stale-client update should be confirmed.
1395 /* if export is involved in recovery then clean up related things */
1396 static void class_export_recovery_cleanup(struct obd_export *exp)
1398 struct obd_device *obd = exp->exp_obd;
1400 spin_lock(&obd->obd_recovery_task_lock);
1401 if (obd->obd_recovering) {
1402 if (exp->exp_in_recovery) {
1403 spin_lock(&exp->exp_lock);
1404 exp->exp_in_recovery = 0;
1405 spin_unlock(&exp->exp_lock);
1406 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1407 atomic_dec(&obd->obd_connected_clients);
1410 /* if called during recovery then should update
1411 * obd_stale_clients counter,
1412 * lightweight exports are not counted */
1413 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1414 exp->exp_obd->obd_stale_clients++;
1416 spin_unlock(&obd->obd_recovery_task_lock);
1418 spin_lock(&exp->exp_lock);
1419 /** Cleanup req replay fields */
1420 if (exp->exp_req_replay_needed) {
1421 exp->exp_req_replay_needed = 0;
1423 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1424 atomic_dec(&obd->obd_req_replay_clients);
1427 /** Cleanup lock replay data */
1428 if (exp->exp_lock_replay_needed) {
1429 exp->exp_lock_replay_needed = 0;
1431 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1432 atomic_dec(&obd->obd_lock_replay_clients);
1434 spin_unlock(&exp->exp_lock);
// class_disconnect() - disconnect @export from its device.
//
// A NULL @export only produces a warning.  Otherwise exp_disconnected is
// set under exp_lock (its previous value is remembered) and the export is
// removed from the device's NID hash if it is still hashed.  When the
// export had already been disconnected, control jumps to the no_disconn
// label so racing callers do not drop extra references; otherwise the
// recovery accounting is cleaned up and the export unlinked.
// class_export_put() drops a reference on @export.
//
// NOTE(review): the no_disconn label, its position relative to the final
// class_export_put() and the return statements are not visible in this
// listing.
1437 /* This function removes 1-3 references from the export:
1438 * 1 - for export pointer passed
1439 * and if disconnect really need
1440 * 2 - removing from hash
1441 * 3 - in client_unlink_export
1442 * The export pointer passed to this function can destroyed */
1443 int class_disconnect(struct obd_export *export)
1445 int already_disconnected;
1448 if (export == NULL) {
1449 CWARN("attempting to free NULL export %p\n", export);
1453 spin_lock(&export->exp_lock);
1454 already_disconnected = export->exp_disconnected;
1455 export->exp_disconnected = 1;
1456 /* We hold references of export for uuid hash
1457 * and nid_hash and export link at least. So
1458 * it is safe to call cfs_hash_del in there. */
1459 if (!hlist_unhashed(&export->exp_nid_hash))
1460 cfs_hash_del(export->exp_obd->obd_nid_hash,
1461 &export->exp_connection->c_peer.nid,
1462 &export->exp_nid_hash);
1463 spin_unlock(&export->exp_lock);
1465 /* class_cleanup(), abort_recovery(), and class_fail_export()
1466 * all end up in here, and if any of them race we shouldn't
1467 * call extra class_export_puts(). */
1468 if (already_disconnected) {
1469 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1470 GOTO(no_disconn, already_disconnected);
1473 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1474 export->exp_handle.h_cookie);
1476 class_export_recovery_cleanup(export);
1477 class_unlink_export(export);
1479 class_export_put(export);
1482 EXPORT_SYMBOL(class_disconnect);
// class_connected_export() - tell whether @exp is fully connected.
//
// Evaluated under exp_lock: the export counts as connected when
// exp_conn_cnt is positive and exp_failed is clear.
//
// NOTE(review): the declaration of 'connected', any guard on @exp and the
// return statement are not visible in this listing.
1484 /* Return non-zero for a fully connected export */
1485 int class_connected_export(struct obd_export *exp)
1490 spin_lock(&exp->exp_lock);
1491 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1492 spin_unlock(&exp->exp_lock);
1496 EXPORT_SYMBOL(class_connected_export);
// class_disconnect_export_list() - disconnect every export queued on @list.
//
// The loop runs until @list is empty and always works on the first entry.
// Each export gets a reference (so it can still be printed after
// obd_disconnect()) and has @flags stored in exp_flags under exp_lock.
// An export whose client UUID equals the UUID of its own device is not
// disconnected: it is unlinked from @list and the reference dropped.
// Every other export gets a second reference, which obd_disconnect()
// releases, and is then disconnected; the first reference is dropped after
// the trailing debug message.
//
// NOTE(review): the "continue" after the self-export case and the
// declaration of 'rc' are not visible in this listing.
1498 static void class_disconnect_export_list(struct list_head *list,
1499 enum obd_option flags)
1502 struct obd_export *exp;
1505 /* It's possible that an export may disconnect itself, but
1506 * nothing else will be added to this list. */
1507 while (!list_empty(list)) {
1508 exp = list_entry(list->next, struct obd_export,
1510 /* need for safe call CDEBUG after obd_disconnect */
1511 class_export_get(exp);
1513 spin_lock(&exp->exp_lock);
1514 exp->exp_flags = flags;
1515 spin_unlock(&exp->exp_lock);
1517 if (obd_uuid_equals(&exp->exp_client_uuid,
1518 &exp->exp_obd->obd_uuid)) {
1520 "exp %p export uuid == obd uuid, don't discon\n",
1522 /* Need to delete this now so we don't end up pointing
1523 * to work_list later when this export is cleaned up. */
1524 list_del_init(&exp->exp_obd_chain);
1525 class_export_put(exp);
1529 class_export_get(exp);
1530 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1531 "last request at %lld\n",
1532 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1533 exp, exp->exp_last_request_time);
1534 /* release one export reference anyway */
1535 rc = obd_disconnect(exp);
1537 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1538 obd_export_nid2str(exp), exp, rc);
1539 class_export_put(exp);
// class_disconnect_exports() - disconnect all exports of @obd.
//
// obd_exports and obd_delayed_exports are spliced onto a private work list
// while holding obd_dev_lock, so the disconnects themselves run without
// that lock.  A non-empty work list is handed to
// class_disconnect_export_list() with the flags from exp_flags_from_obd();
// a debug message is logged in either case.
1544 void class_disconnect_exports(struct obd_device *obd)
1546 struct list_head work_list;
1549 /* Move all of the exports from obd_exports to a work list, en masse. */
1550 INIT_LIST_HEAD(&work_list);
1551 spin_lock(&obd->obd_dev_lock);
1552 list_splice_init(&obd->obd_exports, &work_list);
1553 list_splice_init(&obd->obd_delayed_exports, &work_list);
1554 spin_unlock(&obd->obd_dev_lock);
1556 if (!list_empty(&work_list)) {
1557 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1558 "disconnecting them\n", obd->obd_minor, obd);
1559 class_disconnect_export_list(&work_list,
1560 exp_flags_from_obd(obd));
1562 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1563 obd->obd_minor, obd);
1566 EXPORT_SYMBOL(class_disconnect_exports);
// class_disconnect_stale_exports() - evict the exports of @obd selected via
// the @test_export callback.
//
// obd_exports is walked under obd_dev_lock.  The visible checks concern the
// self-export (client UUID equal to the device UUID), exports without a
// last_rcvd slot (ted_lr_idx == -1), and exports that are already failed or
// for which @test_export() returns non-zero (tested under exp_lock).
// Exports that get past these checks are marked exp_failed, moved to a
// private work list and logged.  After the walk a console message reports
// the number of evictions and the work list is disconnected with
// OBD_OPT_ABORT_RECOV added to the device flags.
//
// NOTE(review): the statements executed when a check matches (presumably
// "continue"), the updates of 'evicted' and the condition guarding the
// console message are not visible in this listing.
1568 /* Remove exports that have not completed recovery.
1570 void class_disconnect_stale_exports(struct obd_device *obd,
1571 int (*test_export)(struct obd_export *))
1573 struct list_head work_list;
1574 struct obd_export *exp, *n;
1578 INIT_LIST_HEAD(&work_list);
1579 spin_lock(&obd->obd_dev_lock);
1580 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1582 /* don't count self-export as client */
1583 if (obd_uuid_equals(&exp->exp_client_uuid,
1584 &exp->exp_obd->obd_uuid))
1587 /* don't evict clients which have no slot in last_rcvd
1588 * (e.g. lightweight connection) */
1589 if (exp->exp_target_data.ted_lr_idx == -1)
1592 spin_lock(&exp->exp_lock);
1593 if (exp->exp_failed || test_export(exp)) {
1594 spin_unlock(&exp->exp_lock);
1597 exp->exp_failed = 1;
1598 spin_unlock(&exp->exp_lock);
1600 list_move(&exp->exp_obd_chain, &work_list);
1602 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1603 obd->obd_name, exp->exp_client_uuid.uuid,
1604 exp->exp_connection == NULL ? "<unknown>" :
1605 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1606 print_export_data(exp, "EVICTING", 0, D_HA);
1608 spin_unlock(&obd->obd_dev_lock);
1611 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1612 obd->obd_name, evicted);
1614 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1615 OBD_OPT_ABORT_RECOV);
1618 EXPORT_SYMBOL(class_disconnect_stale_exports);
// class_fail_export() - mark @exp as failed and disconnect it.
//
// exp_failed is set under exp_lock.  If it was already set, only a debug
// message ("skipping") is emitted.  Otherwise the debug log is dumped when
// obd_dump_on_timeout is set, two references are taken - one keeps @exp
// valid for the messages printed after obd_disconnect(), the other is the
// one obd_disconnect() consumes - and the export is disconnected.  The
// first reference is dropped at the end.
//
// NOTE(review): the early return after the "skipping" message and the test
// on 'rc' that selects CERROR vs. CDEBUG are not visible in this listing.
1620 void class_fail_export(struct obd_export *exp)
1622 int rc, already_failed;
1624 spin_lock(&exp->exp_lock);
1625 already_failed = exp->exp_failed;
1626 exp->exp_failed = 1;
1627 spin_unlock(&exp->exp_lock);
1629 if (already_failed) {
1630 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1631 exp, exp->exp_client_uuid.uuid);
1635 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1636 exp, exp->exp_client_uuid.uuid);
1638 if (obd_dump_on_timeout)
1639 libcfs_debug_dumplog();
1641 /* need for safe call CDEBUG after obd_disconnect */
1642 class_export_get(exp);
1644 /* Most callers into obd_disconnect are removing their own reference
1645 * (request, for example) in addition to the one from the hash table.
1646 * We don't have such a reference here, so make one. */
1647 class_export_get(exp);
1648 rc = obd_disconnect(exp);
1650 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1652 CDEBUG(D_HA, "disconnected export %p/%s\n",
1653 exp, exp->exp_client_uuid.uuid);
1654 class_export_put(exp);
1656 EXPORT_SYMBOL(class_fail_export);
// obd_export_nid2str() - printable NID of the peer behind @exp.
//
// When exp_connection is set, the peer NID is formatted with
// libcfs_nid2str() and returned.
//
// NOTE(review): the value returned for an export without a connection is
// not visible in this listing.
1658 char *obd_export_nid2str(struct obd_export *exp)
1660 if (exp->exp_connection != NULL)
1661 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1665 EXPORT_SYMBOL(obd_export_nid2str);
// obd_export_evict_by_nid() - evict the exports of @obd that are connected
// from the NID given as string @nid.
//
// Returns early with 0 evictions when the device is stopping (checked under
// obd_dev_lock).  Otherwise a reference on the device's NID hash is taken
// before the lock is dropped.  An export found in the hash is asserted to
// carry the requested NID and not to be the device's self-export, is
// reported on the console, failed with class_fail_export() and released
// with class_export_put().  The hash reference is dropped at the end and
// exports_evicted returned.
//
// NOTE(review): the loop construct around the lookup, the statement taken
// when the lookup returns NULL and the increment of exports_evicted are not
// visible in this listing.
1667 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1669 struct cfs_hash *nid_hash;
1670 struct obd_export *doomed_exp = NULL;
1671 int exports_evicted = 0;
1673 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1675 spin_lock(&obd->obd_dev_lock);
1676 /* umount has run already, so evict thread should leave
1677 * its task to umount thread now */
1678 if (obd->obd_stopping) {
1679 spin_unlock(&obd->obd_dev_lock);
1680 return exports_evicted;
1682 nid_hash = obd->obd_nid_hash;
1683 cfs_hash_getref(nid_hash);
1684 spin_unlock(&obd->obd_dev_lock);
1687 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1688 if (doomed_exp == NULL)
1691 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1692 "nid %s found, wanted nid %s, requested nid %s\n",
1693 obd_export_nid2str(doomed_exp),
1694 libcfs_nid2str(nid_key), nid);
1695 LASSERTF(doomed_exp != obd->obd_self_export,
1696 "self-export is hashed by NID?\n");
1698 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1699 "request\n", obd->obd_name,
1700 obd_uuid2str(&doomed_exp->exp_client_uuid),
1701 obd_export_nid2str(doomed_exp));
1702 class_fail_export(doomed_exp);
1703 class_export_put(doomed_exp);
1706 cfs_hash_putref(nid_hash);
1708 if (!exports_evicted)
1709 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1710 obd->obd_name, nid);
1711 return exports_evicted;
1713 EXPORT_SYMBOL(obd_export_evict_by_nid);
// obd_export_evict_by_uuid() - evict the export of @obd whose client UUID
// matches the string @uuid.
//
// Returns early with 0 evictions when the device is stopping (checked under
// obd_dev_lock) or when @uuid is the device's own UUID.  Otherwise the
// export is looked up in the device's UUID hash (a hash reference is held
// for the duration); a hit is failed with class_fail_export() and released
// with class_export_put(), a miss is reported with CERROR.
// exports_evicted is returned.
//
// NOTE(review): the "else" between the miss and hit paths and the increment
// of exports_evicted are not visible in this listing.
// NOTE(review): the CWARN text below spells "adminstrative"; the by-NID
// variant uses "administrative".
1715 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1717 struct cfs_hash *uuid_hash;
1718 struct obd_export *doomed_exp = NULL;
1719 struct obd_uuid doomed_uuid;
1720 int exports_evicted = 0;
1722 spin_lock(&obd->obd_dev_lock);
1723 if (obd->obd_stopping) {
1724 spin_unlock(&obd->obd_dev_lock);
1725 return exports_evicted;
1727 uuid_hash = obd->obd_uuid_hash;
1728 cfs_hash_getref(uuid_hash);
1729 spin_unlock(&obd->obd_dev_lock);
1731 obd_str2uuid(&doomed_uuid, uuid);
1732 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1733 CERROR("%s: can't evict myself\n", obd->obd_name);
1734 cfs_hash_putref(uuid_hash);
1735 return exports_evicted;
1738 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1740 if (doomed_exp == NULL) {
1741 CERROR("%s: can't disconnect %s: no exports found\n",
1742 obd->obd_name, uuid);
1744 CWARN("%s: evicting %s at adminstrative request\n",
1745 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1746 class_fail_export(doomed_exp);
1747 class_export_put(doomed_exp);
1750 cfs_hash_putref(uuid_hash);
1752 return exports_evicted;
// Optional callback, NULL by default, that print_export_data() invokes for
// an export when lock dumping is requested.  Only defined when
// LUSTRE_TRACKS_LOCK_EXP_REFS is non-zero.
1755 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1756 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1757 EXPORT_SYMBOL(class_export_dump_hook);
// print_export_data() - log a one-line summary of @exp at @debug_level.
//
// @status is a free-form tag printed in the message (callers in this file
// pass "EVICTING", "ACTIVE", "UNLINKED", "DELAYED" and "ZOMBIE").  The
// outstanding-reply list is walked under exp_lock; the message then prints
// the device name, export pointer, client UUID, NID, reference and activity
// counters, the disconnected/delayed/failed flags, the reply count with the
// first reply, the last committed transno and whether the export is on a
// stale list.  When @locks is non-zero and class_export_dump_hook is set
// (LUSTRE_TRACKS_LOCK_EXP_REFS builds only) the hook is called as well.
//
// NOTE(review): the body of the reply loop and the declaration of
// 'nreplies' are not visible in this listing.
1760 static void print_export_data(struct obd_export *exp, const char *status,
1761 int locks, int debug_level)
1763 struct ptlrpc_reply_state *rs;
1764 struct ptlrpc_reply_state *first_reply = NULL;
1767 spin_lock(&exp->exp_lock);
1768 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1774 spin_unlock(&exp->exp_lock);
1776 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1777 "%p %s %llu stale:%d\n",
1778 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1779 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1780 atomic_read(&exp->exp_rpc_count),
1781 atomic_read(&exp->exp_cb_count),
1782 atomic_read(&exp->exp_locks_count),
1783 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1784 nreplies, first_reply, nreplies > 3 ? "..." : "",
1785 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1786 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1787 if (locks && class_export_dump_hook != NULL)
1788 class_export_dump_hook(exp);
// dump_exports() - log every export known for @obd.
//
// The device's active, unlinked and delayed export lists are printed under
// obd_dev_lock; the global zombie export list is printed afterwards under
// obd_zombie_impexp_lock.  @locks and @debug_level are passed through to
// print_export_data().
// NOTE(review): the zombie list is global, so the "ZOMBIE" entries are not
// restricted to @obd in the visible code - confirm this is intended.
1792 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1794 struct obd_export *exp;
1796 spin_lock(&obd->obd_dev_lock);
1797 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1798 print_export_data(exp, "ACTIVE", locks, debug_level);
1799 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1800 print_export_data(exp, "UNLINKED", locks, debug_level);
1801 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1802 print_export_data(exp, "DELAYED", locks, debug_level);
1803 spin_unlock(&obd->obd_dev_lock);
1804 spin_lock(&obd_zombie_impexp_lock);
1805 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1806 print_export_data(exp, "ZOMBIE", locks, debug_level);
1807 spin_unlock(&obd_zombie_impexp_lock);
// obd_exports_barrier() - wait until @obd has no unlinked exports left.
//
// Asserts that obd_exports is already empty, then polls
// obd_unlinked_exports.  Between checks obd_dev_lock is dropped and the
// caller sleeps uninterruptibly for 'waited' seconds.  Once 'waited'
// exceeds 5 and is a power of two, a console warning is printed and all
// exports are dumped.
//
// NOTE(review): the declaration, initial value and update of 'waited' are
// not visible in this listing.
1810 void obd_exports_barrier(struct obd_device *obd)
1813 LASSERT(list_empty(&obd->obd_exports));
1814 spin_lock(&obd->obd_dev_lock);
1815 while (!list_empty(&obd->obd_unlinked_exports)) {
1816 spin_unlock(&obd->obd_dev_lock);
1817 set_current_state(TASK_UNINTERRUPTIBLE);
1818 schedule_timeout(cfs_time_seconds(waited));
1819 if (waited > 5 && is_power_of_2(waited)) {
1820 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1821 "more than %d seconds. "
1822 "The obd refcount = %d. Is it stuck?\n",
1823 obd->obd_name, waited,
1824 atomic_read(&obd->obd_refcount));
1825 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1828 spin_lock(&obd->obd_dev_lock);
1830 spin_unlock(&obd->obd_dev_lock);
1832 EXPORT_SYMBOL(obd_exports_barrier);
// zombies_count: number of zombie imports/exports still awaiting
// destruction.  The readers visible in this file access it with
// obd_zombie_impexp_lock held.
//
// obd_zombie_impexp_cull() - destroy the queued zombie imports and exports.
//
// Each pass takes at most one import and one export off the zombie lists
// under obd_zombie_impexp_lock, then destroys them with the lock dropped
// (class_import_destroy() / class_export_destroy()), briefly re-taking the
// lock after each destruction.  The pass repeats as long as either list
// yielded an entry.
//
// NOTE(review): the statements inside the short re-locked sections
// (presumably the zombies_count update), the per-pass reset of
// 'import'/'export' and the opening of the do-loop are not visible in this
// listing.
1834 /* Total amount of zombies to be destroyed */
1835 static int zombies_count = 0;
1838 * kill zombie imports and exports
1840 void obd_zombie_impexp_cull(void)
1842 struct obd_import *import;
1843 struct obd_export *export;
1847 spin_lock(&obd_zombie_impexp_lock);
1850 if (!list_empty(&obd_zombie_imports)) {
1851 import = list_entry(obd_zombie_imports.next,
1854 list_del_init(&import->imp_zombie_chain);
1858 if (!list_empty(&obd_zombie_exports)) {
1859 export = list_entry(obd_zombie_exports.next,
1862 list_del_init(&export->exp_obd_chain);
1865 spin_unlock(&obd_zombie_impexp_lock);
1867 if (import != NULL) {
1868 class_import_destroy(import);
1869 spin_lock(&obd_zombie_impexp_lock);
1871 spin_unlock(&obd_zombie_impexp_lock);
1874 if (export != NULL) {
1875 class_export_destroy(export);
1876 spin_lock(&obd_zombie_impexp_lock);
1878 spin_unlock(&obd_zombie_impexp_lock);
1882 } while (import != NULL || export != NULL);
// State shared between the zombie thread and the code that controls it:
// start/stop completions, the flag word tested with test_bit(), the wait
// queue both the thread and obd_zombie_barrier() sleep on, and the pid of
// the thread.
//
// NOTE(review): OBD_ZOMBIE_STOP is defined as 0x0001 but is passed to
// set_bit()/test_bit() as a bit *number*, so it selects bit 1 rather than
// bit 0 of obd_zombie_flags.  Consistent across all visible users, but
// worth confirming that this is intended.
1886 static DECLARE_COMPLETION(obd_zombie_start);
1887 static DECLARE_COMPLETION(obd_zombie_stop);
1888 static unsigned long obd_zombie_flags;
1889 static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
1890 static pid_t obd_zombie_pid;
1893 OBD_ZOMBIE_STOP = 0x0001,
// obd_zombie_impexp_check() - wait-condition helper of the zombie thread.
//
// Under obd_zombie_impexp_lock, 'rc' becomes true when there is nothing to
// do: zombies_count is zero and the stop bit is not set.  The thread sleeps
// on the negation of this value.  @arg is not used by the visible code.
//
// NOTE(review): the declaration of 'rc' and the return statement are not
// visible in this listing; presumably 'rc' is returned.
1897 * check for work for kill zombie import/export thread.
1899 static int obd_zombie_impexp_check(void *arg)
1903 spin_lock(&obd_zombie_impexp_lock);
1904 rc = (zombies_count == 0) &&
1905 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1906 spin_unlock(&obd_zombie_impexp_lock);
// obd_zombie_export_add() - queue @exp for destruction by the zombie thread.
//
// Decrements obd_stale_export_num, unlinks @exp from its device list under
// the device's obd_dev_lock (asserting that it was linked), adds it to
// obd_zombie_exports under obd_zombie_impexp_lock and wakes the waiters.
//
// NOTE(review): one statement between taking obd_zombie_impexp_lock and the
// list_add() is not visible in this listing (presumably the zombies_count
// increment).
1912 * Add export to the obd_zombe thread and notify it.
1914 static void obd_zombie_export_add(struct obd_export *exp) {
1915 atomic_dec(&obd_stale_export_num);
1916 spin_lock(&exp->exp_obd->obd_dev_lock);
1917 LASSERT(!list_empty(&exp->exp_obd_chain));
1918 list_del_init(&exp->exp_obd_chain);
1919 spin_unlock(&exp->exp_obd->obd_dev_lock);
1920 spin_lock(&obd_zombie_impexp_lock);
1922 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1923 spin_unlock(&obd_zombie_impexp_lock);
1925 obd_zombie_impexp_notify();
// obd_zombie_import_add() - queue @imp for destruction by the zombie thread.
//
// Asserts that the import no longer has a security context (imp_sec is
// NULL) and is not yet on a zombie list, adds it to obd_zombie_imports
// under obd_zombie_impexp_lock and wakes the waiters.
//
// NOTE(review): one statement between the list_empty() assertion and the
// list_add() is not visible in this listing (presumably the zombies_count
// increment).
1929 * Add import to the obd_zombe thread and notify it.
1931 static void obd_zombie_import_add(struct obd_import *imp) {
1932 LASSERT(imp->imp_sec == NULL);
1933 spin_lock(&obd_zombie_impexp_lock);
1934 LASSERT(list_empty(&imp->imp_zombie_chain));
1936 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1937 spin_unlock(&obd_zombie_impexp_lock);
1939 obd_zombie_impexp_notify();
// obd_zombie_impexp_notify() - wake everything sleeping on obd_zombie_waitq.
//
// wake_up_all() is used because both the zombie thread and callers of
// obd_zombie_barrier() sleep on the same queue; waking only one sleeper
// could hand the wake-up to a barrier waiter instead of the thread.
1943 * notify import/export destroy thread about new zombie.
1945 static void obd_zombie_impexp_notify(void)
1948 * Make sure obd_zomebie_impexp_thread get this notification.
1949 * It is possible this signal only get by obd_zombie_barrier, and
1950 * barrier gulps this notification and sleeps away and hangs ensues
1952 wake_up_all(&obd_zombie_waitq);
// obd_zombie_is_idle() - wait-condition helper of obd_zombie_barrier().
//
// Asserts that the zombie thread has not been told to stop, then samples
// zombies_count under obd_zombie_impexp_lock; 'rc' is true when no zombies
// are pending.
//
// NOTE(review): the declaration of 'rc' and the return statement are not
// visible in this listing; presumably 'rc' is returned.
1956 * check whether obd_zombie is idle
1958 static int obd_zombie_is_idle(void)
1962 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1963 spin_lock(&obd_zombie_impexp_lock);
1964 rc = (zombies_count == 0);
1965 spin_unlock(&obd_zombie_impexp_lock);
// obd_zombie_barrier() - block until the zombie queues have been drained.
//
// Sleeps on obd_zombie_waitq until obd_zombie_is_idle() holds.  The pid
// comparison exists so that the zombie thread does not wait for itself.
//
// NOTE(review): the statement executed when the pids match (presumably
// "return") is not visible in this listing.
1970 * wait when obd_zombie import/export queues become empty
1972 void obd_zombie_barrier(void)
1974 struct l_wait_info lwi = { 0 };
1976 if (obd_zombie_pid == current_pid())
1977 /* don't wait for myself */
1979 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1981 EXPORT_SYMBOL(obd_zombie_barrier);
// obd_stale_export_get() - take the first export off the global stale list.
//
// Under obd_stale_export_lock the head of obd_stale_exports, if any, is
// unlinked; 'exp' stays NULL when the list is empty.  A debug message logs
// the export and the current obd_stale_export_num.
//
// NOTE(review): the condition guarding the debug message and the return
// statement are not visible in this listing; presumably 'exp' is returned.
1984 struct obd_export *obd_stale_export_get(void)
1986 struct obd_export *exp = NULL;
1989 spin_lock(&obd_stale_export_lock);
1990 if (!list_empty(&obd_stale_exports)) {
1991 exp = list_entry(obd_stale_exports.next,
1992 struct obd_export, exp_stale_list);
1993 list_del_init(&exp->exp_stale_list);
1995 spin_unlock(&obd_stale_export_lock);
1998 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1999 atomic_read(&obd_stale_export_num));
2003 EXPORT_SYMBOL(obd_stale_export_get);
// obd_stale_export_put() - hand @exp back after working on it.
//
// @exp must not be on the stale list.  If it has a lock hash that still
// holds entries (hs_count non-zero) it is queued on obd_stale_exports
// again: at the tail when it has no entries on exp_bl_list, at the head
// otherwise.  Lock order is exp_bl_list_lock (BH-disabling) first, then
// obd_stale_export_lock.  class_export_put() is the other visible action.
//
// NOTE(review): the "else" lines are not visible in this listing;
// presumably class_export_put() runs only when the export is not requeued.
2005 void obd_stale_export_put(struct obd_export *exp)
2009 LASSERT(list_empty(&exp->exp_stale_list));
2010 if (exp->exp_lock_hash &&
2011 atomic_read(&exp->exp_lock_hash->hs_count)) {
2012 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
2013 atomic_read(&obd_stale_export_num));
2015 spin_lock_bh(&exp->exp_bl_list_lock);
2016 spin_lock(&obd_stale_export_lock);
2017 /* Add to the tail if there is no blocked locks,
2018 * to the head otherwise. */
2019 if (list_empty(&exp->exp_bl_list))
2020 list_add_tail(&exp->exp_stale_list,
2021 &obd_stale_exports);
2023 list_add(&exp->exp_stale_list,
2024 &obd_stale_exports);
2026 spin_unlock(&obd_stale_export_lock);
2027 spin_unlock_bh(&exp->exp_bl_list_lock);
2029 class_export_put(exp);
2033 EXPORT_SYMBOL(obd_stale_export_put);
// obd_stale_export_adjust() - move @exp to the head of the stale list when
// it is on that list and has entries on exp_bl_list.
//
// Uses the same lock order as obd_stale_export_put(): exp_bl_list_lock
// (BH-disabling) first, then obd_stale_export_lock.  An export that is not
// on the stale list, or has an empty exp_bl_list, is left untouched.
2036 * Adjust the position of the export in the stale list,
2037 * i.e. move to the head of the list if is needed.
2039 void obd_stale_export_adjust(struct obd_export *exp)
2041 LASSERT(exp != NULL);
2042 spin_lock_bh(&exp->exp_bl_list_lock);
2043 spin_lock(&obd_stale_export_lock);
2045 if (!list_empty(&exp->exp_stale_list) &&
2046 !list_empty(&exp->exp_bl_list))
2047 list_move(&exp->exp_stale_list, &obd_stale_exports);
2049 spin_unlock(&obd_stale_export_lock);
2050 spin_unlock_bh(&exp->exp_bl_list_lock);
2052 EXPORT_SYMBOL(obd_stale_export_adjust);
// obd_zombie_impexp_thread() - main function of the zombie thread.
//
// Signals obd_zombie_start, records its own pid in obd_zombie_pid (compared
// by obd_zombie_barrier()), then loops until the stop bit is set: sleep
// until obd_zombie_impexp_check() reports work or a stop request, destroy
// the queued zombies with obd_zombie_impexp_cull(), and wake the sleepers
// on obd_zombie_waitq.  obd_zombie_stop is signalled on the way out.
//
// NOTE(review): obd_zombie_pid is stored after complete(&obd_zombie_start),
// so obd_zombie_impexp_init() may return before the pid is published -
// confirm that this window is harmless.
2055 * destroy zombie export/import thread.
2057 static int obd_zombie_impexp_thread(void *unused)
2059 unshare_fs_struct();
2060 complete(&obd_zombie_start);
2062 obd_zombie_pid = current_pid();
2064 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
2065 struct l_wait_info lwi = { 0 };
2067 l_wait_event(obd_zombie_waitq,
2068 !obd_zombie_impexp_check(NULL), &lwi);
2069 obd_zombie_impexp_cull();
2072 * Notify obd_zombie_barrier callers that queues
2075 wake_up(&obd_zombie_waitq);
2078 complete(&obd_zombie_stop);
// obd_zombie_impexp_init() - start the zombie thread ("obd_zombid").
//
// The thread is created with kthread_run(); PTR_ERR(task) is returned on
// the error path.  Otherwise the caller blocks until the thread has
// signalled obd_zombie_start.
//
// NOTE(review): the check guarding the error return (presumably IS_ERR)
// and the success return value are not visible in this listing.
2085 * start destroy zombie import/export thread
2087 int obd_zombie_impexp_init(void)
2089 struct task_struct *task;
2091 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
2093 RETURN(PTR_ERR(task));
2095 wait_for_completion(&obd_zombie_start);
// obd_zombie_impexp_stop() - stop the zombie thread and wait for it.
//
// Sets the stop bit, wakes the thread so it notices the request, blocks
// until the thread has signalled obd_zombie_stop and finally asserts that
// no stale exports are left on obd_stale_exports.
2099 * stop destroy zombie import/export thread
2101 void obd_zombie_impexp_stop(void)
2103 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
2104 obd_zombie_impexp_notify();
2105 wait_for_completion(&obd_zombie_stop);
2106 LASSERT(list_empty(&obd_stale_exports));
// kuc_len() - total size in bytes of a message with a @payload_len byte
// payload: the size of struct kuc_hdr plus the payload.
// NOTE(review): plain int arithmetic; callers are assumed to pass a
// non-negative, reasonably small @payload_len - confirm.
2109 /***** Kernel-userspace comm helpers *******/
2111 /* Get length of entire message, including header */
2112 int kuc_len(int payload_len)
2114 return sizeof(struct kuc_hdr) + payload_len;
2116 EXPORT_SYMBOL(kuc_len);
// kuc_ptr() - map a payload pointer back to the header stored in front of
// it.
//
// The header is expected to sit immediately before the payload, so the
// pointer is stepped back by one struct kuc_hdr; the magic value is
// asserted before use.
//
// NOTE(review): the return statement is not visible in this listing;
// presumably 'lh' is returned.
2118 /* Get a pointer to kuc header, given a ptr to the payload
2119 * @param p Pointer to payload area
2120 * @returns Pointer to kuc header
2122 struct kuc_hdr * kuc_ptr(void *p)
2124 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2125 LASSERT(lh->kuc_magic == KUC_MAGIC);
2128 EXPORT_SYMBOL(kuc_ptr);
// kuc_alloc() - allocate a message with room for @payload_len payload bytes
// and fill in its header.
//
// The header records KUC_MAGIC, @transport, @type and the total length from
// kuc_len().  The returned pointer addresses the payload area just behind
// the header; ERR_PTR(-ENOMEM) is returned on the failure path.
//
// NOTE(review): the declaration of 'lh', the allocation call and the check
// guarding the -ENOMEM return are not visible in this listing.
2130 /* Alloc space for a message, and fill in header
2131 * @return Pointer to payload area
2133 void *kuc_alloc(int payload_len, int transport, int type)
2136 int len = kuc_len(payload_len);
2140 return ERR_PTR(-ENOMEM);
2142 lh->kuc_magic = KUC_MAGIC;
2143 lh->kuc_transport = transport;
2144 lh->kuc_msgtype = type;
2145 lh->kuc_msglen = len;
2147 return (void *)(lh + 1);
2149 EXPORT_SYMBOL(kuc_alloc);
// kuc_free() - free a message given the payload pointer @p.
//
// kuc_ptr() recovers (and magic-checks) the header; the whole message is
// then freed with the size computed by kuc_len(@payload_len), so
// @payload_len has to be the value the message was allocated with.
2151 /* Takes pointer to payload area */
2152 void kuc_free(void *p, int payload_len)
2154 struct kuc_hdr *lh = kuc_ptr(p);
2155 OBD_FREE(lh, kuc_len(payload_len));
2157 EXPORT_SYMBOL(kuc_free);
// Record for one thread waiting in obd_get_request_slot(): orsw_entry links
// it into the client's cl_loi_read_list, orsw_waitq is the queue the thread
// sleeps on.
// NOTE(review): obd_get_request_slot() also uses an orsw_signaled member;
// its declaration is not visible in this listing.
2159 struct obd_request_slot_waiter {
2160 struct list_head orsw_entry;
2161 wait_queue_head_t orsw_waitq;
// obd_request_slot_avail() - wait condition of obd_get_request_slot().
//
// Under cl_loi_list_lock, 'avail' is true once the waiter's entry is no
// longer on a list.  The wake-up paths in this file unlink the entry with
// list_del_init() when they grant a slot.
//
// NOTE(review): the declaration of 'avail' and the return statement are not
// visible in this listing; presumably 'avail' is returned.
2165 static bool obd_request_slot_avail(struct client_obd *cli,
2166 struct obd_request_slot_waiter *orsw)
2170 spin_lock(&cli->cl_loi_list_lock);
2171 avail = !!list_empty(&orsw->orsw_entry);
2172 spin_unlock(&cli->cl_loi_list_lock);
// obd_get_request_slot() - acquire one RPC credit on @cli, sleeping when
// none is free.
//
// Fast path: while cl_r_in_flight is below cl_max_rpcs_in_flight the
// counter is simply incremented under cl_loi_list_lock.  Otherwise an
// on-stack waiter is appended to cl_loi_read_list and the caller sleeps
// interruptibly until obd_request_slot_avail() holds (a second wait
// condition is not visible here).  After the wait the lock is re-taken
// before the waiter is inspected, because the waiter lives on this stack
// and a waker may still be touching it.  In the visible clean-up for the
// not-signalled case, a waiter already taken off the list gives back the
// credit it was granted, while a waiter still on the list is unlinked.
//
// NOTE(review): the declaration of 'rc', the second half of the wait
// condition, the conditions around the clean-up and the return values are
// not visible in this listing.
2178 * For network flow control, the RPC sponsor needs to acquire a credit
2179 * before sending the RPC. The credits count for a connection is defined
2180 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2181 * the subsequent RPC sponsors need to wait until others released their
2182 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2184 int obd_get_request_slot(struct client_obd *cli)
2186 struct obd_request_slot_waiter orsw;
2187 struct l_wait_info lwi;
2190 spin_lock(&cli->cl_loi_list_lock);
2191 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
2192 cli->cl_r_in_flight++;
2193 spin_unlock(&cli->cl_loi_list_lock);
2197 init_waitqueue_head(&orsw.orsw_waitq);
2198 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
2199 orsw.orsw_signaled = false;
2200 spin_unlock(&cli->cl_loi_list_lock);
2202 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2203 rc = l_wait_event(orsw.orsw_waitq,
2204 obd_request_slot_avail(cli, &orsw) ||
2208 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2209 * freed but other (such as obd_put_request_slot) is using it. */
2210 spin_lock(&cli->cl_loi_list_lock);
2212 if (!orsw.orsw_signaled) {
2213 if (list_empty(&orsw.orsw_entry))
2214 cli->cl_r_in_flight--;
2216 list_del(&orsw.orsw_entry);
2220 if (orsw.orsw_signaled) {
2221 LASSERT(list_empty(&orsw.orsw_entry));
2225 spin_unlock(&cli->cl_loi_list_lock);
2229 EXPORT_SYMBOL(obd_get_request_slot);
// obd_put_request_slot() - give back one RPC credit on @cli.
//
// cl_r_in_flight is decremented under cl_loi_list_lock.  If a waiter is
// queued and the count is below cl_max_rpcs_in_flight, the credit is handed
// straight to the first waiter: its entry is unlinked, the counter
// incremented on its behalf and the waiter woken.
2231 void obd_put_request_slot(struct client_obd *cli)
2233 struct obd_request_slot_waiter *orsw;
2235 spin_lock(&cli->cl_loi_list_lock);
2236 cli->cl_r_in_flight--;
2238 /* If there is free slot, wakeup the first waiter. */
2239 if (!list_empty(&cli->cl_loi_read_list) &&
2240 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2241 orsw = list_entry(cli->cl_loi_read_list.next,
2242 struct obd_request_slot_waiter, orsw_entry);
2243 list_del_init(&orsw->orsw_entry);
2244 cli->cl_r_in_flight++;
2245 wake_up(&orsw->orsw_waitq);
2247 spin_unlock(&cli->cl_loi_list_lock);
2249 EXPORT_SYMBOL(obd_put_request_slot);
// obd_get_max_rpcs_in_flight() - current RPC credit limit of @cli.
// The field is read without taking cl_loi_list_lock.
2251 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2253 return cli->cl_max_rpcs_in_flight;
2255 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2257 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2259 struct obd_request_slot_waiter *orsw;
2266 if (max > OBD_MAX_RIF_MAX || max < 1)
2269 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2270 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2271 /* adjust max_mod_rpcs_in_flight to ensure it is always
2272 * strictly lower that max_rpcs_in_flight */
2274 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2275 "because it must be higher than "
2276 "max_mod_rpcs_in_flight value",
2277 cli->cl_import->imp_obd->obd_name);
2280 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2281 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2287 spin_lock(&cli->cl_loi_list_lock);
2288 old = cli->cl_max_rpcs_in_flight;
2289 cli->cl_max_rpcs_in_flight = max;
2292 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2293 for (i = 0; i < diff; i++) {
2294 if (list_empty(&cli->cl_loi_read_list))
2297 orsw = list_entry(cli->cl_loi_read_list.next,
2298 struct obd_request_slot_waiter, orsw_entry);
2299 list_del_init(&orsw->orsw_entry);
2300 cli->cl_r_in_flight++;
2301 wake_up(&orsw->orsw_waitq);
2303 spin_unlock(&cli->cl_loi_list_lock);
2307 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
// obd_get_max_mod_rpcs_in_flight() - current modify-RPC limit of @cli.
// The field is read without taking cl_mod_rpcs_lock.
2309 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2311 return cli->cl_max_mod_rpcs_in_flight;
2313 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
// obd_set_max_mod_rpcs_in_flight() - change the modify-RPC limit of @cli.
//
// Visible validation: @max must lie in 1..OBD_MAX_RIF_MAX, must be strictly
// below cl_max_rpcs_in_flight, and must not exceed 'maxmodrpcs', which is
// taken from the server's connect data (ocd_maxmodrpcs) when the connection
// has OBD_CONNECT_MULTIMODRPCS.  The new limit is stored under
// cl_mod_rpcs_lock and the sleepers on cl_mod_rpcs_waitq are woken when the
// limit was raised.
//
// NOTE(review): the declarations of 'maxmodrpcs'/'prev', the value of
// 'maxmodrpcs' without OBD_CONNECT_MULTIMODRPCS, the trailing arguments of
// the second CERROR and all return statements are not visible in this
// listing.
2315 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2317 struct obd_connect_data *ocd;
2321 if (max > OBD_MAX_RIF_MAX || max < 1)
2324 /* cannot exceed or equal max_rpcs_in_flight */
2325 if (max >= cli->cl_max_rpcs_in_flight) {
2326 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2327 "higher or equal to max_rpcs_in_flight value (%u)\n",
2328 cli->cl_import->imp_obd->obd_name,
2329 max, cli->cl_max_rpcs_in_flight);
2333 /* cannot exceed max modify RPCs in flight supported by the server */
2334 ocd = &cli->cl_import->imp_connect_data;
2335 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2336 maxmodrpcs = ocd->ocd_maxmodrpcs;
2339 if (max > maxmodrpcs) {
2340 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2341 "higher than max_mod_rpcs_per_client value (%hu) "
2342 "returned by the server at connection\n",
2343 cli->cl_import->imp_obd->obd_name,
2348 spin_lock(&cli->cl_mod_rpcs_lock);
2350 prev = cli->cl_max_mod_rpcs_in_flight;
2351 cli->cl_max_mod_rpcs_in_flight = max;
2353 /* wakeup waiters if limit has been increased */
2354 if (cli->cl_max_mod_rpcs_in_flight > prev)
2355 wake_up(&cli->cl_mod_rpcs_waitq);
2357 spin_unlock(&cli->cl_mod_rpcs_lock);
2361 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2364 #define pct(a, b) (b ? a * 100 / b : 0)
2365 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2366 struct seq_file *seq)
2368 unsigned long mod_tot = 0, mod_cum;
2369 struct timespec64 now;
2372 ktime_get_real_ts64(&now);
2374 spin_lock(&cli->cl_mod_rpcs_lock);
2376 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2377 (s64)now.tv_sec, now.tv_nsec);
2378 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2379 cli->cl_mod_rpcs_in_flight);
2381 seq_printf(seq, "\n\t\t\tmodify\n");
2382 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2384 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2387 for (i = 0; i < OBD_HIST_MAX; i++) {
2388 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2390 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2391 i, mod, pct(mod, mod_tot),
2392 pct(mod_cum, mod_tot));
2393 if (mod_cum == mod_tot)
2397 spin_unlock(&cli->cl_mod_rpcs_lock);
2401 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
// obd_mod_rpc_slot_avail_locked() - is a modify-RPC slot free on @cli?
//
// A slot is available when fewer modify RPCs are in flight than
// cl_max_mod_rpcs_in_flight allows, or when the request is a close request
// and no other close request is in flight.  The "_locked" suffix and the
// wrapper obd_mod_rpc_slot_avail() indicate that callers hold
// cl_mod_rpcs_lock.
//
// NOTE(review): the second parameter (used as 'close_req'), the declaration
// of 'avail' and the return statement are not visible in this listing.
2405 /* The number of modify RPCs sent in parallel is limited
2406 * because the server has a finite number of slots per client to
2407 * store request result and ensure reply reconstruction when needed.
2408 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2409 * that takes into account server limit and cl_max_rpcs_in_flight
2411 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2412 * one close request is allowed above the maximum.
2414 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2419 /* A slot is available if
2420 * - number of modify RPCs in flight is less than the max
2421 * - it's a close RPC and no other close request is in flight
2423 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2424 (close_req && cli->cl_close_rpcs_in_flight == 0);
// obd_mod_rpc_slot_avail() - locking wrapper around
// obd_mod_rpc_slot_avail_locked(); used as the wait condition in
// obd_get_mod_rpc_slot().
//
// NOTE(review): the second parameter, the declaration of 'avail' and the
// return statement are not visible in this listing.
2429 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2434 spin_lock(&cli->cl_mod_rpcs_lock);
2435 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2436 spin_unlock(&cli->cl_mod_rpcs_lock);
// obd_skip_mod_rpc_slot() - does the intent @it describe an operation that
// needs no modify-RPC slot?
//
// The visible condition matches IT_GETATTR, IT_LOOKUP, IT_READDIR, and
// IT_LAYOUT without FMODE_WRITE in it_flags.
//
// NOTE(review): the first operand of the condition (presumably a NULL check
// on @it) and the return statements are not visible in this listing.
2440 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2443 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2444 it->it_op == IT_READDIR ||
2445 (it->it_op == IT_LAYOUT && !(it->it_flags & FMODE_WRITE))))
// obd_get_mod_rpc_slot() - reserve a modify-RPC slot and tag on @cli.
//
// Intents accepted by obd_skip_mod_rpc_slot() are handled up front without
// taking a slot; MDS_CLOSE requests are treated as close requests.  With
// cl_mod_rpcs_lock held, an available slot is taken by incrementing
// cl_mod_rpcs_in_flight, tallying the histogram and claiming the first zero
// bit of cl_mod_tag_bitmap.  When no slot is available the lock is dropped
// and the caller sleeps on cl_mod_rpcs_waitq until
// obd_mod_rpc_slot_avail() holds.
//
// NOTE(review): the declarations of 'i'/'max', the statements setting
// close_req, the enclosing retry loop and all return statements are not
// visible in this listing; per the comments below the returned tag is
// presumably i + 1, with 0 for the skipped case.
// NOTE(review): test_and_set_bit() is evaluated inside LASSERT(); this
// relies on LASSERT always evaluating its argument - confirm.
2450 /* Get a modify RPC slot from the obd client @cli according
2451 * to the kind of operation @opc that is going to be sent
2452 * and the intent @it of the operation if it applies.
2453 * If the maximum number of modify RPCs in flight is reached
2454 * the thread is put to sleep.
2455 * Returns the tag to be set in the request message. Tag 0
2456 * is reserved for non-modifying requests.
2458 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2459 struct lookup_intent *it)
2461 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2462 bool close_req = false;
2465 /* read-only metadata RPCs don't consume a slot on MDT
2466 * for reply reconstruction
2468 if (obd_skip_mod_rpc_slot(it))
2471 if (opc == MDS_CLOSE)
2475 spin_lock(&cli->cl_mod_rpcs_lock);
2476 max = cli->cl_max_mod_rpcs_in_flight;
2477 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2478 /* there is a slot available */
2479 cli->cl_mod_rpcs_in_flight++;
2481 cli->cl_close_rpcs_in_flight++;
2482 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2483 cli->cl_mod_rpcs_in_flight);
2484 /* find a free tag */
2485 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2487 LASSERT(i < OBD_MAX_RIF_MAX);
2488 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2489 spin_unlock(&cli->cl_mod_rpcs_lock);
2490 /* tag 0 is reserved for non-modify RPCs */
2493 spin_unlock(&cli->cl_mod_rpcs_lock);
2495 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2496 "opc %u, max %hu\n",
2497 cli->cl_import->imp_obd->obd_name, opc, max);
2499 l_wait_event(cli->cl_mod_rpcs_waitq,
2500 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2503 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
// obd_put_mod_rpc_slot() - release the modify-RPC slot identified by @tag.
//
// Intents accepted by obd_skip_mod_rpc_slot() are handled up front.
// Otherwise, under cl_mod_rpcs_lock, cl_mod_rpcs_in_flight is decremented
// and bit (@tag - 1) of cl_mod_tag_bitmap cleared; the sleepers on
// cl_mod_rpcs_waitq are woken after the lock has been dropped.
//
// NOTE(review): the statements after the skip/MDS_CLOSE tests and the
// condition guarding the cl_close_rpcs_in_flight decrement are not visible
// in this listing.
// NOTE(review): with @tag == 0, "tag - 1" is evaluated as int -1 and would
// pass the range assertion (unless OBD_MAX_RIF_MAX is unsigned), leading to
// a bit operation at index -1 - confirm that callers never pass tag 0 here.
// NOTE(review): test_and_clear_bit() is evaluated inside LASSERT(); this
// relies on LASSERT always evaluating its argument - confirm.
2505 /* Put a modify RPC slot from the obd client @cli according
2506 * to the kind of operation @opc that has been sent and the
2507 * intent @it of the operation if it applies.
2509 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2510 struct lookup_intent *it, __u16 tag)
2512 bool close_req = false;
2514 if (obd_skip_mod_rpc_slot(it))
2517 if (opc == MDS_CLOSE)
2520 spin_lock(&cli->cl_mod_rpcs_lock);
2521 cli->cl_mod_rpcs_in_flight--;
2523 cli->cl_close_rpcs_in_flight--;
2524 /* release the tag in the bitmap */
2525 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2526 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2527 spin_unlock(&cli->cl_mod_rpcs_lock);
2528 wake_up(&cli->cl_mod_rpcs_waitq);
2530 EXPORT_SYMBOL(obd_put_mod_rpc_slot);