4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
48 static DEFINE_SPINLOCK(obd_types_lock);
49 static LIST_HEAD(obd_types);
50 DEFINE_RWLOCK(obd_dev_lock);
51 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
53 static struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 static struct kmem_cache *import_cachep;
58 static LIST_HEAD(obd_zombie_imports);
59 static LIST_HEAD(obd_zombie_exports);
60 static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp,
66 const char *status, int locks, int debug_level);
68 static LIST_HEAD(obd_stale_exports);
69 static DEFINE_SPINLOCK(obd_stale_export_lock);
70 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
72 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
73 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
76 * support functions: we could use inter-module communication, but this
77 * is more portable to other OS's
79 static struct obd_device *obd_device_alloc(void)
81 struct obd_device *obd;
83 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
85 obd->obd_magic = OBD_DEVICE_MAGIC;
90 static void obd_device_free(struct obd_device *obd)
93 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
94 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
95 if (obd->obd_namespace != NULL) {
96 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
97 obd, obd->obd_namespace, obd->obd_force);
100 lu_ref_fini(&obd->obd_reference);
101 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
104 struct obd_type *class_search_type(const char *name)
106 struct list_head *tmp;
107 struct obd_type *type;
109 spin_lock(&obd_types_lock);
110 list_for_each(tmp, &obd_types) {
111 type = list_entry(tmp, struct obd_type, typ_chain);
112 if (strcmp(type->typ_name, name) == 0) {
113 spin_unlock(&obd_types_lock);
117 spin_unlock(&obd_types_lock);
120 EXPORT_SYMBOL(class_search_type);
122 struct obd_type *class_get_type(const char *name)
124 struct obd_type *type = class_search_type(name);
126 #ifdef HAVE_MODULE_LOADING_SUPPORT
128 const char *modname = name;
130 if (strcmp(modname, "obdfilter") == 0)
133 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
134 modname = LUSTRE_OSP_NAME;
136 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
137 modname = LUSTRE_MDT_NAME;
139 if (!request_module("%s", modname)) {
140 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
141 type = class_search_type(name);
143 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
149 spin_lock(&type->obd_type_lock);
151 try_module_get(type->typ_dt_ops->o_owner);
152 spin_unlock(&type->obd_type_lock);
157 void class_put_type(struct obd_type *type)
160 spin_lock(&type->obd_type_lock);
162 module_put(type->typ_dt_ops->o_owner);
163 spin_unlock(&type->obd_type_lock);
166 static void class_sysfs_release(struct kobject *kobj)
168 struct obd_type *type = container_of(kobj, struct obd_type,
171 complete(&type->typ_kobj_unregister);
174 static struct kobj_type class_ktype = {
175 .sysfs_ops = &lustre_sysfs_ops,
176 .release = class_sysfs_release,
179 #define CLASS_MAX_NAME 1024
181 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
182 bool enable_proc, struct lprocfs_vars *vars,
183 const char *name, struct lu_device_type *ldt)
185 struct obd_type *type;
190 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
192 if (class_search_type(name)) {
193 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
198 OBD_ALLOC(type, sizeof(*type));
202 OBD_ALLOC_PTR(type->typ_dt_ops);
203 OBD_ALLOC_PTR(type->typ_md_ops);
204 OBD_ALLOC(type->typ_name, strlen(name) + 1);
206 if (type->typ_dt_ops == NULL ||
207 type->typ_md_ops == NULL ||
208 type->typ_name == NULL)
211 *(type->typ_dt_ops) = *dt_ops;
212 /* md_ops is optional */
214 *(type->typ_md_ops) = *md_ops;
215 strcpy(type->typ_name, name);
216 spin_lock_init(&type->obd_type_lock);
218 #ifdef CONFIG_PROC_FS
220 type->typ_procroot = lprocfs_register(type->typ_name,
223 if (IS_ERR(type->typ_procroot)) {
224 rc = PTR_ERR(type->typ_procroot);
225 type->typ_procroot = NULL;
230 type->typ_kobj.kset = lustre_kset;
231 init_completion(&type->typ_kobj_unregister);
232 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
233 &lustre_kset->kobj, "%s", type->typ_name);
239 rc = lu_device_type_init(ldt);
241 kobject_put(&type->typ_kobj);
246 spin_lock(&obd_types_lock);
247 list_add(&type->typ_chain, &obd_types);
248 spin_unlock(&obd_types_lock);
253 if (type->typ_name != NULL) {
254 #ifdef CONFIG_PROC_FS
255 if (type->typ_procroot != NULL)
256 remove_proc_subtree(type->typ_name, proc_lustre_root);
258 OBD_FREE(type->typ_name, strlen(name) + 1);
260 if (type->typ_md_ops != NULL)
261 OBD_FREE_PTR(type->typ_md_ops);
262 if (type->typ_dt_ops != NULL)
263 OBD_FREE_PTR(type->typ_dt_ops);
264 OBD_FREE(type, sizeof(*type));
267 EXPORT_SYMBOL(class_register_type);
269 int class_unregister_type(const char *name)
271 struct obd_type *type = class_search_type(name);
275 CERROR("unknown obd type\n");
279 if (type->typ_refcnt) {
280 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
281 /* This is a bad situation, let's make the best of it */
282 /* Remove ops, but leave the name for debugging */
283 OBD_FREE_PTR(type->typ_dt_ops);
284 OBD_FREE_PTR(type->typ_md_ops);
288 kobject_put(&type->typ_kobj);
289 wait_for_completion(&type->typ_kobj_unregister);
291 /* we do not use type->typ_procroot as for compatibility purposes
292 * other modules can share names (i.e. lod can use lov entry). so
293 * we can't reference pointer as it can get invalided when another
294 * module removes the entry */
295 #ifdef CONFIG_PROC_FS
296 if (type->typ_procroot != NULL)
297 remove_proc_subtree(type->typ_name, proc_lustre_root);
298 if (type->typ_procsym != NULL)
299 lprocfs_remove(&type->typ_procsym);
302 lu_device_type_fini(type->typ_lu);
304 spin_lock(&obd_types_lock);
305 list_del(&type->typ_chain);
306 spin_unlock(&obd_types_lock);
307 OBD_FREE(type->typ_name, strlen(name) + 1);
308 if (type->typ_dt_ops != NULL)
309 OBD_FREE_PTR(type->typ_dt_ops);
310 if (type->typ_md_ops != NULL)
311 OBD_FREE_PTR(type->typ_md_ops);
312 OBD_FREE(type, sizeof(*type));
314 } /* class_unregister_type */
315 EXPORT_SYMBOL(class_unregister_type);
318 * Create a new obd device.
320 * Allocate the new obd_device and initialize it.
322 * \param[in] type_name obd device type string.
323 * \param[in] name obd device name.
324 * \param[in] uuid obd device UUID
326 * \retval newdev pointer to created obd_device
327 * \retval ERR_PTR(errno) on error
329 struct obd_device *class_newdev(const char *type_name, const char *name,
332 struct obd_device *newdev;
333 struct obd_type *type = NULL;
336 if (strlen(name) >= MAX_OBD_NAME) {
337 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
338 RETURN(ERR_PTR(-EINVAL));
341 type = class_get_type(type_name);
343 CERROR("OBD: unknown type: %s\n", type_name);
344 RETURN(ERR_PTR(-ENODEV));
347 newdev = obd_device_alloc();
348 if (newdev == NULL) {
349 class_put_type(type);
350 RETURN(ERR_PTR(-ENOMEM));
352 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
353 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
354 newdev->obd_type = type;
355 newdev->obd_minor = -1;
357 rwlock_init(&newdev->obd_pool_lock);
358 newdev->obd_pool_limit = 0;
359 newdev->obd_pool_slv = 0;
361 INIT_LIST_HEAD(&newdev->obd_exports);
362 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
363 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
364 INIT_LIST_HEAD(&newdev->obd_exports_timed);
365 INIT_LIST_HEAD(&newdev->obd_nid_stats);
366 spin_lock_init(&newdev->obd_nid_lock);
367 spin_lock_init(&newdev->obd_dev_lock);
368 mutex_init(&newdev->obd_dev_mutex);
369 spin_lock_init(&newdev->obd_osfs_lock);
370 /* newdev->obd_osfs_age must be set to a value in the distant
371 * past to guarantee a fresh statfs is fetched on mount. */
372 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
374 /* XXX belongs in setup not attach */
375 init_rwsem(&newdev->obd_observer_link_sem);
377 init_timer(&newdev->obd_recovery_timer);
378 spin_lock_init(&newdev->obd_recovery_task_lock);
379 init_waitqueue_head(&newdev->obd_next_transno_waitq);
380 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
381 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
382 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
383 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
384 INIT_LIST_HEAD(&newdev->obd_evict_list);
385 INIT_LIST_HEAD(&newdev->obd_lwp_list);
387 llog_group_init(&newdev->obd_olg);
388 /* Detach drops this */
389 atomic_set(&newdev->obd_refcount, 1);
390 lu_ref_init(&newdev->obd_reference);
391 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
393 newdev->obd_conn_inprogress = 0;
395 strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
397 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
398 newdev->obd_name, newdev);
406 * \param[in] obd obd_device to be freed
410 void class_free_dev(struct obd_device *obd)
412 struct obd_type *obd_type = obd->obd_type;
414 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
415 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
416 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
417 "obd %p != obd_devs[%d] %p\n",
418 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
419 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
420 "obd_refcount should be 0, not %d\n",
421 atomic_read(&obd->obd_refcount));
422 LASSERT(obd_type != NULL);
424 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
425 obd->obd_name, obd->obd_type->typ_name);
427 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
428 obd->obd_name, obd->obd_uuid.uuid);
429 if (obd->obd_stopping) {
432 /* If we're not stopping, we were never set up */
433 err = obd_cleanup(obd);
435 CERROR("Cleanup %s returned %d\n",
439 obd_device_free(obd);
441 class_put_type(obd_type);
445 * Unregister obd device.
447 * Free slot in obd_dev[] used by \a obd.
449 * \param[in] new_obd obd_device to be unregistered
453 void class_unregister_device(struct obd_device *obd)
455 write_lock(&obd_dev_lock);
456 if (obd->obd_minor >= 0) {
457 LASSERT(obd_devs[obd->obd_minor] == obd);
458 obd_devs[obd->obd_minor] = NULL;
461 write_unlock(&obd_dev_lock);
465 * Register obd device.
467 * Find free slot in obd_devs[], fills it with \a new_obd.
469 * \param[in] new_obd obd_device to be registered
472 * \retval -EEXIST device with this name is registered
473 * \retval -EOVERFLOW obd_devs[] is full
475 int class_register_device(struct obd_device *new_obd)
479 int new_obd_minor = 0;
480 bool minor_assign = false;
481 bool retried = false;
484 write_lock(&obd_dev_lock);
485 for (i = 0; i < class_devno_max(); i++) {
486 struct obd_device *obd = class_num2obd(i);
489 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
492 write_unlock(&obd_dev_lock);
494 /* the obd_device could be waited to be
495 * destroyed by the "obd_zombie_impexp_thread".
497 obd_zombie_barrier();
502 CERROR("%s: already exists, won't add\n",
504 /* in case we found a free slot before duplicate */
505 minor_assign = false;
509 if (!minor_assign && obd == NULL) {
516 new_obd->obd_minor = new_obd_minor;
517 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
518 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
519 obd_devs[new_obd_minor] = new_obd;
523 CERROR("%s: all %u/%u devices used, increase "
524 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
525 i, class_devno_max(), ret);
528 write_unlock(&obd_dev_lock);
533 static int class_name2dev_nolock(const char *name)
540 for (i = 0; i < class_devno_max(); i++) {
541 struct obd_device *obd = class_num2obd(i);
543 if (obd && strcmp(name, obd->obd_name) == 0) {
544 /* Make sure we finished attaching before we give
545 out any references */
546 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
547 if (obd->obd_attached) {
557 int class_name2dev(const char *name)
564 read_lock(&obd_dev_lock);
565 i = class_name2dev_nolock(name);
566 read_unlock(&obd_dev_lock);
570 EXPORT_SYMBOL(class_name2dev);
572 struct obd_device *class_name2obd(const char *name)
574 int dev = class_name2dev(name);
576 if (dev < 0 || dev > class_devno_max())
578 return class_num2obd(dev);
580 EXPORT_SYMBOL(class_name2obd);
582 int class_uuid2dev_nolock(struct obd_uuid *uuid)
586 for (i = 0; i < class_devno_max(); i++) {
587 struct obd_device *obd = class_num2obd(i);
589 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
590 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
598 int class_uuid2dev(struct obd_uuid *uuid)
602 read_lock(&obd_dev_lock);
603 i = class_uuid2dev_nolock(uuid);
604 read_unlock(&obd_dev_lock);
608 EXPORT_SYMBOL(class_uuid2dev);
610 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
612 int dev = class_uuid2dev(uuid);
615 return class_num2obd(dev);
617 EXPORT_SYMBOL(class_uuid2obd);
620 * Get obd device from ::obd_devs[]
622 * \param num [in] array index
624 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
625 * otherwise return the obd device there.
627 struct obd_device *class_num2obd(int num)
629 struct obd_device *obd = NULL;
631 if (num < class_devno_max()) {
636 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
637 "%p obd_magic %08x != %08x\n",
638 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
639 LASSERTF(obd->obd_minor == num,
640 "%p obd_minor %0d != %0d\n",
641 obd, obd->obd_minor, num);
648 * Find obd in obd_dev[] by name or uuid.
650 * Increment obd's refcount if found.
652 * \param[in] str obd name or uuid
654 * \retval NULL if not found
655 * \retval target pointer to found obd_device
657 struct obd_device *class_dev_by_str(const char *str)
659 struct obd_device *target = NULL;
660 struct obd_uuid tgtuuid;
663 obd_str2uuid(&tgtuuid, str);
665 read_lock(&obd_dev_lock);
666 rc = class_uuid2dev_nolock(&tgtuuid);
668 rc = class_name2dev_nolock(str);
671 target = class_num2obd(rc);
674 class_incref(target, "find", current);
675 read_unlock(&obd_dev_lock);
679 EXPORT_SYMBOL(class_dev_by_str);
682 * Get obd devices count. Device in any
684 * \retval obd device count
686 int get_devices_count(void)
688 int index, max_index = class_devno_max(), dev_count = 0;
690 read_lock(&obd_dev_lock);
691 for (index = 0; index <= max_index; index++) {
692 struct obd_device *obd = class_num2obd(index);
696 read_unlock(&obd_dev_lock);
700 EXPORT_SYMBOL(get_devices_count);
702 void class_obd_list(void)
707 read_lock(&obd_dev_lock);
708 for (i = 0; i < class_devno_max(); i++) {
709 struct obd_device *obd = class_num2obd(i);
713 if (obd->obd_stopping)
715 else if (obd->obd_set_up)
717 else if (obd->obd_attached)
721 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
722 i, status, obd->obd_type->typ_name,
723 obd->obd_name, obd->obd_uuid.uuid,
724 atomic_read(&obd->obd_refcount));
726 read_unlock(&obd_dev_lock);
730 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
731 specified, then only the client with that uuid is returned,
732 otherwise any client connected to the tgt is returned. */
733 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
734 const char * typ_name,
735 struct obd_uuid *grp_uuid)
739 read_lock(&obd_dev_lock);
740 for (i = 0; i < class_devno_max(); i++) {
741 struct obd_device *obd = class_num2obd(i);
745 if ((strncmp(obd->obd_type->typ_name, typ_name,
746 strlen(typ_name)) == 0)) {
747 if (obd_uuid_equals(tgt_uuid,
748 &obd->u.cli.cl_target_uuid) &&
749 ((grp_uuid)? obd_uuid_equals(grp_uuid,
750 &obd->obd_uuid) : 1)) {
751 read_unlock(&obd_dev_lock);
756 read_unlock(&obd_dev_lock);
760 EXPORT_SYMBOL(class_find_client_obd);
762 /* Iterate the obd_device list looking devices have grp_uuid. Start
763 searching at *next, and if a device is found, the next index to look
764 at is saved in *next. If next is NULL, then the first matching device
765 will always be returned. */
766 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
772 else if (*next >= 0 && *next < class_devno_max())
777 read_lock(&obd_dev_lock);
778 for (; i < class_devno_max(); i++) {
779 struct obd_device *obd = class_num2obd(i);
783 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
786 read_unlock(&obd_dev_lock);
790 read_unlock(&obd_dev_lock);
794 EXPORT_SYMBOL(class_devices_in_group);
797 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
798 * adjust sptlrpc settings accordingly.
800 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
802 struct obd_device *obd;
806 LASSERT(namelen > 0);
808 read_lock(&obd_dev_lock);
809 for (i = 0; i < class_devno_max(); i++) {
810 obd = class_num2obd(i);
812 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
815 /* only notify mdc, osc, osp, lwp, mdt, ost
816 * because only these have a -sptlrpc llog */
817 type = obd->obd_type->typ_name;
818 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
819 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
820 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
821 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
822 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
823 strcmp(type, LUSTRE_OST_NAME) != 0)
826 if (strncmp(obd->obd_name, fsname, namelen))
829 class_incref(obd, __FUNCTION__, obd);
830 read_unlock(&obd_dev_lock);
831 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
832 sizeof(KEY_SPTLRPC_CONF),
833 KEY_SPTLRPC_CONF, 0, NULL, NULL);
835 class_decref(obd, __FUNCTION__, obd);
836 read_lock(&obd_dev_lock);
838 read_unlock(&obd_dev_lock);
841 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
843 void obd_cleanup_caches(void)
846 if (obd_device_cachep) {
847 kmem_cache_destroy(obd_device_cachep);
848 obd_device_cachep = NULL;
851 kmem_cache_destroy(obdo_cachep);
855 kmem_cache_destroy(import_cachep);
856 import_cachep = NULL;
862 int obd_init_caches(void)
867 LASSERT(obd_device_cachep == NULL);
868 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
869 sizeof(struct obd_device),
871 if (!obd_device_cachep)
872 GOTO(out, rc = -ENOMEM);
874 LASSERT(obdo_cachep == NULL);
875 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
878 GOTO(out, rc = -ENOMEM);
880 LASSERT(import_cachep == NULL);
881 import_cachep = kmem_cache_create("ll_import_cache",
882 sizeof(struct obd_import),
885 GOTO(out, rc = -ENOMEM);
889 obd_cleanup_caches();
893 /* map connection to client */
894 struct obd_export *class_conn2export(struct lustre_handle *conn)
896 struct obd_export *export;
900 CDEBUG(D_CACHE, "looking for null handle\n");
904 if (conn->cookie == -1) { /* this means assign a new connection */
905 CDEBUG(D_CACHE, "want a new connection\n");
909 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
910 export = class_handle2object(conn->cookie, NULL);
913 EXPORT_SYMBOL(class_conn2export);
915 struct obd_device *class_exp2obd(struct obd_export *exp)
921 EXPORT_SYMBOL(class_exp2obd);
923 struct obd_device *class_conn2obd(struct lustre_handle *conn)
925 struct obd_export *export;
926 export = class_conn2export(conn);
928 struct obd_device *obd = export->exp_obd;
929 class_export_put(export);
935 struct obd_import *class_exp2cliimp(struct obd_export *exp)
937 struct obd_device *obd = exp->exp_obd;
940 return obd->u.cli.cl_import;
942 EXPORT_SYMBOL(class_exp2cliimp);
944 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
946 struct obd_device *obd = class_conn2obd(conn);
949 return obd->u.cli.cl_import;
952 /* Export management functions */
953 static void class_export_destroy(struct obd_export *exp)
955 struct obd_device *obd = exp->exp_obd;
958 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
959 LASSERT(obd != NULL);
961 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
962 exp->exp_client_uuid.uuid, obd->obd_name);
964 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
965 if (exp->exp_connection)
966 ptlrpc_put_connection_superhack(exp->exp_connection);
968 LASSERT(list_empty(&exp->exp_outstanding_replies));
969 LASSERT(list_empty(&exp->exp_uncommitted_replies));
970 LASSERT(list_empty(&exp->exp_req_replay_queue));
971 LASSERT(list_empty(&exp->exp_hp_rpcs));
972 obd_destroy_export(exp);
973 /* self export doesn't hold a reference to an obd, although it
974 * exists until freeing of the obd */
975 if (exp != obd->obd_self_export)
976 class_decref(obd, "export", exp);
978 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
982 static void export_handle_addref(void *export)
984 class_export_get(export);
987 static struct portals_handle_ops export_handle_ops = {
988 .hop_addref = export_handle_addref,
992 struct obd_export *class_export_get(struct obd_export *exp)
994 atomic_inc(&exp->exp_refcount);
995 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
996 atomic_read(&exp->exp_refcount));
999 EXPORT_SYMBOL(class_export_get);
1001 void class_export_put(struct obd_export *exp)
1003 LASSERT(exp != NULL);
1004 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
1005 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
1006 atomic_read(&exp->exp_refcount) - 1);
1008 if (atomic_dec_and_test(&exp->exp_refcount)) {
1009 struct obd_device *obd = exp->exp_obd;
1011 CDEBUG(D_IOCTL, "final put %p/%s\n",
1012 exp, exp->exp_client_uuid.uuid);
1014 /* release nid stat refererence */
1015 lprocfs_exp_cleanup(exp);
1017 if (exp == obd->obd_self_export) {
1018 /* self export should be destroyed without
1019 * zombie thread as it doesn't hold a
1020 * reference to obd and doesn't hold any
1022 class_export_destroy(exp);
1023 /* self export is destroyed, no class
1024 * references exist and it is safe to free
1026 class_free_dev(obd);
1028 LASSERT(!list_empty(&exp->exp_obd_chain));
1029 obd_zombie_export_add(exp);
1034 EXPORT_SYMBOL(class_export_put);
1035 /* Creates a new export, adds it to the hash table, and returns a
1036 * pointer to it. The refcount is 2: one for the hash reference, and
1037 * one for the pointer returned by this function. */
1038 struct obd_export *__class_new_export(struct obd_device *obd,
1039 struct obd_uuid *cluuid, bool is_self)
1041 struct obd_export *export;
1042 struct cfs_hash *hash = NULL;
1046 OBD_ALLOC_PTR(export);
1048 return ERR_PTR(-ENOMEM);
1050 export->exp_conn_cnt = 0;
1051 export->exp_lock_hash = NULL;
1052 export->exp_flock_hash = NULL;
1053 /* 2 = class_handle_hash + last */
1054 atomic_set(&export->exp_refcount, 2);
1055 atomic_set(&export->exp_rpc_count, 0);
1056 atomic_set(&export->exp_cb_count, 0);
1057 atomic_set(&export->exp_locks_count, 0);
1058 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1059 INIT_LIST_HEAD(&export->exp_locks_list);
1060 spin_lock_init(&export->exp_locks_list_guard);
1062 atomic_set(&export->exp_replay_count, 0);
1063 export->exp_obd = obd;
1064 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1065 spin_lock_init(&export->exp_uncommitted_replies_lock);
1066 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1067 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1068 INIT_LIST_HEAD(&export->exp_handle.h_link);
1069 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1070 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1071 class_handle_hash(&export->exp_handle, &export_handle_ops);
1072 export->exp_last_request_time = ktime_get_real_seconds();
1073 spin_lock_init(&export->exp_lock);
1074 spin_lock_init(&export->exp_rpc_lock);
1075 INIT_HLIST_NODE(&export->exp_uuid_hash);
1076 INIT_HLIST_NODE(&export->exp_nid_hash);
1077 INIT_HLIST_NODE(&export->exp_gen_hash);
1078 spin_lock_init(&export->exp_bl_list_lock);
1079 INIT_LIST_HEAD(&export->exp_bl_list);
1080 INIT_LIST_HEAD(&export->exp_stale_list);
1082 export->exp_sp_peer = LUSTRE_SP_ANY;
1083 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1084 export->exp_client_uuid = *cluuid;
1085 obd_init_export(export);
1087 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1088 spin_lock(&obd->obd_dev_lock);
1089 /* shouldn't happen, but might race */
1090 if (obd->obd_stopping)
1091 GOTO(exit_unlock, rc = -ENODEV);
1093 hash = cfs_hash_getref(obd->obd_uuid_hash);
1095 GOTO(exit_unlock, rc = -ENODEV);
1096 spin_unlock(&obd->obd_dev_lock);
1098 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1100 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1101 obd->obd_name, cluuid->uuid, rc);
1102 GOTO(exit_err, rc = -EALREADY);
1106 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1107 spin_lock(&obd->obd_dev_lock);
1108 if (obd->obd_stopping) {
1110 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1111 GOTO(exit_unlock, rc = -ESHUTDOWN);
1115 class_incref(obd, "export", export);
1116 list_add_tail(&export->exp_obd_chain_timed,
1117 &obd->obd_exports_timed);
1118 list_add(&export->exp_obd_chain, &obd->obd_exports);
1119 obd->obd_num_exports++;
1121 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1122 INIT_LIST_HEAD(&export->exp_obd_chain);
1124 spin_unlock(&obd->obd_dev_lock);
1126 cfs_hash_putref(hash);
1130 spin_unlock(&obd->obd_dev_lock);
1133 cfs_hash_putref(hash);
1134 class_handle_unhash(&export->exp_handle);
1135 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1136 obd_destroy_export(export);
1137 OBD_FREE_PTR(export);
1141 struct obd_export *class_new_export(struct obd_device *obd,
1142 struct obd_uuid *uuid)
1144 return __class_new_export(obd, uuid, false);
1146 EXPORT_SYMBOL(class_new_export);
1148 struct obd_export *class_new_export_self(struct obd_device *obd,
1149 struct obd_uuid *uuid)
1151 return __class_new_export(obd, uuid, true);
1154 void class_unlink_export(struct obd_export *exp)
1156 class_handle_unhash(&exp->exp_handle);
1158 if (exp->exp_obd->obd_self_export == exp) {
1159 class_export_put(exp);
1163 spin_lock(&exp->exp_obd->obd_dev_lock);
1164 /* delete an uuid-export hashitem from hashtables */
1165 if (!hlist_unhashed(&exp->exp_uuid_hash))
1166 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1167 &exp->exp_client_uuid,
1168 &exp->exp_uuid_hash);
1170 #ifdef HAVE_SERVER_SUPPORT
1171 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1172 struct tg_export_data *ted = &exp->exp_target_data;
1173 struct cfs_hash *hash;
1175 /* Because obd_gen_hash will not be released until
1176 * class_cleanup(), so hash should never be NULL here */
1177 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1178 LASSERT(hash != NULL);
1179 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1180 &exp->exp_gen_hash);
1181 cfs_hash_putref(hash);
1183 #endif /* HAVE_SERVER_SUPPORT */
1185 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1186 list_del_init(&exp->exp_obd_chain_timed);
1187 exp->exp_obd->obd_num_exports--;
1188 spin_unlock(&exp->exp_obd->obd_dev_lock);
1189 atomic_inc(&obd_stale_export_num);
1191 /* A reference is kept by obd_stale_exports list */
1192 obd_stale_export_put(exp);
1194 EXPORT_SYMBOL(class_unlink_export);
1196 /* Import management functions */
1197 static void class_import_destroy(struct obd_import *imp)
1201 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1202 imp->imp_obd->obd_name);
1204 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1206 ptlrpc_put_connection_superhack(imp->imp_connection);
1208 while (!list_empty(&imp->imp_conn_list)) {
1209 struct obd_import_conn *imp_conn;
1211 imp_conn = list_entry(imp->imp_conn_list.next,
1212 struct obd_import_conn, oic_item);
1213 list_del_init(&imp_conn->oic_item);
1214 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1215 OBD_FREE(imp_conn, sizeof(*imp_conn));
1218 LASSERT(imp->imp_sec == NULL);
1219 class_decref(imp->imp_obd, "import", imp);
1220 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1224 static void import_handle_addref(void *import)
1226 class_import_get(import);
1229 static struct portals_handle_ops import_handle_ops = {
1230 .hop_addref = import_handle_addref,
1234 struct obd_import *class_import_get(struct obd_import *import)
1236 atomic_inc(&import->imp_refcount);
1237 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1238 atomic_read(&import->imp_refcount),
1239 import->imp_obd->obd_name);
1242 EXPORT_SYMBOL(class_import_get);
1244 void class_import_put(struct obd_import *imp)
1248 LASSERT(list_empty(&imp->imp_zombie_chain));
1249 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1251 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1252 atomic_read(&imp->imp_refcount) - 1,
1253 imp->imp_obd->obd_name);
1255 if (atomic_dec_and_test(&imp->imp_refcount)) {
1256 CDEBUG(D_INFO, "final put import %p\n", imp);
1257 obd_zombie_import_add(imp);
1260 /* catch possible import put race */
1261 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1264 EXPORT_SYMBOL(class_import_put);
1266 static void init_imp_at(struct imp_at *at) {
1268 at_init(&at->iat_net_latency, 0, 0);
1269 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1270 /* max service estimates are tracked on the server side, so
1271 don't use the AT history here, just use the last reported
1272 val. (But keep hist for proc histogram, worst_ever) */
1273 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1278 struct obd_import *class_new_import(struct obd_device *obd)
1280 struct obd_import *imp;
1281 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1283 OBD_ALLOC(imp, sizeof(*imp));
1287 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1288 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1289 INIT_LIST_HEAD(&imp->imp_replay_list);
1290 INIT_LIST_HEAD(&imp->imp_sending_list);
1291 INIT_LIST_HEAD(&imp->imp_delayed_list);
1292 INIT_LIST_HEAD(&imp->imp_committed_list);
1293 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1294 imp->imp_known_replied_xid = 0;
1295 imp->imp_replay_cursor = &imp->imp_committed_list;
1296 spin_lock_init(&imp->imp_lock);
1297 imp->imp_last_success_conn = 0;
1298 imp->imp_state = LUSTRE_IMP_NEW;
1299 imp->imp_obd = class_incref(obd, "import", imp);
1300 mutex_init(&imp->imp_sec_mutex);
1301 init_waitqueue_head(&imp->imp_recovery_waitq);
1303 if (curr_pid_ns->child_reaper)
1304 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1306 imp->imp_sec_refpid = 1;
1308 atomic_set(&imp->imp_refcount, 2);
1309 atomic_set(&imp->imp_unregistering, 0);
1310 atomic_set(&imp->imp_inflight, 0);
1311 atomic_set(&imp->imp_replay_inflight, 0);
1312 atomic_set(&imp->imp_inval_count, 0);
1313 INIT_LIST_HEAD(&imp->imp_conn_list);
1314 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1315 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1316 init_imp_at(&imp->imp_at);
1318 /* the default magic is V2, will be used in connect RPC, and
1319 * then adjusted according to the flags in request/reply. */
1320 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1324 EXPORT_SYMBOL(class_new_import);
1326 void class_destroy_import(struct obd_import *import)
1328 LASSERT(import != NULL);
1329 LASSERT(import != LP_POISON);
1331 class_handle_unhash(&import->imp_handle);
1333 spin_lock(&import->imp_lock);
1334 import->imp_generation++;
1335 spin_unlock(&import->imp_lock);
1336 class_import_put(import);
1338 EXPORT_SYMBOL(class_destroy_import);
1340 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1342 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1344 spin_lock(&exp->exp_locks_list_guard);
1346 LASSERT(lock->l_exp_refs_nr >= 0);
1348 if (lock->l_exp_refs_target != NULL &&
1349 lock->l_exp_refs_target != exp) {
1350 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1351 exp, lock, lock->l_exp_refs_target);
1353 if ((lock->l_exp_refs_nr ++) == 0) {
1354 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1355 lock->l_exp_refs_target = exp;
1357 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1358 lock, exp, lock->l_exp_refs_nr);
1359 spin_unlock(&exp->exp_locks_list_guard);
1361 EXPORT_SYMBOL(__class_export_add_lock_ref);
1363 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1365 spin_lock(&exp->exp_locks_list_guard);
1366 LASSERT(lock->l_exp_refs_nr > 0);
1367 if (lock->l_exp_refs_target != exp) {
1368 LCONSOLE_WARN("lock %p, "
1369 "mismatching export pointers: %p, %p\n",
1370 lock, lock->l_exp_refs_target, exp);
1372 if (-- lock->l_exp_refs_nr == 0) {
1373 list_del_init(&lock->l_exp_refs_link);
1374 lock->l_exp_refs_target = NULL;
1376 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1377 lock, exp, lock->l_exp_refs_nr);
1378 spin_unlock(&exp->exp_locks_list_guard);
1380 EXPORT_SYMBOL(__class_export_del_lock_ref);
1383 /* A connection defines an export context in which preallocation can
1384 be managed. This releases the export pointer reference, and returns
1385 the export handle, so the export refcount is 1 when this function
1387 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1388 struct obd_uuid *cluuid)
1390 struct obd_export *export;
1391 LASSERT(conn != NULL);
1392 LASSERT(obd != NULL);
1393 LASSERT(cluuid != NULL);
1396 export = class_new_export(obd, cluuid);
1398 RETURN(PTR_ERR(export));
1400 conn->cookie = export->exp_handle.h_cookie;
1401 class_export_put(export);
1403 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1404 cluuid->uuid, conn->cookie);
1407 EXPORT_SYMBOL(class_connect);
1409 /* if export is involved in recovery then clean up related things */
1410 static void class_export_recovery_cleanup(struct obd_export *exp)
1412 struct obd_device *obd = exp->exp_obd;
1414 spin_lock(&obd->obd_recovery_task_lock);
1415 if (obd->obd_recovering) {
1416 if (exp->exp_in_recovery) {
1417 spin_lock(&exp->exp_lock);
1418 exp->exp_in_recovery = 0;
1419 spin_unlock(&exp->exp_lock);
1420 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1421 atomic_dec(&obd->obd_connected_clients);
1424 /* if called during recovery then should update
1425 * obd_stale_clients counter,
1426 * lightweight exports are not counted */
1427 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1428 exp->exp_obd->obd_stale_clients++;
1430 spin_unlock(&obd->obd_recovery_task_lock);
1432 spin_lock(&exp->exp_lock);
1433 /** Cleanup req replay fields */
1434 if (exp->exp_req_replay_needed) {
1435 exp->exp_req_replay_needed = 0;
1437 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1438 atomic_dec(&obd->obd_req_replay_clients);
1441 /** Cleanup lock replay data */
1442 if (exp->exp_lock_replay_needed) {
1443 exp->exp_lock_replay_needed = 0;
1445 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1446 atomic_dec(&obd->obd_lock_replay_clients);
1448 spin_unlock(&exp->exp_lock);
1451 /* This function removes 1-3 references from the export:
1452 * 1 - for export pointer passed
1453 * and if disconnect really need
1454 * 2 - removing from hash
1455 * 3 - in client_unlink_export
1456 * The export pointer passed to this function can destroyed */
1457 int class_disconnect(struct obd_export *export)
1459 int already_disconnected;
1462 if (export == NULL) {
1463 CWARN("attempting to free NULL export %p\n", export);
1467 spin_lock(&export->exp_lock);
1468 already_disconnected = export->exp_disconnected;
1469 export->exp_disconnected = 1;
1470 /* We hold references of export for uuid hash
1471 * and nid_hash and export link at least. So
1472 * it is safe to call cfs_hash_del in there. */
1473 if (!hlist_unhashed(&export->exp_nid_hash))
1474 cfs_hash_del(export->exp_obd->obd_nid_hash,
1475 &export->exp_connection->c_peer.nid,
1476 &export->exp_nid_hash);
1477 spin_unlock(&export->exp_lock);
1479 /* class_cleanup(), abort_recovery(), and class_fail_export()
1480 * all end up in here, and if any of them race we shouldn't
1481 * call extra class_export_puts(). */
1482 if (already_disconnected) {
1483 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1484 GOTO(no_disconn, already_disconnected);
1487 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1488 export->exp_handle.h_cookie);
1490 class_export_recovery_cleanup(export);
1491 class_unlink_export(export);
1493 class_export_put(export);
1496 EXPORT_SYMBOL(class_disconnect);
1498 /* Return non-zero for a fully connected export */
1499 int class_connected_export(struct obd_export *exp)
1504 spin_lock(&exp->exp_lock);
1505 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1506 spin_unlock(&exp->exp_lock);
1510 EXPORT_SYMBOL(class_connected_export);
1512 static void class_disconnect_export_list(struct list_head *list,
1513 enum obd_option flags)
1516 struct obd_export *exp;
1519 /* It's possible that an export may disconnect itself, but
1520 * nothing else will be added to this list. */
1521 while (!list_empty(list)) {
1522 exp = list_entry(list->next, struct obd_export,
1524 /* need for safe call CDEBUG after obd_disconnect */
1525 class_export_get(exp);
1527 spin_lock(&exp->exp_lock);
1528 exp->exp_flags = flags;
1529 spin_unlock(&exp->exp_lock);
1531 if (obd_uuid_equals(&exp->exp_client_uuid,
1532 &exp->exp_obd->obd_uuid)) {
1534 "exp %p export uuid == obd uuid, don't discon\n",
1536 /* Need to delete this now so we don't end up pointing
1537 * to work_list later when this export is cleaned up. */
1538 list_del_init(&exp->exp_obd_chain);
1539 class_export_put(exp);
1543 class_export_get(exp);
1544 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1545 "last request at %lld\n",
1546 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1547 exp, exp->exp_last_request_time);
1548 /* release one export reference anyway */
1549 rc = obd_disconnect(exp);
1551 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1552 obd_export_nid2str(exp), exp, rc);
1553 class_export_put(exp);
1558 void class_disconnect_exports(struct obd_device *obd)
1560 struct list_head work_list;
1563 /* Move all of the exports from obd_exports to a work list, en masse. */
1564 INIT_LIST_HEAD(&work_list);
1565 spin_lock(&obd->obd_dev_lock);
1566 list_splice_init(&obd->obd_exports, &work_list);
1567 list_splice_init(&obd->obd_delayed_exports, &work_list);
1568 spin_unlock(&obd->obd_dev_lock);
1570 if (!list_empty(&work_list)) {
1571 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1572 "disconnecting them\n", obd->obd_minor, obd);
1573 class_disconnect_export_list(&work_list,
1574 exp_flags_from_obd(obd));
1576 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1577 obd->obd_minor, obd);
1580 EXPORT_SYMBOL(class_disconnect_exports);
1582 /* Remove exports that have not completed recovery.
1584 void class_disconnect_stale_exports(struct obd_device *obd,
1585 int (*test_export)(struct obd_export *))
1587 struct list_head work_list;
1588 struct obd_export *exp, *n;
1592 INIT_LIST_HEAD(&work_list);
1593 spin_lock(&obd->obd_dev_lock);
1594 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1596 /* don't count self-export as client */
1597 if (obd_uuid_equals(&exp->exp_client_uuid,
1598 &exp->exp_obd->obd_uuid))
1601 /* don't evict clients which have no slot in last_rcvd
1602 * (e.g. lightweight connection) */
1603 if (exp->exp_target_data.ted_lr_idx == -1)
1606 spin_lock(&exp->exp_lock);
1607 if (exp->exp_failed || test_export(exp)) {
1608 spin_unlock(&exp->exp_lock);
1611 exp->exp_failed = 1;
1612 spin_unlock(&exp->exp_lock);
1614 list_move(&exp->exp_obd_chain, &work_list);
1616 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1617 obd->obd_name, exp->exp_client_uuid.uuid,
1618 exp->exp_connection == NULL ? "<unknown>" :
1619 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1620 print_export_data(exp, "EVICTING", 0, D_HA);
1622 spin_unlock(&obd->obd_dev_lock);
1625 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1626 obd->obd_name, evicted);
1628 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1629 OBD_OPT_ABORT_RECOV);
1632 EXPORT_SYMBOL(class_disconnect_stale_exports);
1634 void class_fail_export(struct obd_export *exp)
1636 int rc, already_failed;
1638 spin_lock(&exp->exp_lock);
1639 already_failed = exp->exp_failed;
1640 exp->exp_failed = 1;
1641 spin_unlock(&exp->exp_lock);
1643 if (already_failed) {
1644 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1645 exp, exp->exp_client_uuid.uuid);
1649 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1650 exp, exp->exp_client_uuid.uuid);
1652 if (obd_dump_on_timeout)
1653 libcfs_debug_dumplog();
1655 /* need for safe call CDEBUG after obd_disconnect */
1656 class_export_get(exp);
1658 /* Most callers into obd_disconnect are removing their own reference
1659 * (request, for example) in addition to the one from the hash table.
1660 * We don't have such a reference here, so make one. */
1661 class_export_get(exp);
1662 rc = obd_disconnect(exp);
1664 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1666 CDEBUG(D_HA, "disconnected export %p/%s\n",
1667 exp, exp->exp_client_uuid.uuid);
1668 class_export_put(exp);
1670 EXPORT_SYMBOL(class_fail_export);
1672 char *obd_export_nid2str(struct obd_export *exp)
1674 if (exp->exp_connection != NULL)
1675 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1679 EXPORT_SYMBOL(obd_export_nid2str);
1681 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1683 struct cfs_hash *nid_hash;
1684 struct obd_export *doomed_exp = NULL;
1685 int exports_evicted = 0;
1687 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1689 spin_lock(&obd->obd_dev_lock);
1690 /* umount has run already, so evict thread should leave
1691 * its task to umount thread now */
1692 if (obd->obd_stopping) {
1693 spin_unlock(&obd->obd_dev_lock);
1694 return exports_evicted;
1696 nid_hash = obd->obd_nid_hash;
1697 cfs_hash_getref(nid_hash);
1698 spin_unlock(&obd->obd_dev_lock);
1701 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1702 if (doomed_exp == NULL)
1705 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1706 "nid %s found, wanted nid %s, requested nid %s\n",
1707 obd_export_nid2str(doomed_exp),
1708 libcfs_nid2str(nid_key), nid);
1709 LASSERTF(doomed_exp != obd->obd_self_export,
1710 "self-export is hashed by NID?\n");
1712 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1713 "request\n", obd->obd_name,
1714 obd_uuid2str(&doomed_exp->exp_client_uuid),
1715 obd_export_nid2str(doomed_exp));
1716 class_fail_export(doomed_exp);
1717 class_export_put(doomed_exp);
1720 cfs_hash_putref(nid_hash);
1722 if (!exports_evicted)
1723 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1724 obd->obd_name, nid);
1725 return exports_evicted;
1727 EXPORT_SYMBOL(obd_export_evict_by_nid);
1729 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1731 struct cfs_hash *uuid_hash;
1732 struct obd_export *doomed_exp = NULL;
1733 struct obd_uuid doomed_uuid;
1734 int exports_evicted = 0;
1736 spin_lock(&obd->obd_dev_lock);
1737 if (obd->obd_stopping) {
1738 spin_unlock(&obd->obd_dev_lock);
1739 return exports_evicted;
1741 uuid_hash = obd->obd_uuid_hash;
1742 cfs_hash_getref(uuid_hash);
1743 spin_unlock(&obd->obd_dev_lock);
1745 obd_str2uuid(&doomed_uuid, uuid);
1746 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1747 CERROR("%s: can't evict myself\n", obd->obd_name);
1748 cfs_hash_putref(uuid_hash);
1749 return exports_evicted;
1752 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1754 if (doomed_exp == NULL) {
1755 CERROR("%s: can't disconnect %s: no exports found\n",
1756 obd->obd_name, uuid);
1758 CWARN("%s: evicting %s at adminstrative request\n",
1759 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1760 class_fail_export(doomed_exp);
1761 class_export_put(doomed_exp);
1764 cfs_hash_putref(uuid_hash);
1766 return exports_evicted;
1769 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback, NULL by default. print_export_data() calls it for an
 * export when its "locks" argument is non-zero and the hook is set. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
1774 static void print_export_data(struct obd_export *exp, const char *status,
1775 int locks, int debug_level)
1777 struct ptlrpc_reply_state *rs;
1778 struct ptlrpc_reply_state *first_reply = NULL;
1781 spin_lock(&exp->exp_lock);
1782 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1788 spin_unlock(&exp->exp_lock);
1790 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1791 "%p %s %llu stale:%d\n",
1792 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1793 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1794 atomic_read(&exp->exp_rpc_count),
1795 atomic_read(&exp->exp_cb_count),
1796 atomic_read(&exp->exp_locks_count),
1797 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1798 nreplies, first_reply, nreplies > 3 ? "..." : "",
1799 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1800 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1801 if (locks && class_export_dump_hook != NULL)
1802 class_export_dump_hook(exp);
1806 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1808 struct obd_export *exp;
1810 spin_lock(&obd->obd_dev_lock);
1811 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1812 print_export_data(exp, "ACTIVE", locks, debug_level);
1813 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1814 print_export_data(exp, "UNLINKED", locks, debug_level);
1815 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1816 print_export_data(exp, "DELAYED", locks, debug_level);
1817 spin_unlock(&obd->obd_dev_lock);
1818 spin_lock(&obd_zombie_impexp_lock);
1819 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1820 print_export_data(exp, "ZOMBIE", locks, debug_level);
1821 spin_unlock(&obd_zombie_impexp_lock);
1824 void obd_exports_barrier(struct obd_device *obd)
1827 LASSERT(list_empty(&obd->obd_exports));
1828 spin_lock(&obd->obd_dev_lock);
1829 while (!list_empty(&obd->obd_unlinked_exports)) {
1830 spin_unlock(&obd->obd_dev_lock);
1831 set_current_state(TASK_UNINTERRUPTIBLE);
1832 schedule_timeout(cfs_time_seconds(waited));
1833 if (waited > 5 && is_power_of_2(waited)) {
1834 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1835 "more than %d seconds. "
1836 "The obd refcount = %d. Is it stuck?\n",
1837 obd->obd_name, waited,
1838 atomic_read(&obd->obd_refcount));
1839 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1842 spin_lock(&obd->obd_dev_lock);
1844 spin_unlock(&obd->obd_dev_lock);
1846 EXPORT_SYMBOL(obd_exports_barrier);
/* Total number of zombie imports and exports that still have to be
 * destroyed. Read under obd_zombie_impexp_lock by obd_zombie_impexp_check()
 * and obd_zombie_is_idle(). */
static int zombies_count = 0;
1852 * kill zombie imports and exports
1854 void obd_zombie_impexp_cull(void)
1856 struct obd_import *import;
1857 struct obd_export *export;
1861 spin_lock(&obd_zombie_impexp_lock);
1864 if (!list_empty(&obd_zombie_imports)) {
1865 import = list_entry(obd_zombie_imports.next,
1868 list_del_init(&import->imp_zombie_chain);
1872 if (!list_empty(&obd_zombie_exports)) {
1873 export = list_entry(obd_zombie_exports.next,
1876 list_del_init(&export->exp_obd_chain);
1879 spin_unlock(&obd_zombie_impexp_lock);
1881 if (import != NULL) {
1882 class_import_destroy(import);
1883 spin_lock(&obd_zombie_impexp_lock);
1885 spin_unlock(&obd_zombie_impexp_lock);
1888 if (export != NULL) {
1889 class_export_destroy(export);
1890 spin_lock(&obd_zombie_impexp_lock);
1892 spin_unlock(&obd_zombie_impexp_lock);
1896 } while (import != NULL || export != NULL);
/* Completed by the zombie thread when it starts; awaited by
 * obd_zombie_impexp_init(). */
static DECLARE_COMPLETION(obd_zombie_start);
/* Completed by the zombie thread when it leaves its loop; awaited by
 * obd_zombie_impexp_stop(). */
static DECLARE_COMPLETION(obd_zombie_stop);
/* Bit field manipulated with set_bit()/test_bit(). */
static unsigned long obd_zombie_flags;
/* Wait queue shared by the zombie thread and obd_zombie_barrier(). */
static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
/* Pid of the zombie thread, recorded by the thread itself. */
static pid_t obd_zombie_pid;

enum {
	/* passed to set_bit()/test_bit() as a bit number, not as a mask */
	OBD_ZOMBIE_STOP		= 0x0001,
};
1911 * check for work for kill zombie import/export thread.
1913 static int obd_zombie_impexp_check(void *arg)
1917 spin_lock(&obd_zombie_impexp_lock);
1918 rc = (zombies_count == 0) &&
1919 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1920 spin_unlock(&obd_zombie_impexp_lock);
1926 * Add export to the obd_zombe thread and notify it.
1928 static void obd_zombie_export_add(struct obd_export *exp) {
1929 atomic_dec(&obd_stale_export_num);
1930 spin_lock(&exp->exp_obd->obd_dev_lock);
1931 LASSERT(!list_empty(&exp->exp_obd_chain));
1932 list_del_init(&exp->exp_obd_chain);
1933 spin_unlock(&exp->exp_obd->obd_dev_lock);
1934 spin_lock(&obd_zombie_impexp_lock);
1936 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1937 spin_unlock(&obd_zombie_impexp_lock);
1939 obd_zombie_impexp_notify();
1943 * Add import to the obd_zombe thread and notify it.
1945 static void obd_zombie_import_add(struct obd_import *imp) {
1946 LASSERT(imp->imp_sec == NULL);
1947 spin_lock(&obd_zombie_impexp_lock);
1948 LASSERT(list_empty(&imp->imp_zombie_chain));
1950 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1951 spin_unlock(&obd_zombie_impexp_lock);
1953 obd_zombie_impexp_notify();
1957 * notify import/export destroy thread about new zombie.
1959 static void obd_zombie_impexp_notify(void)
1962 * Make sure obd_zomebie_impexp_thread get this notification.
1963 * It is possible this signal only get by obd_zombie_barrier, and
1964 * barrier gulps this notification and sleeps away and hangs ensues
1966 wake_up_all(&obd_zombie_waitq);
1970 * check whether obd_zombie is idle
1972 static int obd_zombie_is_idle(void)
1976 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1977 spin_lock(&obd_zombie_impexp_lock);
1978 rc = (zombies_count == 0);
1979 spin_unlock(&obd_zombie_impexp_lock);
1984 * wait when obd_zombie import/export queues become empty
1986 void obd_zombie_barrier(void)
1988 struct l_wait_info lwi = { 0 };
1990 if (obd_zombie_pid == current_pid())
1991 /* don't wait for myself */
1993 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1995 EXPORT_SYMBOL(obd_zombie_barrier);
1998 struct obd_export *obd_stale_export_get(void)
2000 struct obd_export *exp = NULL;
2003 spin_lock(&obd_stale_export_lock);
2004 if (!list_empty(&obd_stale_exports)) {
2005 exp = list_entry(obd_stale_exports.next,
2006 struct obd_export, exp_stale_list);
2007 list_del_init(&exp->exp_stale_list);
2009 spin_unlock(&obd_stale_export_lock);
2012 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
2013 atomic_read(&obd_stale_export_num));
2017 EXPORT_SYMBOL(obd_stale_export_get);
2019 void obd_stale_export_put(struct obd_export *exp)
2023 LASSERT(list_empty(&exp->exp_stale_list));
2024 if (exp->exp_lock_hash &&
2025 atomic_read(&exp->exp_lock_hash->hs_count)) {
2026 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
2027 atomic_read(&obd_stale_export_num));
2029 spin_lock_bh(&exp->exp_bl_list_lock);
2030 spin_lock(&obd_stale_export_lock);
2031 /* Add to the tail if there is no blocked locks,
2032 * to the head otherwise. */
2033 if (list_empty(&exp->exp_bl_list))
2034 list_add_tail(&exp->exp_stale_list,
2035 &obd_stale_exports);
2037 list_add(&exp->exp_stale_list,
2038 &obd_stale_exports);
2040 spin_unlock(&obd_stale_export_lock);
2041 spin_unlock_bh(&exp->exp_bl_list_lock);
2043 class_export_put(exp);
2047 EXPORT_SYMBOL(obd_stale_export_put);
2050 * Adjust the position of the export in the stale list,
2051 * i.e. move to the head of the list if is needed.
2053 void obd_stale_export_adjust(struct obd_export *exp)
2055 LASSERT(exp != NULL);
2056 spin_lock_bh(&exp->exp_bl_list_lock);
2057 spin_lock(&obd_stale_export_lock);
2059 if (!list_empty(&exp->exp_stale_list) &&
2060 !list_empty(&exp->exp_bl_list))
2061 list_move(&exp->exp_stale_list, &obd_stale_exports);
2063 spin_unlock(&obd_stale_export_lock);
2064 spin_unlock_bh(&exp->exp_bl_list_lock);
2066 EXPORT_SYMBOL(obd_stale_export_adjust);
2069 * destroy zombie export/import thread.
2071 static int obd_zombie_impexp_thread(void *unused)
2073 unshare_fs_struct();
2074 complete(&obd_zombie_start);
2076 obd_zombie_pid = current_pid();
2078 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
2079 struct l_wait_info lwi = { 0 };
2081 l_wait_event(obd_zombie_waitq,
2082 !obd_zombie_impexp_check(NULL), &lwi);
2083 obd_zombie_impexp_cull();
2086 * Notify obd_zombie_barrier callers that queues
2089 wake_up(&obd_zombie_waitq);
2092 complete(&obd_zombie_stop);
2099 * start destroy zombie import/export thread
2101 int obd_zombie_impexp_init(void)
2103 struct task_struct *task;
2105 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
2107 RETURN(PTR_ERR(task));
2109 wait_for_completion(&obd_zombie_start);
2113 * stop destroy zombie import/export thread
2115 void obd_zombie_impexp_stop(void)
2117 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
2118 obd_zombie_impexp_notify();
2119 wait_for_completion(&obd_zombie_stop);
2120 LASSERT(list_empty(&obd_stale_exports));
2123 /***** Kernel-userspace comm helpers *******/
2125 /* Get length of entire message, including header */
2126 int kuc_len(int payload_len)
2128 return sizeof(struct kuc_hdr) + payload_len;
2130 EXPORT_SYMBOL(kuc_len);
2132 /* Get a pointer to kuc header, given a ptr to the payload
2133 * @param p Pointer to payload area
2134 * @returns Pointer to kuc header
2136 struct kuc_hdr * kuc_ptr(void *p)
2138 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2139 LASSERT(lh->kuc_magic == KUC_MAGIC);
2142 EXPORT_SYMBOL(kuc_ptr);
2144 /* Alloc space for a message, and fill in header
2145 * @return Pointer to payload area
2147 void *kuc_alloc(int payload_len, int transport, int type)
2150 int len = kuc_len(payload_len);
2154 return ERR_PTR(-ENOMEM);
2156 lh->kuc_magic = KUC_MAGIC;
2157 lh->kuc_transport = transport;
2158 lh->kuc_msgtype = type;
2159 lh->kuc_msglen = len;
2161 return (void *)(lh + 1);
2163 EXPORT_SYMBOL(kuc_alloc);
/**
 * Free a message allocated with kuc_alloc().
 *
 * \param[in] p			pointer to the payload area (not the header)
 * \param[in] payload_len	payload length the message was allocated with
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
/* One waiter for a request slot; obd_get_request_slot() keeps it on its
 * stack and links it on cl_flight_waiters while it sleeps. */
struct obd_request_slot_waiter {
	/* link on cl_flight_waiters; empty once a slot has been handed over */
	struct list_head	orsw_entry;
	/* the waiter sleeps here until it is woken */
	wait_queue_head_t	orsw_waitq;
	/* cleared before sleeping; obd_get_request_slot() returns -EINTR
	 * when it finds this set after the wait */
	bool			orsw_signaled;
};
2179 static bool obd_request_slot_avail(struct client_obd *cli,
2180 struct obd_request_slot_waiter *orsw)
2184 spin_lock(&cli->cl_loi_list_lock);
2185 avail = !!list_empty(&orsw->orsw_entry);
2186 spin_unlock(&cli->cl_loi_list_lock);
2192 * For network flow control, the RPC sponsor needs to acquire a credit
2193 * before sending the RPC. The credits count for a connection is defined
2194 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2195 * the subsequent RPC sponsors need to wait until others released their
2196 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2198 int obd_get_request_slot(struct client_obd *cli)
2200 struct obd_request_slot_waiter orsw;
2201 struct l_wait_info lwi;
2204 spin_lock(&cli->cl_loi_list_lock);
2205 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2206 cli->cl_rpcs_in_flight++;
2207 spin_unlock(&cli->cl_loi_list_lock);
2211 init_waitqueue_head(&orsw.orsw_waitq);
2212 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2213 orsw.orsw_signaled = false;
2214 spin_unlock(&cli->cl_loi_list_lock);
2216 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2217 rc = l_wait_event(orsw.orsw_waitq,
2218 obd_request_slot_avail(cli, &orsw) ||
2222 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2223 * freed but other (such as obd_put_request_slot) is using it. */
2224 spin_lock(&cli->cl_loi_list_lock);
2226 if (!orsw.orsw_signaled) {
2227 if (list_empty(&orsw.orsw_entry))
2228 cli->cl_rpcs_in_flight--;
2230 list_del(&orsw.orsw_entry);
2234 if (orsw.orsw_signaled) {
2235 LASSERT(list_empty(&orsw.orsw_entry));
2239 spin_unlock(&cli->cl_loi_list_lock);
2243 EXPORT_SYMBOL(obd_get_request_slot);
2245 void obd_put_request_slot(struct client_obd *cli)
2247 struct obd_request_slot_waiter *orsw;
2249 spin_lock(&cli->cl_loi_list_lock);
2250 cli->cl_rpcs_in_flight--;
2252 /* If there is free slot, wakeup the first waiter. */
2253 if (!list_empty(&cli->cl_flight_waiters) &&
2254 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2255 orsw = list_entry(cli->cl_flight_waiters.next,
2256 struct obd_request_slot_waiter, orsw_entry);
2257 list_del_init(&orsw->orsw_entry);
2258 cli->cl_rpcs_in_flight++;
2259 wake_up(&orsw->orsw_waitq);
2261 spin_unlock(&cli->cl_loi_list_lock);
2263 EXPORT_SYMBOL(obd_put_request_slot);
2265 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2267 return cli->cl_max_rpcs_in_flight;
2269 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2271 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2273 struct obd_request_slot_waiter *orsw;
2280 if (max > OBD_MAX_RIF_MAX || max < 1)
2283 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2284 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2285 /* adjust max_mod_rpcs_in_flight to ensure it is always
2286 * strictly lower that max_rpcs_in_flight */
2288 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2289 "because it must be higher than "
2290 "max_mod_rpcs_in_flight value",
2291 cli->cl_import->imp_obd->obd_name);
2294 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2295 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2301 spin_lock(&cli->cl_loi_list_lock);
2302 old = cli->cl_max_rpcs_in_flight;
2303 cli->cl_max_rpcs_in_flight = max;
2304 client_adjust_max_dirty(cli);
2308 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2309 for (i = 0; i < diff; i++) {
2310 if (list_empty(&cli->cl_flight_waiters))
2313 orsw = list_entry(cli->cl_flight_waiters.next,
2314 struct obd_request_slot_waiter, orsw_entry);
2315 list_del_init(&orsw->orsw_entry);
2316 cli->cl_rpcs_in_flight++;
2317 wake_up(&orsw->orsw_waitq);
2319 spin_unlock(&cli->cl_loi_list_lock);
2323 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2325 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2327 return cli->cl_max_mod_rpcs_in_flight;
2329 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2331 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2333 struct obd_connect_data *ocd;
2337 if (max > OBD_MAX_RIF_MAX || max < 1)
2340 /* cannot exceed or equal max_rpcs_in_flight */
2341 if (max >= cli->cl_max_rpcs_in_flight) {
2342 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2343 "higher or equal to max_rpcs_in_flight value (%u)\n",
2344 cli->cl_import->imp_obd->obd_name,
2345 max, cli->cl_max_rpcs_in_flight);
2349 /* cannot exceed max modify RPCs in flight supported by the server */
2350 ocd = &cli->cl_import->imp_connect_data;
2351 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2352 maxmodrpcs = ocd->ocd_maxmodrpcs;
2355 if (max > maxmodrpcs) {
2356 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2357 "higher than max_mod_rpcs_per_client value (%hu) "
2358 "returned by the server at connection\n",
2359 cli->cl_import->imp_obd->obd_name,
2364 spin_lock(&cli->cl_mod_rpcs_lock);
2366 prev = cli->cl_max_mod_rpcs_in_flight;
2367 cli->cl_max_mod_rpcs_in_flight = max;
2369 /* wakeup waiters if limit has been increased */
2370 if (cli->cl_max_mod_rpcs_in_flight > prev)
2371 wake_up(&cli->cl_mod_rpcs_waitq);
2373 spin_unlock(&cli->cl_mod_rpcs_lock);
2377 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/* Integer percentage of a in b; 0 when b is 0. Every argument is
 * parenthesized so that expressions such as pct(x + y, z) expand
 * correctly. */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
2381 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2382 struct seq_file *seq)
2384 unsigned long mod_tot = 0, mod_cum;
2385 struct timespec64 now;
2388 ktime_get_real_ts64(&now);
2390 spin_lock(&cli->cl_mod_rpcs_lock);
2392 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2393 (s64)now.tv_sec, now.tv_nsec);
2394 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2395 cli->cl_mod_rpcs_in_flight);
2397 seq_printf(seq, "\n\t\t\tmodify\n");
2398 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2400 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2403 for (i = 0; i < OBD_HIST_MAX; i++) {
2404 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2406 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2407 i, mod, pct(mod, mod_tot),
2408 pct(mod_cum, mod_tot));
2409 if (mod_cum == mod_tot)
2413 spin_unlock(&cli->cl_mod_rpcs_lock);
2417 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2421 /* The number of modify RPCs sent in parallel is limited
2422 * because the server has a finite number of slots per client to
2423 * store request result and ensure reply reconstruction when needed.
2424 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2425 * that takes into account server limit and cl_max_rpcs_in_flight
2427 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2428 * one close request is allowed above the maximum.
2430 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2435 /* A slot is available if
2436 * - number of modify RPCs in flight is less than the max
2437 * - it's a close RPC and no other close request is in flight
2439 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2440 (close_req && cli->cl_close_rpcs_in_flight == 0);
2445 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2450 spin_lock(&cli->cl_mod_rpcs_lock);
2451 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2452 spin_unlock(&cli->cl_mod_rpcs_lock);
2456 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2459 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2460 it->it_op == IT_READDIR ||
2461 (it->it_op == IT_LAYOUT && !(it->it_flags & FMODE_WRITE))))
2466 /* Get a modify RPC slot from the obd client @cli according
2467 * to the kind of operation @opc that is going to be sent
2468 * and the intent @it of the operation if it applies.
2469 * If the maximum number of modify RPCs in flight is reached
2470 * the thread is put to sleep.
2471 * Returns the tag to be set in the request message. Tag 0
2472 * is reserved for non-modifying requests.
2474 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2475 struct lookup_intent *it)
2477 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2478 bool close_req = false;
2481 /* read-only metadata RPCs don't consume a slot on MDT
2482 * for reply reconstruction
2484 if (obd_skip_mod_rpc_slot(it))
2487 if (opc == MDS_CLOSE)
2491 spin_lock(&cli->cl_mod_rpcs_lock);
2492 max = cli->cl_max_mod_rpcs_in_flight;
2493 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2494 /* there is a slot available */
2495 cli->cl_mod_rpcs_in_flight++;
2497 cli->cl_close_rpcs_in_flight++;
2498 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2499 cli->cl_mod_rpcs_in_flight);
2500 /* find a free tag */
2501 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2503 LASSERT(i < OBD_MAX_RIF_MAX);
2504 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2505 spin_unlock(&cli->cl_mod_rpcs_lock);
2506 /* tag 0 is reserved for non-modify RPCs */
2509 spin_unlock(&cli->cl_mod_rpcs_lock);
2511 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2512 "opc %u, max %hu\n",
2513 cli->cl_import->imp_obd->obd_name, opc, max);
2515 l_wait_event(cli->cl_mod_rpcs_waitq,
2516 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2519 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2521 /* Put a modify RPC slot from the obd client @cli according
2522 * to the kind of operation @opc that has been sent and the
2523 * intent @it of the operation if it applies.
2525 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2526 struct lookup_intent *it, __u16 tag)
2528 bool close_req = false;
2530 if (obd_skip_mod_rpc_slot(it))
2533 if (opc == MDS_CLOSE)
2536 spin_lock(&cli->cl_mod_rpcs_lock);
2537 cli->cl_mod_rpcs_in_flight--;
2539 cli->cl_close_rpcs_in_flight--;
2540 /* release the tag in the bitmap */
2541 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2542 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2543 spin_unlock(&cli->cl_mod_rpcs_lock);
2544 wake_up(&cli->cl_mod_rpcs_waitq);
2546 EXPORT_SYMBOL(obd_put_mod_rpc_slot);