4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
/* Registered obd types: the list and the spinlock taken around walks and
 * insertions/removals (see class_search_type(), class_register_type()). */
48 static DEFINE_SPINLOCK(obd_types_lock);
49 static LIST_HEAD(obd_types);
/* Table of registered devices, indexed by obd_minor. Slots are changed
 * under write_lock(&obd_dev_lock) and scanned under read_lock(). */
50 DEFINE_RWLOCK(obd_dev_lock);
51 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
53 static struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 static struct kmem_cache *import_cachep;
/* Lists of imports/exports handed to obd_zombie_import_add() /
 * obd_zombie_export_add() after their final reference was dropped.
 * NOTE(review): presumably both lists are guarded by
 * obd_zombie_impexp_lock -- confirm against the zombie code. */
58 static LIST_HEAD(obd_zombie_imports);
59 static LIST_HEAD(obd_zombie_exports);
60 static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
/* Forward declarations for helpers defined later in this file. */
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp,
66 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping; obd_stale_export_num is incremented in
 * class_unlink_export(). */
68 static LIST_HEAD(obd_stale_exports);
69 static DEFINE_SPINLOCK(obd_stale_export_lock);
70 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/* Hook used by class_export_destroy() and class_import_destroy() to drop
 * a ptlrpc connection reference; it is called through this pointer and
 * is not assigned anywhere in the visible part of this file. */
72 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
73 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
76 * support functions: we could use inter-module communication, but this
77 * is more portable to other OS's
/*
 * obd_device_alloc(): allocate one struct obd_device from
 * obd_device_cachep with GFP_NOFS and stamp it with OBD_DEVICE_MAGIC.
 * class_newdev() treats a NULL result as allocation failure.
 */
79 static struct obd_device *obd_device_alloc(void)
81 struct obd_device *obd;
83 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
85 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * obd_device_free(): return an obd_device to obd_device_cachep.
 *
 * Asserts that the magic is still OBD_DEVICE_MAGIC, logs an error if
 * obd_namespace was left set, then finalizes the obd_reference lu_ref
 * and frees the slab object.
 */
90 static void obd_device_free(struct obd_device *obd)
93 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
94 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a leftover namespace is reported together with the obd_force flag */
95 if (obd->obd_namespace != NULL) {
96 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
97 obd, obd->obd_namespace, obd->obd_force);
100 lu_ref_fini(&obd->obd_reference);
101 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * class_search_type(): walk the obd_types list under obd_types_lock
 * looking for an entry whose typ_name equals \a name (exact strcmp()).
 * The lock is released both when a match is found and after the whole
 * list has been scanned.
 * NOTE(review): the return statements are not visible in this extract;
 * callers test the result against NULL, so presumably the matching type
 * is returned and NULL otherwise -- confirm.
 */
104 struct obd_type *class_search_type(const char *name)
106 struct list_head *tmp;
107 struct obd_type *type;
109 spin_lock(&obd_types_lock);
110 list_for_each(tmp, &obd_types) {
111 type = list_entry(tmp, struct obd_type, typ_chain);
112 if (strcmp(type->typ_name, name) == 0) {
113 spin_unlock(&obd_types_lock);
117 spin_unlock(&obd_types_lock);
120 EXPORT_SYMBOL(class_search_type);
/*
 * class_get_type(): look up an obd type by name and pin its module.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the code can ask the kernel to load a
 * module via request_module() and then repeats the search. Some type
 * names are mapped to a different module name first: LUSTRE_LWP_NAME
 * loads LUSTRE_OSP_NAME, and any name starting with LUSTRE_MDS_NAME
 * loads LUSTRE_MDT_NAME ("obdfilter" is remapped as well; its target is
 * not visible in this extract).
 *
 * A module reference is then taken on typ_dt_ops->o_owner while holding
 * type->obd_type_lock; class_put_type() is the counterpart.
 * NOTE(review): the return value of try_module_get() is ignored, so a
 * type whose module is being unloaded would still be handed out --
 * confirm whether that can happen here.
 */
122 struct obd_type *class_get_type(const char *name)
124 struct obd_type *type = class_search_type(name);
126 #ifdef HAVE_MODULE_LOADING_SUPPORT
128 const char *modname = name;
130 if (strcmp(modname, "obdfilter") == 0)
133 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
134 modname = LUSTRE_OSP_NAME;
/* prefix match: every name beginning with LUSTRE_MDS_NAME is covered */
136 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
137 modname = LUSTRE_MDT_NAME;
/* request_module() returns 0 on success; search again by the original
 * type name, not by the module name */
139 if (!request_module("%s", modname)) {
140 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
141 type = class_search_type(name);
143 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
149 spin_lock(&type->obd_type_lock);
151 try_module_get(type->typ_dt_ops->o_owner);
152 spin_unlock(&type->obd_type_lock);
/*
 * class_put_type(): counterpart of class_get_type(). Under
 * type->obd_type_lock it drops the module reference on
 * typ_dt_ops->o_owner with module_put().
 */
157 void class_put_type(struct obd_type *type)
160 spin_lock(&type->obd_type_lock);
162 module_put(type->typ_dt_ops->o_owner);
163 spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) on the length of a type name, enforced by the
 * LASSERT in class_register_type(). */
166 #define CLASS_MAX_NAME 1024
/*
 * class_register_type(): register a new obd type named \a name.
 *
 * Allocates a struct obd_type with private copies of \a dt_ops and
 * \a md_ops and of the name, optionally registers a procfs directory
 * (CONFIG_PROC_FS), creates a kobject under lustre_kobj, initializes
 * \a ldt with lu_device_type_init() and finally links the type into
 * obd_types under obd_types_lock.
 *
 * A name that is already registered is only logged (the value returned
 * in that case is not visible in this extract). The tail of the function
 * is the failure path, which releases whatever was set up.
 */
168 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
169 bool enable_proc, struct lprocfs_vars *vars,
170 const char *name, struct lu_device_type *ldt)
172 struct obd_type *type;
177 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
179 if (class_search_type(name)) {
180 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
185 OBD_ALLOC(type, sizeof(*type));
/* all three allocations are attempted first and checked together */
189 OBD_ALLOC_PTR(type->typ_dt_ops);
190 OBD_ALLOC_PTR(type->typ_md_ops);
191 OBD_ALLOC(type->typ_name, strlen(name) + 1);
193 if (type->typ_dt_ops == NULL ||
194 type->typ_md_ops == NULL ||
195 type->typ_name == NULL)
/* the ops tables are copied by value into the type */
198 *(type->typ_dt_ops) = *dt_ops;
199 /* md_ops is optional */
201 *(type->typ_md_ops) = *md_ops;
/* typ_name was allocated with strlen(name) + 1 bytes above */
202 strcpy(type->typ_name, name);
203 spin_lock_init(&type->obd_type_lock);
205 #ifdef CONFIG_PROC_FS
207 type->typ_procroot = lprocfs_register(type->typ_name,
/* on error typ_procroot is reset to NULL so the cleanup path skips it */
210 if (IS_ERR(type->typ_procroot)) {
211 rc = PTR_ERR(type->typ_procroot);
212 type->typ_procroot = NULL;
217 type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
218 if (!type->typ_kobj) {
225 rc = lu_device_type_init(ldt);
/* publish the fully initialized type */
230 spin_lock(&obd_types_lock);
231 list_add(&type->typ_chain, &obd_types);
232 spin_unlock(&obd_types_lock);
/* failure path: undo in reverse order of construction */
238 kobject_put(type->typ_kobj);
239 if (type->typ_name != NULL) {
240 #ifdef CONFIG_PROC_FS
241 if (type->typ_procroot != NULL)
242 remove_proc_subtree(type->typ_name, proc_lustre_root);
244 OBD_FREE(type->typ_name, strlen(name) + 1);
246 if (type->typ_md_ops != NULL)
247 OBD_FREE_PTR(type->typ_md_ops);
248 if (type->typ_dt_ops != NULL)
249 OBD_FREE_PTR(type->typ_dt_ops);
250 OBD_FREE(type, sizeof(*type));
253 EXPORT_SYMBOL(class_register_type);
/*
 * class_unregister_type(): remove the obd type named \a name.
 *
 * An unknown name is reported with CERROR. If the type still has a
 * non-zero typ_refcnt, only the two ops tables are freed and the rest of
 * the type is left in place. Otherwise the kobject, procfs entries and
 * lu_device_type are released, the type is unlinked from obd_types under
 * obd_types_lock and all of its memory is freed.
 * NOTE(review): the return values are not visible in this extract.
 */
255 int class_unregister_type(const char *name)
257 struct obd_type *type = class_search_type(name);
261 CERROR("unknown obd type\n");
265 if (type->typ_refcnt) {
266 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
267 /* This is a bad situation, let's make the best of it */
268 /* Remove ops, but leave the name for debugging */
269 OBD_FREE_PTR(type->typ_dt_ops);
270 OBD_FREE_PTR(type->typ_md_ops);
275 kobject_put(type->typ_kobj);
277 /* we do not use type->typ_procroot as for compatibility purposes
278 * other modules can share names (i.e. lod can use lov entry). so
279 * we can't reference pointer as it can get invalidated when another
280 * module removes the entry */
281 #ifdef CONFIG_PROC_FS
282 if (type->typ_procroot != NULL)
283 remove_proc_subtree(type->typ_name, proc_lustre_root);
284 if (type->typ_procsym != NULL)
285 lprocfs_remove(&type->typ_procsym);
288 lu_device_type_fini(type->typ_lu);
290 spin_lock(&obd_types_lock);
291 list_del(&type->typ_chain);
292 spin_unlock(&obd_types_lock);
/* the size is recomputed from \a name, which equals typ_name because the
 * type was found by an exact name match */
293 OBD_FREE(type->typ_name, strlen(name) + 1);
294 if (type->typ_dt_ops != NULL)
295 OBD_FREE_PTR(type->typ_dt_ops);
296 if (type->typ_md_ops != NULL)
297 OBD_FREE_PTR(type->typ_md_ops);
298 OBD_FREE(type, sizeof(*type));
300 } /* class_unregister_type */
301 EXPORT_SYMBOL(class_unregister_type);
304 * Create a new obd device.
306 * Allocate the new obd_device and initialize it.
308 * \param[in] type_name obd device type string.
309 * \param[in] name obd device name.
310 * \param[in] uuid obd device UUID
312 * \retval newdev pointer to created obd_device
313 * \retval ERR_PTR(errno) on error
315 struct obd_device *class_newdev(const char *type_name, const char *name,
318 struct obd_device *newdev;
319 struct obd_type *type = NULL;
322 if (strlen(name) >= MAX_OBD_NAME) {
323 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
324 RETURN(ERR_PTR(-EINVAL));
327 type = class_get_type(type_name);
329 CERROR("OBD: unknown type: %s\n", type_name);
330 RETURN(ERR_PTR(-ENODEV));
333 newdev = obd_device_alloc();
334 if (newdev == NULL) {
335 class_put_type(type);
336 RETURN(ERR_PTR(-ENOMEM));
338 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
339 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
340 newdev->obd_type = type;
341 newdev->obd_minor = -1;
343 rwlock_init(&newdev->obd_pool_lock);
344 newdev->obd_pool_limit = 0;
345 newdev->obd_pool_slv = 0;
347 INIT_LIST_HEAD(&newdev->obd_exports);
348 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
349 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
350 INIT_LIST_HEAD(&newdev->obd_exports_timed);
351 INIT_LIST_HEAD(&newdev->obd_nid_stats);
352 spin_lock_init(&newdev->obd_nid_lock);
353 spin_lock_init(&newdev->obd_dev_lock);
354 mutex_init(&newdev->obd_dev_mutex);
355 spin_lock_init(&newdev->obd_osfs_lock);
356 /* newdev->obd_osfs_age must be set to a value in the distant
357 * past to guarantee a fresh statfs is fetched on mount. */
358 newdev->obd_osfs_age = cfs_time_shift_64(-1000);
360 /* XXX belongs in setup not attach */
361 init_rwsem(&newdev->obd_observer_link_sem);
363 init_timer(&newdev->obd_recovery_timer);
364 spin_lock_init(&newdev->obd_recovery_task_lock);
365 init_waitqueue_head(&newdev->obd_next_transno_waitq);
366 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
367 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
368 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
369 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
370 INIT_LIST_HEAD(&newdev->obd_evict_list);
371 INIT_LIST_HEAD(&newdev->obd_lwp_list);
373 llog_group_init(&newdev->obd_olg);
374 /* Detach drops this */
375 atomic_set(&newdev->obd_refcount, 1);
376 lu_ref_init(&newdev->obd_reference);
377 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
379 newdev->obd_conn_inprogress = 0;
381 strncpy(newdev->obd_uuid.uuid, uuid, strlen(uuid));
383 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
384 newdev->obd_name, newdev);
392 * \param[in] obd obd_device to be freed
/*
 * class_free_dev(): final teardown of an obd_device.
 *
 * Sanity-checks the device (magic intact, either unregistered with
 * obd_minor == -1 or still occupying its own obd_devs[] slot, and
 * obd_refcount == 0), calls obd_cleanup() and logs a failure, frees the
 * device and finally drops the type reference. The type pointer is
 * saved in a local first because \a obd is freed before
 * class_put_type() runs.
 */
396 void class_free_dev(struct obd_device *obd)
398 struct obd_type *obd_type = obd->obd_type;
400 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
401 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
402 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
403 "obd %p != obd_devs[%d] %p\n",
404 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
405 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
406 "obd_refcount should be 0, not %d\n",
407 atomic_read(&obd->obd_refcount));
408 LASSERT(obd_type != NULL);
410 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
411 obd->obd_name, obd->obd_type->typ_name);
413 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
414 obd->obd_name, obd->obd_uuid.uuid);
415 if (obd->obd_stopping) {
418 /* If we're not stopping, we were never set up */
419 err = obd_cleanup(obd);
421 CERROR("Cleanup %s returned %d\n",
425 obd_device_free(obd);
/* obd is gone now; use the pointer saved at the top */
427 class_put_type(obd_type);
431 * Unregister obd device.
433 * Free slot in obd_dev[] used by \a obd.
435 * \param[in] new_obd obd_device to be unregistered
/*
 * class_unregister_device(): under write_lock(&obd_dev_lock), clear the
 * obd_devs[] slot of \a obd if it has one (obd_minor >= 0), asserting
 * first that the slot really holds \a obd.
 */
439 void class_unregister_device(struct obd_device *obd)
441 write_lock(&obd_dev_lock);
442 if (obd->obd_minor >= 0) {
443 LASSERT(obd_devs[obd->obd_minor] == obd);
444 obd_devs[obd->obd_minor] = NULL;
447 write_unlock(&obd_dev_lock);
451 * Register obd device.
453 * Find free slot in obd_devs[], fills it with \a new_obd.
455 * \param[in] new_obd obd_device to be registered
458 * \retval -EEXIST device with this name is registered
459 * \retval -EOVERFLOW obd_devs[] is full
/*
 * class_register_device(): a single pass over obd_devs[] under
 * write_lock(&obd_dev_lock) both looks for a device with the same name
 * as \a new_obd and remembers a free slot. A duplicate name cancels any
 * slot already chosen. On success obd_minor is set and the slot filled;
 * otherwise an error is logged.
 */
461 int class_register_device(struct obd_device *new_obd)
465 int new_obd_minor = 0;
466 bool minor_assign = false;
468 write_lock(&obd_dev_lock);
469 for (i = 0; i < class_devno_max(); i++) {
470 struct obd_device *obd = class_num2obd(i);
473 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
474 CERROR("%s: already exists, won't add\n",
476 /* in case we found a free slot before duplicate */
477 minor_assign = false;
/* only the first free slot is taken into account */
481 if (!minor_assign && obd == NULL) {
488 new_obd->obd_minor = new_obd_minor;
489 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
490 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
491 obd_devs[new_obd_minor] = new_obd;
495 CERROR("%s: all %u/%u devices used, increase "
496 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
497 i, class_devno_max(), ret);
500 write_unlock(&obd_dev_lock);
/*
 * class_name2dev_nolock(): scan obd_devs[] for a device whose obd_name
 * equals \a name (exact strcmp()). No lock is taken here;
 * class_name2dev() and class_dev_by_str() call it with obd_dev_lock
 * read-held. A name match is additionally checked for obd_attached.
 * NOTE(review): the return statements are not visible in this extract;
 * callers treat a negative result as "not found".
 */
505 static int class_name2dev_nolock(const char *name)
512 for (i = 0; i < class_devno_max(); i++) {
513 struct obd_device *obd = class_num2obd(i);
515 if (obd && strcmp(name, obd->obd_name) == 0) {
516 /* Make sure we finished attaching before we give
517 out any references */
518 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
519 if (obd->obd_attached) {
/*
 * class_name2dev(): locked wrapper that runs class_name2dev_nolock()
 * with obd_dev_lock read-held.
 */
529 int class_name2dev(const char *name)
536 read_lock(&obd_dev_lock);
537 i = class_name2dev_nolock(name);
538 read_unlock(&obd_dev_lock);
542 EXPORT_SYMBOL(class_name2dev);
544 struct obd_device *class_name2obd(const char *name)
546 int dev = class_name2dev(name);
548 if (dev < 0 || dev > class_devno_max())
550 return class_num2obd(dev);
552 EXPORT_SYMBOL(class_name2obd);
/*
 * class_uuid2dev_nolock(): scan obd_devs[] for a device whose obd_uuid
 * equals \a uuid (obd_uuid_equals()). No lock is taken here; the callers
 * in this file hold obd_dev_lock for reading.
 * NOTE(review): the return statements are not visible in this extract;
 * callers treat a negative result as "not found".
 */
554 int class_uuid2dev_nolock(struct obd_uuid *uuid)
558 for (i = 0; i < class_devno_max(); i++) {
559 struct obd_device *obd = class_num2obd(i);
561 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
562 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/*
 * class_uuid2dev(): locked wrapper that runs class_uuid2dev_nolock()
 * with obd_dev_lock read-held.
 */
570 int class_uuid2dev(struct obd_uuid *uuid)
574 read_lock(&obd_dev_lock);
575 i = class_uuid2dev_nolock(uuid);
576 read_unlock(&obd_dev_lock);
580 EXPORT_SYMBOL(class_uuid2dev);
/*
 * class_uuid2obd(): resolve \a uuid to an obd_devs[] index with
 * class_uuid2dev() and return the device stored in that slot via
 * class_num2obd(). No reference is taken on the device.
 */
582 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
584 int dev = class_uuid2dev(uuid);
587 return class_num2obd(dev);
589 EXPORT_SYMBOL(class_uuid2obd);
592 * Get obd device from ::obd_devs[]
594 * \param num [in] array index
596 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
597 * otherwise return the obd device there.
599 struct obd_device *class_num2obd(int num)
601 struct obd_device *obd = NULL;
603 if (num < class_devno_max()) {
608 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
609 "%p obd_magic %08x != %08x\n",
610 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
611 LASSERTF(obd->obd_minor == num,
612 "%p obd_minor %0d != %0d\n",
613 obd, obd->obd_minor, num);
620 * Find obd in obd_dev[] by name or uuid.
622 * Increment obd's refcount if found.
624 * \param[in] str obd name or uuid
626 * \retval NULL if not found
627 * \retval target pointer to found obd_device
/*
 * class_dev_by_str(): \a str is first converted to an obd_uuid and
 * looked up by uuid, then looked up by name. Both lookups, and the
 * class_incref() on the result, happen with obd_dev_lock read-held, so
 * the reference is taken while the table is locked against writers.
 */
629 struct obd_device *class_dev_by_str(const char *str)
631 struct obd_device *target = NULL;
632 struct obd_uuid tgtuuid;
635 obd_str2uuid(&tgtuuid, str);
637 read_lock(&obd_dev_lock);
638 rc = class_uuid2dev_nolock(&tgtuuid);
640 rc = class_name2dev_nolock(str);
643 target = class_num2obd(rc);
/* the caller owns this "find" reference */
646 class_incref(target, "find", current);
647 read_unlock(&obd_dev_lock);
651 EXPORT_SYMBOL(class_dev_by_str);
654 * Get obd devices count. Device in any
656 * \retval obd device count
658 int get_devices_count(void)
660 int index, max_index = class_devno_max(), dev_count = 0;
662 read_lock(&obd_dev_lock);
663 for (index = 0; index <= max_index; index++) {
664 struct obd_device *obd = class_num2obd(index);
668 read_unlock(&obd_dev_lock);
672 EXPORT_SYMBOL(get_devices_count);
/*
 * class_obd_list(): print one console line per registered device while
 * holding obd_dev_lock for reading. Each line shows the slot index, a
 * status string, the type name, the device name, its uuid and the
 * current obd_refcount. The status is chosen by testing obd_stopping,
 * obd_set_up and obd_attached in that order (the strings themselves are
 * not visible in this extract).
 */
674 void class_obd_list(void)
679 read_lock(&obd_dev_lock);
680 for (i = 0; i < class_devno_max(); i++) {
681 struct obd_device *obd = class_num2obd(i);
685 if (obd->obd_stopping)
687 else if (obd->obd_set_up)
689 else if (obd->obd_attached)
693 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
694 i, status, obd->obd_type->typ_name,
695 obd->obd_name, obd->obd_uuid.uuid,
696 atomic_read(&obd->obd_refcount));
698 read_unlock(&obd_dev_lock);
702 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
703 specified, then only the client with that uuid is returned,
704 otherwise any client connected to the tgt is returned. */
/*
 * The scan runs with obd_dev_lock read-held and the lock is released
 * before returning on both the match and the no-match path. The type
 * test is a prefix match: only the first strlen(typ_name) characters of
 * the device's type name are compared. The target comparison reads
 * obd->u.cli.cl_target_uuid.
 * NOTE(review): no reference on the matched device is taken in the
 * visible code -- confirm callers do not rely on one.
 */
705 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
706 const char * typ_name,
707 struct obd_uuid *grp_uuid)
711 read_lock(&obd_dev_lock);
712 for (i = 0; i < class_devno_max(); i++) {
713 struct obd_device *obd = class_num2obd(i);
717 if ((strncmp(obd->obd_type->typ_name, typ_name,
718 strlen(typ_name)) == 0)) {
719 if (obd_uuid_equals(tgt_uuid,
720 &obd->u.cli.cl_target_uuid) &&
721 ((grp_uuid)? obd_uuid_equals(grp_uuid,
722 &obd->obd_uuid) : 1)) {
723 read_unlock(&obd_dev_lock);
728 read_unlock(&obd_dev_lock);
732 EXPORT_SYMBOL(class_find_client_obd);
734 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
735 searching at *next, and if a device is found, the next index to look
736 at is saved in *next. If next is NULL, then the first matching device
737 will always be returned. */
/*
 * A start index taken from *next is used only when it lies within
 * [0, class_devno_max()). The scan itself runs with obd_dev_lock
 * read-held and compares grp_uuid against each device's obd_uuid; the
 * lock is dropped before returning on either path.
 */
738 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
744 else if (*next >= 0 && *next < class_devno_max())
749 read_lock(&obd_dev_lock);
750 for (; i < class_devno_max(); i++) {
751 struct obd_device *obd = class_num2obd(i);
755 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
758 read_unlock(&obd_dev_lock);
762 read_unlock(&obd_dev_lock);
766 EXPORT_SYMBOL(class_devices_in_group);
769 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
770 * adjust sptlrpc settings accordingly.
/*
 * class_notify_sptlrpc_conf(): for every device that is set up, not
 * stopping, of one of the listed types, and whose obd_name starts with
 * the first \a namelen bytes of \a fsname, send KEY_SPTLRPC_CONF through
 * obd_set_info_async() on the device's self export.
 *
 * obd_dev_lock is dropped around the obd_set_info_async() call; a
 * reference taken with class_incref() keeps the device alive meanwhile,
 * and the lock is re-taken before the scan continues.
 */
772 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
774 struct obd_device *obd;
778 LASSERT(namelen > 0);
780 read_lock(&obd_dev_lock);
781 for (i = 0; i < class_devno_max(); i++) {
782 obd = class_num2obd(i);
784 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
787 /* only notify mdc, osc, osp, lwp, mdt, ost
788 * because only these have a -sptlrpc llog */
789 type = obd->obd_type->typ_name;
790 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
791 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
792 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
793 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
794 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
795 strcmp(type, LUSTRE_OST_NAME) != 0)
/* non-zero strncmp() means the device name does not start with fsname */
798 if (strncmp(obd->obd_name, fsname, namelen))
/* pin the device, then drop the table lock for the duration of the call */
801 class_incref(obd, __FUNCTION__, obd);
802 read_unlock(&obd_dev_lock);
803 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
804 sizeof(KEY_SPTLRPC_CONF),
805 KEY_SPTLRPC_CONF, 0, NULL, NULL);
807 class_decref(obd, __FUNCTION__, obd);
808 read_lock(&obd_dev_lock);
810 read_unlock(&obd_dev_lock);
813 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * obd_cleanup_caches(): destroy the slab caches created by
 * obd_init_caches() (obd_device_cachep, obdo_cachep, import_cachep).
 * Pointers are reset to NULL after destruction, which matches the
 * "== NULL" assertions made at the start of obd_init_caches().
 */
815 void obd_cleanup_caches(void)
818 if (obd_device_cachep) {
819 kmem_cache_destroy(obd_device_cachep);
820 obd_device_cachep = NULL;
823 kmem_cache_destroy(obdo_cachep);
827 kmem_cache_destroy(import_cachep);
828 import_cachep = NULL;
/*
 * obd_init_caches(): create the three slab caches used by this file:
 * "ll_obd_dev_cache" (struct obd_device), "ll_obdo_cache" (struct obdo)
 * and "ll_import_cache" (struct obd_import). Each cache pointer is
 * asserted to be NULL beforehand. A failed creation jumps to the out
 * label with rc = -ENOMEM; obd_cleanup_caches() appears at the tail of
 * the function, presumably as the error-path cleanup -- confirm.
 */
834 int obd_init_caches(void)
839 LASSERT(obd_device_cachep == NULL);
840 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
841 sizeof(struct obd_device),
843 if (!obd_device_cachep)
844 GOTO(out, rc = -ENOMEM);
846 LASSERT(obdo_cachep == NULL);
847 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
850 GOTO(out, rc = -ENOMEM);
852 LASSERT(import_cachep == NULL);
853 import_cachep = kmem_cache_create("ll_import_cache",
854 sizeof(struct obd_import),
857 GOTO(out, rc = -ENOMEM);
861 obd_cleanup_caches();
865 /* map connection to client */
/*
 * class_conn2export(): translate a lustre_handle into its obd_export by
 * looking up conn->cookie with class_handle2object(). Two inputs are
 * special-cased and only logged in the visible code: a null handle and
 * a cookie of -1 ("assign a new connection").
 * NOTE(review): class_conn2obd() calls class_export_put() on the result,
 * so the lookup presumably returns a referenced export -- confirm.
 */
866 struct obd_export *class_conn2export(struct lustre_handle *conn)
868 struct obd_export *export;
872 CDEBUG(D_CACHE, "looking for null handle\n");
876 if (conn->cookie == -1) { /* this means assign a new connection */
877 CDEBUG(D_CACHE, "want a new connection\n");
881 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
882 export = class_handle2object(conn->cookie, NULL);
885 EXPORT_SYMBOL(class_conn2export);
/*
 * class_exp2obd(): map an export to an obd_device.
 * NOTE(review): the body is not visible in this extract; presumably it
 * returns exp->exp_obd for a non-NULL export -- confirm.
 */
887 struct obd_device *class_exp2obd(struct obd_export *exp)
893 EXPORT_SYMBOL(class_exp2obd);
/*
 * class_conn2obd(): resolve a connection handle to the obd_device behind
 * it. The export found by class_conn2export() is used only to read
 * exp_obd and is released again with class_export_put(); no reference
 * on the device itself is taken in the visible code.
 */
895 struct obd_device *class_conn2obd(struct lustre_handle *conn)
897 struct obd_export *export;
898 export = class_conn2export(conn);
900 struct obd_device *obd = export->exp_obd;
901 class_export_put(export);
/*
 * class_exp2cliimp(): return the client import (u.cli.cl_import) of the
 * obd_device that \a exp belongs to. \a exp is dereferenced
 * unconditionally, so it must not be NULL.
 */
907 struct obd_import *class_exp2cliimp(struct obd_export *exp)
909 struct obd_device *obd = exp->exp_obd;
912 return obd->u.cli.cl_import;
914 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * class_conn2cliimp(): resolve a connection handle to its obd_device
 * with class_conn2obd() and return that device's client import
 * (u.cli.cl_import).
 */
916 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
918 struct obd_device *obd = class_conn2obd(conn);
921 return obd->u.cli.cl_import;
924 /* Export management functions */
/*
 * class_export_destroy(): free an export whose refcount has reached
 * zero. Drops the ptlrpc connection reference if there is one, asserts
 * that the reply and request queues are empty, runs
 * obd_destroy_export(), drops the device reference held by every
 * non-self export and frees the memory through OBD_FREE_RCU.
 */
925 static void class_export_destroy(struct obd_export *exp)
927 struct obd_device *obd = exp->exp_obd;
930 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
931 LASSERT(obd != NULL);
933 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
934 exp->exp_client_uuid.uuid, obd->obd_name);
936 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
937 if (exp->exp_connection)
938 ptlrpc_put_connection_superhack(exp->exp_connection);
940 LASSERT(list_empty(&exp->exp_outstanding_replies));
941 LASSERT(list_empty(&exp->exp_uncommitted_replies));
942 LASSERT(list_empty(&exp->exp_req_replay_queue));
943 LASSERT(list_empty(&exp->exp_hp_rpcs));
944 obd_destroy_export(exp);
945 /* self export doesn't hold a reference to an obd, although it
946 * exists until freeing of the obd */
947 if (exp != obd->obd_self_export)
948 class_decref(obd, "export", exp);
950 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/*
 * Handle-table callback (export_handle_ops.hop_addref): take one more
 * reference on the export stored behind the opaque handle pointer.
 */
static void export_handle_addref(void *export)
{
	struct obd_export *exp = export;

	class_export_get(exp);
}
/* Handle operations registered for exports with class_handle_hash() in
 * __class_new_export(); hop_addref takes an export reference. */
959 static struct portals_handle_ops export_handle_ops = {
960 .hop_addref = export_handle_addref,
/*
 * class_export_get(): take one reference on \a exp by atomically
 * incrementing exp_refcount, and log the new count.
 * NOTE(review): the return statement is not visible in this extract;
 * presumably \a exp is returned so calls can be chained -- confirm.
 */
964 struct obd_export *class_export_get(struct obd_export *exp)
966 atomic_inc(&exp->exp_refcount);
967 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
968 atomic_read(&exp->exp_refcount));
971 EXPORT_SYMBOL(class_export_get);
/*
 * class_export_put(): drop one reference on \a exp.
 *
 * When the count reaches zero the nid statistics reference is released
 * with lprocfs_exp_cleanup() and the export is torn down. The device's
 * self export is destroyed synchronously with class_export_destroy().
 * Any other export must still be linked on exp_obd_chain and is handed
 * to obd_zombie_export_add().
 */
973 void class_export_put(struct obd_export *exp)
975 LASSERT(exp != NULL);
976 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* logged before the decrement, hence the explicit "- 1" */
977 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
978 atomic_read(&exp->exp_refcount) - 1);
980 if (atomic_dec_and_test(&exp->exp_refcount)) {
981 struct obd_device *obd = exp->exp_obd;
983 CDEBUG(D_IOCTL, "final put %p/%s\n",
984 exp, exp->exp_client_uuid.uuid);
986 /* release nid stat reference */
987 lprocfs_exp_cleanup(exp);
989 if (exp == obd->obd_self_export) {
990 /* self export should be destroyed without
991 * zombie thread as it doesn't hold a
992 * reference to obd and doesn't hold any
994 class_export_destroy(exp);
995 /* self export is destroyed, no class
996 * references exist and it is safe to free
1000 LASSERT(!list_empty(&exp->exp_obd_chain));
1001 obd_zombie_export_add(exp);
1006 EXPORT_SYMBOL(class_export_put);
1007 /* Creates a new export, adds it to the hash table, and returns a
1008 * pointer to it. The refcount is 2: one for the hash reference, and
1009 * one for the pointer returned by this function. */
/*
 * Outline of the visible code:
 *  - allocate the export and initialize counters, lists, locks and hash
 *    nodes; register its handle with class_handle_hash();
 *  - if \a cluuid differs from the device's own uuid, add the export to
 *    obd->obd_uuid_hash, refusing duplicates with -EALREADY;
 *  - under obd->obd_dev_lock, fail with -ESHUTDOWN if the device is
 *    stopping, otherwise take a device reference and link the export on
 *    obd_exports / obd_exports_timed and bump obd_num_exports;
 *  - the tail of the function is the error path: it unhashes the handle,
 *    runs obd_destroy_export() and frees the export.
 * \a is_self is passed as true by class_new_export_self(); its use is
 * not visible in this extract.
 */
1010 struct obd_export *__class_new_export(struct obd_device *obd,
1011 struct obd_uuid *cluuid, bool is_self)
1013 struct obd_export *export;
1014 struct cfs_hash *hash = NULL;
1018 OBD_ALLOC_PTR(export);
1020 return ERR_PTR(-ENOMEM);
1022 export->exp_conn_cnt = 0;
1023 export->exp_lock_hash = NULL;
1024 export->exp_flock_hash = NULL;
1025 /* 2 = class_handle_hash + last */
1026 atomic_set(&export->exp_refcount, 2);
1027 atomic_set(&export->exp_rpc_count, 0);
1028 atomic_set(&export->exp_cb_count, 0);
1029 atomic_set(&export->exp_locks_count, 0);
1030 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1031 INIT_LIST_HEAD(&export->exp_locks_list);
1032 spin_lock_init(&export->exp_locks_list_guard);
1034 atomic_set(&export->exp_replay_count, 0);
1035 export->exp_obd = obd;
1036 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1037 spin_lock_init(&export->exp_uncommitted_replies_lock);
1038 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1039 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1040 INIT_LIST_HEAD(&export->exp_handle.h_link);
1041 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1042 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* from here on the export is registered in the handle table */
1043 class_handle_hash(&export->exp_handle, &export_handle_ops);
1044 export->exp_last_request_time = ktime_get_real_seconds();
1045 spin_lock_init(&export->exp_lock);
1046 spin_lock_init(&export->exp_rpc_lock);
1047 INIT_HLIST_NODE(&export->exp_uuid_hash);
1048 INIT_HLIST_NODE(&export->exp_nid_hash);
1049 INIT_HLIST_NODE(&export->exp_gen_hash);
1050 spin_lock_init(&export->exp_bl_list_lock);
1051 INIT_LIST_HEAD(&export->exp_bl_list);
1052 INIT_LIST_HEAD(&export->exp_stale_list);
1054 export->exp_sp_peer = LUSTRE_SP_ANY;
1055 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1056 export->exp_client_uuid = *cluuid;
1057 obd_init_export(export);
/* an export whose uuid equals the device's own uuid is not added to the
 * uuid hash */
1059 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1060 spin_lock(&obd->obd_dev_lock);
1061 /* shouldn't happen, but might race */
1062 if (obd->obd_stopping)
1063 GOTO(exit_unlock, rc = -ENODEV);
/* take a hash reference under the lock, use the hash after unlocking */
1065 hash = cfs_hash_getref(obd->obd_uuid_hash);
1067 GOTO(exit_unlock, rc = -ENODEV);
1068 spin_unlock(&obd->obd_dev_lock);
1070 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
1072 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
1073 obd->obd_name, cluuid->uuid, rc);
1074 GOTO(exit_err, rc = -EALREADY);
1078 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1079 spin_lock(&obd->obd_dev_lock);
/* the device may have started stopping since the first check; undo the
 * hash insertion in that case */
1080 if (obd->obd_stopping) {
1082 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
1083 GOTO(exit_unlock, rc = -ESHUTDOWN);
1087 class_incref(obd, "export", export);
1088 list_add_tail(&export->exp_obd_chain_timed,
1089 &obd->obd_exports_timed);
1090 list_add(&export->exp_obd_chain, &obd->obd_exports);
1091 obd->obd_num_exports++;
1093 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1094 INIT_LIST_HEAD(&export->exp_obd_chain);
1096 spin_unlock(&obd->obd_dev_lock);
1098 cfs_hash_putref(hash);
1102 spin_unlock(&obd->obd_dev_lock);
1105 cfs_hash_putref(hash);
1106 class_handle_unhash(&export->exp_handle);
1107 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
1108 obd_destroy_export(export);
1109 OBD_FREE_PTR(export);
/*
 * Create a regular (non-self) export of \a obd for client \a uuid.
 * Thin wrapper around __class_new_export() with is_self == false; the
 * result, including any ERR_PTR() value, is passed through unchanged.
 */
struct obd_export *class_new_export(struct obd_device *obd,
				    struct obd_uuid *uuid)
{
	const bool is_self = false;

	return __class_new_export(obd, uuid, is_self);
}
EXPORT_SYMBOL(class_new_export);
/*
 * Create the self export of \a obd. Thin wrapper around
 * __class_new_export() with is_self == true; the result, including any
 * ERR_PTR() value, is passed through unchanged.
 */
struct obd_export *class_new_export_self(struct obd_device *obd,
					 struct obd_uuid *uuid)
{
	const bool is_self = true;

	return __class_new_export(obd, uuid, is_self);
}
/*
 * class_unlink_export(): detach an export from its device.
 *
 * The handle is unhashed first so the cookie can no longer be resolved.
 * For the device's self export only a reference is dropped. For any
 * other export, under obd_dev_lock: it is removed from the uuid hash
 * (and from the generation hash on servers), moved from obd_exports to
 * obd_unlinked_exports, taken off the timed list, and obd_num_exports
 * is decremented. Finally the stale-export counter is bumped and the
 * export is passed to obd_stale_export_put().
 */
1126 void class_unlink_export(struct obd_export *exp)
1128 class_handle_unhash(&exp->exp_handle);
1130 if (exp->exp_obd->obd_self_export == exp) {
1131 class_export_put(exp);
1135 spin_lock(&exp->exp_obd->obd_dev_lock);
1136 /* delete an uuid-export hashitem from hashtables */
1137 if (!hlist_unhashed(&exp->exp_uuid_hash))
1138 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
1139 &exp->exp_client_uuid,
1140 &exp->exp_uuid_hash);
1142 #ifdef HAVE_SERVER_SUPPORT
1143 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1144 struct tg_export_data *ted = &exp->exp_target_data;
1145 struct cfs_hash *hash;
1147 /* Because obd_gen_hash will not be released until
1148 * class_cleanup(), so hash should never be NULL here */
1149 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1150 LASSERT(hash != NULL);
1151 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1152 &exp->exp_gen_hash);
1153 cfs_hash_putref(hash);
1155 #endif /* HAVE_SERVER_SUPPORT */
1157 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1158 list_del_init(&exp->exp_obd_chain_timed);
1159 exp->exp_obd->obd_num_exports--;
1160 spin_unlock(&exp->exp_obd->obd_dev_lock);
1161 atomic_inc(&obd_stale_export_num);
1163 /* A reference is kept by obd_stale_exports list */
1164 obd_stale_export_put(exp);
1166 EXPORT_SYMBOL(class_unlink_export);
1168 /* Import management functions */
/*
 * class_import_destroy(): free an import whose refcount is zero.
 * Releases the import's main connection and every entry on
 * imp_conn_list (dropping each entry's connection reference and freeing
 * it), asserts that no security context is attached (imp_sec == NULL),
 * drops the device reference taken in class_new_import() and frees the
 * import through OBD_FREE_RCU.
 */
1169 static void class_import_destroy(struct obd_import *imp)
1173 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1174 imp->imp_obd->obd_name);
1176 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
1178 ptlrpc_put_connection_superhack(imp->imp_connection);
/* pop entries from the head until the list is empty */
1180 while (!list_empty(&imp->imp_conn_list)) {
1181 struct obd_import_conn *imp_conn;
1183 imp_conn = list_entry(imp->imp_conn_list.next,
1184 struct obd_import_conn, oic_item);
1185 list_del_init(&imp_conn->oic_item);
1186 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
1187 OBD_FREE(imp_conn, sizeof(*imp_conn));
1190 LASSERT(imp->imp_sec == NULL);
1191 class_decref(imp->imp_obd, "import", imp);
1192 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/*
 * Handle-table callback (import_handle_ops.hop_addref): take one more
 * reference on the import stored behind the opaque handle pointer.
 */
static void import_handle_addref(void *import)
{
	struct obd_import *imp = import;

	class_import_get(imp);
}
/* Handle operations registered for imports with class_handle_hash() in
 * class_new_import(); hop_addref takes an import reference. */
1201 static struct portals_handle_ops import_handle_ops = {
1202 .hop_addref = import_handle_addref,
/*
 * class_import_get(): take one reference on \a import by atomically
 * incrementing imp_refcount, and log the new count with the owning
 * device's name.
 * NOTE(review): the return statement is not visible in this extract;
 * presumably \a import is returned -- confirm.
 */
1206 struct obd_import *class_import_get(struct obd_import *import)
1208 atomic_inc(&import->imp_refcount);
1209 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1210 atomic_read(&import->imp_refcount),
1211 import->imp_obd->obd_name);
1214 EXPORT_SYMBOL(class_import_get);
/*
 * class_import_put(): drop one reference on \a imp. The import must not
 * already be on the zombie list. When the count reaches zero the import
 * is handed to obd_zombie_import_add() rather than being destroyed in
 * the caller's context.
 */
1216 void class_import_put(struct obd_import *imp)
1220 LASSERT(list_empty(&imp->imp_zombie_chain));
1221 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* logged before the decrement, hence the explicit "- 1" */
1223 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1224 atomic_read(&imp->imp_refcount) - 1,
1225 imp->imp_obd->obd_name);
1227 if (atomic_dec_and_test(&imp->imp_refcount)) {
1228 CDEBUG(D_INFO, "final put import %p\n", imp);
1229 obd_zombie_import_add(imp);
1232 /* catch possible import put race */
1233 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1236 EXPORT_SYMBOL(class_import_put);
/*
 * init_imp_at(): initialize the adaptive-timeout state of an import.
 * The network latency estimate starts at 0 and each of the
 * IMP_AT_MAX_PORTALS service estimates starts at
 * INITIAL_CONNECT_TIMEOUT.
 */
1238 static void init_imp_at(struct imp_at *at) {
1240 at_init(&at->iat_net_latency, 0, 0);
1241 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1242 /* max service estimates are tracked on the server side, so
1243 don't use the AT history here, just use the last reported
1244 val. (But keep hist for proc histogram, worst_ever) */
1245 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * class_new_import(): allocate and initialize an import for \a obd.
 *
 * The new import holds a reference on \a obd (released again in
 * class_import_destroy()), starts in state LUSTRE_IMP_NEW with
 * imp_refcount == 2, is registered in the handle table and uses
 * LUSTRE_MSG_MAGIC_V2 as its initial message magic.
 * NOTE(review): the allocation-failure branch and the return statement
 * are not visible in this extract.
 */
1250 struct obd_import *class_new_import(struct obd_device *obd)
1252 struct obd_import *imp;
1253 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1255 OBD_ALLOC(imp, sizeof(*imp));
1259 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1260 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1261 INIT_LIST_HEAD(&imp->imp_replay_list);
1262 INIT_LIST_HEAD(&imp->imp_sending_list);
1263 INIT_LIST_HEAD(&imp->imp_delayed_list);
1264 INIT_LIST_HEAD(&imp->imp_committed_list);
1265 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1266 imp->imp_known_replied_xid = 0;
/* the replay cursor starts at the head of the (empty) committed list */
1267 imp->imp_replay_cursor = &imp->imp_committed_list;
1268 spin_lock_init(&imp->imp_lock);
1269 imp->imp_last_success_conn = 0;
1270 imp->imp_state = LUSTRE_IMP_NEW;
1271 imp->imp_obd = class_incref(obd, "import", imp);
1272 mutex_init(&imp->imp_sec_mutex);
1273 init_waitqueue_head(&imp->imp_recovery_waitq);
/* reference pid for the security context: the pid namespace's
 * child_reaper when there is one, otherwise pid 1 */
1275 if (curr_pid_ns->child_reaper)
1276 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1278 imp->imp_sec_refpid = 1;
1280 atomic_set(&imp->imp_refcount, 2);
1281 atomic_set(&imp->imp_unregistering, 0);
1282 atomic_set(&imp->imp_inflight, 0);
1283 atomic_set(&imp->imp_replay_inflight, 0);
1284 atomic_set(&imp->imp_inval_count, 0);
1285 INIT_LIST_HEAD(&imp->imp_conn_list);
1286 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1287 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1288 init_imp_at(&imp->imp_at);
1290 /* the default magic is V2, will be used in connect RPC, and
1291 * then adjusted according to the flags in request/reply. */
1292 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1296 EXPORT_SYMBOL(class_new_import);
/*
 * class_destroy_import(): start tearing down an import. The handle is
 * removed from the handle table, imp_generation is bumped under
 * imp_lock, and one reference is dropped with class_import_put();
 * the import is actually destroyed once its last reference is gone.
 */
1298 void class_destroy_import(struct obd_import *import)
1300 LASSERT(import != NULL);
1301 LASSERT(import != LP_POISON);
1303 class_handle_unhash(&import->imp_handle);
1305 spin_lock(&import->imp_lock);
1306 import->imp_generation++;
1307 spin_unlock(&import->imp_lock);
1308 class_import_put(import);
1310 EXPORT_SYMBOL(class_destroy_import);
1312 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * __class_export_add_lock_ref(): debugging aid compiled only when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is set. Under exp_locks_list_guard it
 * counts one more export reference held by \a lock; on the 0 -> 1
 * transition the lock is linked on exp->exp_locks_list and \a exp is
 * recorded as its target. A lock already tracked for a different
 * export only triggers a console warning.
 */
1314 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1316 spin_lock(&exp->exp_locks_list_guard);
1318 LASSERT(lock->l_exp_refs_nr >= 0);
1320 if (lock->l_exp_refs_target != NULL &&
1321 lock->l_exp_refs_target != exp) {
1322 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1323 exp, lock, lock->l_exp_refs_target);
/* post-increment: the branch is taken only for the first reference */
1325 if ((lock->l_exp_refs_nr ++) == 0) {
1326 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1327 lock->l_exp_refs_target = exp;
1329 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1330 lock, exp, lock->l_exp_refs_nr);
1331 spin_unlock(&exp->exp_locks_list_guard);
1333 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Drop one reference from @lock to @exp; the counterpart of
 * __class_export_add_lock_ref().  Runs under exp_locks_list_guard.
 *
 * @param exp   export the reference was taken against
 * @param lock  lock whose l_exp_refs_nr is decremented (asserted > 0)
 */
1335 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1337 spin_lock(&exp->exp_locks_list_guard);
1338 LASSERT(lock->l_exp_refs_nr > 0);
/* a mismatch is only reported, the reference is still dropped below */
1339 if (lock->l_exp_refs_target != exp) {
1340 LCONSOLE_WARN("lock %p, "
1341 "mismatching export pointers: %p, %p\n",
1342 lock, lock->l_exp_refs_target, exp);
/* last reference: unlink the lock and forget the target export */
1344 if (-- lock->l_exp_refs_nr == 0) {
1345 list_del_init(&lock->l_exp_refs_link);
1346 lock->l_exp_refs_target = NULL;
1348 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1349 lock, exp, lock->l_exp_refs_nr);
1350 spin_unlock(&exp->exp_locks_list_guard);
1352 EXPORT_SYMBOL(__class_export_del_lock_ref);
1355 /* A connection defines an export context in which preallocation can
1356 be managed. This releases the export pointer reference, and returns
1357 the export handle, so the export refcount is 1 when this function returns. */
/*
 * Create a new export for client @cluuid on @obd and hand its handle
 * cookie back through @conn.
 *
 * @param conn    filled in with the cookie of the new export handle
 * @param obd     device the export is created on
 * @param cluuid  UUID of the connecting client
 *
 * The PTR_ERR() of the export is returned on the failure path; the test
 * guarding that path and the success return are not visible in this listing.
 */
1359 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1360 struct obd_uuid *cluuid)
1362 struct obd_export *export;
1363 LASSERT(conn != NULL);
1364 LASSERT(obd != NULL);
1365 LASSERT(cluuid != NULL);
1368 export = class_new_export(obd, cluuid);
1370 RETURN(PTR_ERR(export));
/* publish the cookie, then give up the reference held through the pointer */
1372 conn->cookie = export->exp_handle.h_cookie;
1373 class_export_put(export);
1375 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1376 cluuid->uuid, conn->cookie);
1379 EXPORT_SYMBOL(class_connect);
1381 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery accounting held on behalf of @exp.
 *
 * Under obd_recovery_task_lock, while the device is recovering: clear
 * exp_in_recovery and decrement obd_connected_clients, and count the
 * export in obd_stale_clients unless it is a lightweight connection.
 * Then, under exp_lock, clear the request/lock replay flags and decrement
 * the matching per-device replay client counters.
 */
1382 static void class_export_recovery_cleanup(struct obd_export *exp)
1384 struct obd_device *obd = exp->exp_obd;
1386 spin_lock(&obd->obd_recovery_task_lock);
1387 if (obd->obd_recovering) {
1388 if (exp->exp_in_recovery) {
1389 spin_lock(&exp->exp_lock);
1390 exp->exp_in_recovery = 0;
1391 spin_unlock(&exp->exp_lock);
1392 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1393 atomic_dec(&obd->obd_connected_clients);
1396 /* if called during recovery then should update
1397 * obd_stale_clients counter,
1398 * lightweight exports are not counted */
1399 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1400 exp->exp_obd->obd_stale_clients++;
1402 spin_unlock(&obd->obd_recovery_task_lock);
1404 spin_lock(&exp->exp_lock);
1405 /** Cleanup req replay fields */
1406 if (exp->exp_req_replay_needed) {
1407 exp->exp_req_replay_needed = 0;
1409 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1410 atomic_dec(&obd->obd_req_replay_clients);
1413 /** Cleanup lock replay data */
1414 if (exp->exp_lock_replay_needed) {
1415 exp->exp_lock_replay_needed = 0;
1417 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1418 atomic_dec(&obd->obd_lock_replay_clients);
1420 spin_unlock(&exp->exp_lock);
1423 /* This function removes 1-3 references from the export:
1424 * 1 - for the export pointer passed in
1425 * and, if a disconnect is really needed:
1426 * 2 - for removing it from the hash
1427 * 3 - in client_unlink_export
1428 * The export pointer passed to this function may be destroyed */
/*
 * Disconnect @export: mark it disconnected, remove it from the NID hash,
 * run the recovery cleanup, unlink it and drop the caller's reference.
 *
 * A NULL @export is only warned about.  When the export was already
 * marked disconnected the code jumps to the no_disconn label; that label
 * and the return statements are not visible in this listing.
 */
1429 int class_disconnect(struct obd_export *export)
1431 int already_disconnected;
1434 if (export == NULL) {
1435 CWARN("attempting to free NULL export %p\n", export);
/* sample and set the flag in one critical section so that racing
 * callers can tell which of them got here first */
1439 spin_lock(&export->exp_lock);
1440 already_disconnected = export->exp_disconnected;
1441 export->exp_disconnected = 1;
1442 /* We hold references of export for uuid hash
1443 * and nid_hash and export link at least. So
1444 * it is safe to call cfs_hash_del in there. */
1445 if (!hlist_unhashed(&export->exp_nid_hash))
1446 cfs_hash_del(export->exp_obd->obd_nid_hash,
1447 &export->exp_connection->c_peer.nid,
1448 &export->exp_nid_hash);
1449 spin_unlock(&export->exp_lock);
1451 /* class_cleanup(), abort_recovery(), and class_fail_export()
1452 * all end up in here, and if any of them race we shouldn't
1453 * call extra class_export_puts(). */
1454 if (already_disconnected) {
1455 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1456 GOTO(no_disconn, already_disconnected);
1459 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1460 export->exp_handle.h_cookie);
1462 class_export_recovery_cleanup(export);
1463 class_unlink_export(export);
1465 class_export_put(export);
1468 EXPORT_SYMBOL(class_disconnect);
1470 /* Return non-zero for a fully connected export */
/*
 * Evaluate, under exp_lock, whether @exp has a positive exp_conn_cnt and
 * has not been marked exp_failed.
 * NOTE(review): the declaration and return of the result are not visible
 * in this listing; presumably the computed value is what is returned.
 */
1471 int class_connected_export(struct obd_export *exp)
1476 spin_lock(&exp->exp_lock);
1477 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1478 spin_unlock(&exp->exp_lock);
1482 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export found on @list, one at a time, until the list
 * is empty.
 *
 * @param list   private work list of exports, consumed by this function
 * @param flags  value stored into exp_flags of each export before it is
 *               disconnected
 *
 * An export whose client UUID equals the UUID of its own device is not
 * disconnected; it is only unlinked from the list and released.
 */
1484 static void class_disconnect_export_list(struct list_head *list,
1485 enum obd_option flags)
1488 struct obd_export *exp;
1491 /* It's possible that an export may disconnect itself, but
1492 * nothing else will be added to this list. */
1493 while (!list_empty(list)) {
1494 exp = list_entry(list->next, struct obd_export,
1496 /* need for safe call CDEBUG after obd_disconnect */
1497 class_export_get(exp);
1499 spin_lock(&exp->exp_lock);
1500 exp->exp_flags = flags;
1501 spin_unlock(&exp->exp_lock);
1503 if (obd_uuid_equals(&exp->exp_client_uuid,
1504 &exp->exp_obd->obd_uuid)) {
1506 "exp %p export uuid == obd uuid, don't discon\n",
1508 /* Need to delete this now so we don't end up pointing
1509 * to work_list later when this export is cleaned up. */
1510 list_del_init(&exp->exp_obd_chain);
1511 class_export_put(exp);
/* second reference, taken for obd_disconnect() to release */
1515 class_export_get(exp);
1516 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1517 "last request at %lld\n",
1518 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1519 exp, exp->exp_last_request_time);
1520 /* release one export reference anyway */
1521 rc = obd_disconnect(exp);
1523 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1524 obd_export_nid2str(exp), exp, rc);
/* drop the reference that kept exp valid for the message above */
1525 class_export_put(exp);
/*
 * Disconnect all exports of @obd.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a local work
 * list under obd_dev_lock; the list is then processed without the lock
 * by class_disconnect_export_list() using the flags from
 * exp_flags_from_obd().
 */
1530 void class_disconnect_exports(struct obd_device *obd)
1532 struct list_head work_list;
1535 /* Move all of the exports from obd_exports to a work list, en masse. */
1536 INIT_LIST_HEAD(&work_list);
1537 spin_lock(&obd->obd_dev_lock);
1538 list_splice_init(&obd->obd_exports, &work_list);
1539 list_splice_init(&obd->obd_delayed_exports, &work_list);
1540 spin_unlock(&obd->obd_dev_lock);
1542 if (!list_empty(&work_list)) {
1543 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1544 "disconnecting them\n", obd->obd_minor, obd);
1545 class_disconnect_export_list(&work_list,
1546 exp_flags_from_obd(obd));
1548 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1549 obd->obd_minor, obd);
1552 EXPORT_SYMBOL(class_disconnect_exports);
1554 /* Remove exports that have not completed recovery. */
/*
 * Evict the exports of @obd that are judged stale.
 *
 * @param obd          device whose obd_exports list is scanned
 * @param test_export  predicate called with exp_lock held; an export for
 *                     which it returns non-zero is left alone
 *
 * Selected exports are marked exp_failed, moved to a local work list
 * under obd_dev_lock and afterwards disconnected with
 * OBD_OPT_ABORT_RECOV added to the device flags.
 * NOTE(review): the statements that skip an export and that count the
 * evicted ones are not visible in this listing.
 */
1556 void class_disconnect_stale_exports(struct obd_device *obd,
1557 int (*test_export)(struct obd_export *))
1559 struct list_head work_list;
1560 struct obd_export *exp, *n;
1564 INIT_LIST_HEAD(&work_list);
1565 spin_lock(&obd->obd_dev_lock);
1566 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1568 /* don't count self-export as client */
1569 if (obd_uuid_equals(&exp->exp_client_uuid,
1570 &exp->exp_obd->obd_uuid))
1573 /* don't evict clients which have no slot in last_rcvd
1574 * (e.g. lightweight connection) */
1575 if (exp->exp_target_data.ted_lr_idx == -1)
1578 spin_lock(&exp->exp_lock);
1579 if (exp->exp_failed || test_export(exp)) {
1580 spin_unlock(&exp->exp_lock);
1583 exp->exp_failed = 1;
1584 spin_unlock(&exp->exp_lock);
1586 list_move(&exp->exp_obd_chain, &work_list);
1588 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1589 obd->obd_name, exp->exp_client_uuid.uuid,
1590 exp->exp_connection == NULL ? "<unknown>" :
1591 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1592 print_export_data(exp, "EVICTING", 0, D_HA);
1594 spin_unlock(&obd->obd_dev_lock);
1597 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1598 obd->obd_name, evicted);
1600 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1601 OBD_OPT_ABORT_RECOV);
1604 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp_failed is sampled and set under exp_lock so that only the first
 * caller carries out the disconnect; later callers only log a message.
 * Two references are taken before obd_disconnect(): one keeps @exp valid
 * for the trailing debug message, the other is the one the disconnect
 * path releases (see the comments in the body).
 */
1606 void class_fail_export(struct obd_export *exp)
1608 int rc, already_failed;
1610 spin_lock(&exp->exp_lock);
1611 already_failed = exp->exp_failed;
1612 exp->exp_failed = 1;
1613 spin_unlock(&exp->exp_lock);
1615 if (already_failed) {
1616 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1617 exp, exp->exp_client_uuid.uuid);
1621 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1622 exp, exp->exp_client_uuid.uuid);
/* optionally dump the debug log, controlled by obd_dump_on_timeout */
1624 if (obd_dump_on_timeout)
1625 libcfs_debug_dumplog();
1627 /* need for safe call CDEBUG after obd_disconnect */
1628 class_export_get(exp);
1630 /* Most callers into obd_disconnect are removing their own reference
1631 * (request, for example) in addition to the one from the hash table.
1632 * We don't have such a reference here, so make one. */
1633 class_export_get(exp);
1634 rc = obd_disconnect(exp);
1636 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1638 CDEBUG(D_HA, "disconnected export %p/%s\n",
1639 exp, exp->exp_client_uuid.uuid);
1640 class_export_put(exp);
1642 EXPORT_SYMBOL(class_fail_export);
/*
 * Return the peer NID of @exp as a string, obtained from libcfs_nid2str(),
 * when the export has a connection.
 * NOTE(review): the value returned for an export without a connection is
 * not visible in this listing.
 */
1644 char *obd_export_nid2str(struct obd_export *exp)
1646 if (exp->exp_connection != NULL)
1647 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1651 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict the export(s) of @obd that are hashed under the NID given as the
 * string @nid.
 *
 * @param obd  device whose obd_nid_hash is searched
 * @param nid  textual NID, converted with libcfs_str2nid()
 * @return     exports_evicted; 0 right away when the device is stopping
 *
 * A reference on the hash is held for the duration of the lookups.
 * NOTE(review): the loop construct around the lookup and the increment of
 * exports_evicted are not visible in this listing.
 */
1653 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1655 struct cfs_hash *nid_hash;
1656 struct obd_export *doomed_exp = NULL;
1657 int exports_evicted = 0;
1659 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1661 spin_lock(&obd->obd_dev_lock);
1662 /* umount has run already, so evict thread should leave
1663 * its task to umount thread now */
1664 if (obd->obd_stopping) {
1665 spin_unlock(&obd->obd_dev_lock);
1666 return exports_evicted;
1668 nid_hash = obd->obd_nid_hash;
1669 cfs_hash_getref(nid_hash);
1670 spin_unlock(&obd->obd_dev_lock);
1673 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1674 if (doomed_exp == NULL)
/* sanity: the hash must return an export of that NID, never the self export */
1677 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1678 "nid %s found, wanted nid %s, requested nid %s\n",
1679 obd_export_nid2str(doomed_exp),
1680 libcfs_nid2str(nid_key), nid);
1681 LASSERTF(doomed_exp != obd->obd_self_export,
1682 "self-export is hashed by NID?\n");
1684 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1685 "request\n", obd->obd_name,
1686 obd_uuid2str(&doomed_exp->exp_client_uuid),
1687 obd_export_nid2str(doomed_exp));
1688 class_fail_export(doomed_exp);
/* NOTE(review): presumably releases the reference taken by the lookup */
1689 class_export_put(doomed_exp);
1692 cfs_hash_putref(nid_hash);
1694 if (!exports_evicted)
1695 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1696 obd->obd_name, nid);
1697 return exports_evicted;
1699 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd whose client UUID is @uuid.
 *
 * @param obd   device whose obd_uuid_hash is searched
 * @param uuid  textual client UUID
 * @return      exports_evicted; 0 when the device is stopping or when
 *              @uuid names the device itself
 *
 * NOTE(review): the increment of exports_evicted is not visible in this
 * listing.
 */
1701 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1703 struct cfs_hash *uuid_hash;
1704 struct obd_export *doomed_exp = NULL;
1705 struct obd_uuid doomed_uuid;
1706 int exports_evicted = 0;
1708 spin_lock(&obd->obd_dev_lock);
1709 if (obd->obd_stopping) {
1710 spin_unlock(&obd->obd_dev_lock);
1711 return exports_evicted;
1713 uuid_hash = obd->obd_uuid_hash;
1714 cfs_hash_getref(uuid_hash);
1715 spin_unlock(&obd->obd_dev_lock);
/* refuse to evict the export that represents the device itself */
1717 obd_str2uuid(&doomed_uuid, uuid);
1718 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1719 CERROR("%s: can't evict myself\n", obd->obd_name);
1720 cfs_hash_putref(uuid_hash);
1721 return exports_evicted;
1724 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1726 if (doomed_exp == NULL) {
1727 CERROR("%s: can't disconnect %s: no exports found\n",
1728 obd->obd_name, uuid);
1730 CWARN("%s: evicting %s at adminstrative request\n",
1731 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1732 class_fail_export(doomed_exp);
1733 class_export_put(doomed_exp);
1736 cfs_hash_putref(uuid_hash);
1738 return exports_evicted;
1741 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional callback, NULL by default, that print_export_data() invokes on
 * an export when lock dumping is requested.  Exported so that another
 * module can install it.
 */
1742 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1743 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Emit one debug line describing @exp.
 *
 * @param exp          export to describe
 * @param status       label placed in the message (callers pass e.g.
 *                     ACTIVE, UNLINKED, DELAYED, ZOMBIE, EVICTING)
 * @param locks        non-zero to also call class_export_dump_hook, when
 *                     that hook is built in and set
 * @param debug_level  mask handed to CDEBUG
 *
 * exp_outstanding_replies is walked under exp_lock; the loop body that
 * fills nreplies and first_reply is not visible in this listing.
 */
1746 static void print_export_data(struct obd_export *exp, const char *status,
1747 int locks, int debug_level)
1749 struct ptlrpc_reply_state *rs;
1750 struct ptlrpc_reply_state *first_reply = NULL;
1753 spin_lock(&exp->exp_lock);
1754 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1760 spin_unlock(&exp->exp_lock);
1762 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1763 "%p %s %llu stale:%d\n",
1764 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1765 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1766 atomic_read(&exp->exp_rpc_count),
1767 atomic_read(&exp->exp_cb_count),
1768 atomic_read(&exp->exp_locks_count),
1769 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1770 nreplies, first_reply, nreplies > 3 ? "..." : "",
1771 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1772 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1773 if (locks && class_export_dump_hook != NULL)
1774 class_export_dump_hook(exp);
/*
 * Print every export known for @obd through print_export_data().
 *
 * The active, unlinked and delayed lists of the device are walked under
 * obd_dev_lock; the global zombie export list is walked afterwards under
 * obd_zombie_impexp_lock.
 * NOTE(review): the zombie list is global, so that part presumably also
 * prints exports that belong to other devices - confirm.
 *
 * @param locks        forwarded to print_export_data()
 * @param debug_level  forwarded to print_export_data()
 */
1778 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1780 struct obd_export *exp;
1782 spin_lock(&obd->obd_dev_lock);
1783 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1784 print_export_data(exp, "ACTIVE", locks, debug_level);
1785 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1786 print_export_data(exp, "UNLINKED", locks, debug_level);
1787 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1788 print_export_data(exp, "DELAYED", locks, debug_level);
1789 spin_unlock(&obd->obd_dev_lock);
1790 spin_lock(&obd_zombie_impexp_lock);
1791 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1792 print_export_data(exp, "ZOMBIE", locks, debug_level);
1793 spin_unlock(&obd_zombie_impexp_lock);
/*
 * Block until obd_unlinked_exports of @obd becomes empty.
 *
 * obd_exports is asserted to be empty on entry.  The list is checked with
 * obd_dev_lock held; the lock is dropped around each uninterruptible
 * sleep.  When the waited value is above 5 and a power of two, a console
 * warning is printed and all exports are dumped.
 * NOTE(review): the declaration and the update of the waited variable
 * are not visible in this listing.
 */
1796 void obd_exports_barrier(struct obd_device *obd)
1799 LASSERT(list_empty(&obd->obd_exports));
1800 spin_lock(&obd->obd_dev_lock);
1801 while (!list_empty(&obd->obd_unlinked_exports)) {
1802 spin_unlock(&obd->obd_dev_lock);
1803 set_current_state(TASK_UNINTERRUPTIBLE);
1804 schedule_timeout(cfs_time_seconds(waited));
1805 if (waited > 5 && is_power_of_2(waited)) {
1806 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1807 "more than %d seconds. "
1808 "The obd refcount = %d. Is it stuck?\n",
1809 obd->obd_name, waited,
1810 atomic_read(&obd->obd_refcount));
1811 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1814 spin_lock(&obd->obd_dev_lock);
1816 spin_unlock(&obd->obd_dev_lock);
1818 EXPORT_SYMBOL(obd_exports_barrier);
1820 /* Total amount of zombies to be destroyed; the check and idle helpers
 * of the zombie thread read it with obd_zombie_impexp_lock held */
1821 static int zombies_count = 0;
1824 /* kill zombie imports and exports */
/*
 * Destroy queued zombie imports and exports until both queues are empty.
 *
 * Each pass unlinks at most one import and one export from the zombie
 * lists under obd_zombie_impexp_lock, then destroys them with the lock
 * released.  The lock is taken again after each destroy.
 * NOTE(review): the statements run in those short critical sections, the
 * opening of the loop and the resetting of the two pointers are not
 * visible in this listing; presumably zombies_count is adjusted there.
 */
1826 void obd_zombie_impexp_cull(void)
1828 struct obd_import *import;
1829 struct obd_export *export;
1833 spin_lock(&obd_zombie_impexp_lock);
1836 if (!list_empty(&obd_zombie_imports)) {
1837 import = list_entry(obd_zombie_imports.next,
1840 list_del_init(&import->imp_zombie_chain);
1844 if (!list_empty(&obd_zombie_exports)) {
1845 export = list_entry(obd_zombie_exports.next,
1848 list_del_init(&export->exp_obd_chain);
1851 spin_unlock(&obd_zombie_impexp_lock);
/* destruction runs without the zombie lock held */
1853 if (import != NULL) {
1854 class_import_destroy(import);
1855 spin_lock(&obd_zombie_impexp_lock);
1857 spin_unlock(&obd_zombie_impexp_lock);
1860 if (export != NULL) {
1861 class_export_destroy(export);
1862 spin_lock(&obd_zombie_impexp_lock);
1864 spin_unlock(&obd_zombie_impexp_lock);
1868 } while (import != NULL || export != NULL);
/* completed by the zombie thread once it is running; the init routine waits on it */
1872 static DECLARE_COMPLETION(obd_zombie_start);
/* completed by the zombie thread on exit; the stop routine waits on it */
1873 static DECLARE_COMPLETION(obd_zombie_stop);
/* flag word manipulated with set_bit and test_bit */
1874 static unsigned long obd_zombie_flags;
/* wait queue shared by the zombie thread and by obd_zombie_barrier callers */
1875 static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
/* pid of the zombie thread, used so that it never waits for itself */
1876 static pid_t obd_zombie_pid;
/* NOTE(review): this value is handed to set_bit and test_bit as a bit
 * NUMBER, not as a mask - confirm that is intended */
1879 OBD_ZOMBIE_STOP = 0x0001,
1883 /* check whether the zombie import/export kill thread has work to do */
/*
 * Wait condition helper of the zombie thread.  Under
 * obd_zombie_impexp_lock it computes a value that is non-zero when there
 * are no zombies queued AND no stop has been requested, i.e. when the
 * thread has nothing to do; the thread sleeps until this becomes zero.
 *
 * @param arg  not referenced by any visible line
 * NOTE(review): the return statement is not visible in this listing.
 */
1885 static int obd_zombie_impexp_check(void *arg)
1889 spin_lock(&obd_zombie_impexp_lock);
1890 rc = (zombies_count == 0) &&
1891 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1892 spin_unlock(&obd_zombie_impexp_lock);
1898 /* Add export to the obd_zombie thread and notify it. */
/*
 * Queue @exp for destruction by the zombie thread.
 *
 * The export is unlinked from whichever device list it is on (it must be
 * on one) under the device obd_dev_lock, then added to obd_zombie_exports
 * under obd_zombie_impexp_lock, and the thread is woken up.
 * obd_stale_export_num is decremented first.
 * NOTE(review): a line between taking the zombie lock and the list_add
 * is not visible in this listing; presumably it bumps zombies_count.
 */
1900 static void obd_zombie_export_add(struct obd_export *exp) {
1901 atomic_dec(&obd_stale_export_num);
1902 spin_lock(&exp->exp_obd->obd_dev_lock);
1903 LASSERT(!list_empty(&exp->exp_obd_chain));
1904 list_del_init(&exp->exp_obd_chain);
1905 spin_unlock(&exp->exp_obd->obd_dev_lock);
1906 spin_lock(&obd_zombie_impexp_lock);
1908 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1909 spin_unlock(&obd_zombie_impexp_lock);
1911 obd_zombie_impexp_notify();
1915 /* Add import to the obd_zombie thread and notify it. */
/*
 * Queue @imp for destruction by the zombie thread.
 *
 * The import must have no imp_sec attached and must not already be on
 * the zombie chain.  It is added to obd_zombie_imports under
 * obd_zombie_impexp_lock and the thread is then woken up.
 * NOTE(review): a line before the list_add is not visible in this
 * listing; presumably it bumps zombies_count.
 */
1917 static void obd_zombie_import_add(struct obd_import *imp) {
1918 LASSERT(imp->imp_sec == NULL);
1919 spin_lock(&obd_zombie_impexp_lock);
1920 LASSERT(list_empty(&imp->imp_zombie_chain));
1922 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1923 spin_unlock(&obd_zombie_impexp_lock);
1925 obd_zombie_impexp_notify();
1929 /* notify the import/export destroy thread about a new zombie. */
/*
 * Wake every sleeper on obd_zombie_waitq.  wake_up_all is used because
 * both the zombie thread and barrier callers sleep on that queue, as the
 * note in the body explains.
 */
1931 static void obd_zombie_impexp_notify(void)
1934 * Make sure obd_zomebie_impexp_thread get this notification.
1935 * It is possible this signal only get by obd_zombie_barrier, and
1936 * barrier gulps this notification and sleeps away and hangs ensues
1938 wake_up_all(&obd_zombie_waitq);
1942 /* check whether obd_zombie is idle */
/*
 * Wait condition helper of obd_zombie_barrier(): computes, under
 * obd_zombie_impexp_lock, whether zombies_count is zero.  Asserts that
 * no stop of the zombie thread has been requested.
 * NOTE(review): the return statement is not visible in this listing.
 */
1944 static int obd_zombie_is_idle(void)
1948 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1949 spin_lock(&obd_zombie_impexp_lock);
1950 rc = (zombies_count == 0);
1951 spin_unlock(&obd_zombie_impexp_lock);
1956 /* wait until the obd_zombie import/export queues become empty */
/*
 * Sleep on obd_zombie_waitq until obd_zombie_is_idle() holds.
 * The zombie thread itself is special-cased through the pid comparison;
 * the statement executed in that case is not visible in this listing.
 */
1958 void obd_zombie_barrier(void)
1960 struct l_wait_info lwi = { 0 };
1962 if (obd_zombie_pid == current_pid())
1963 /* don't wait for myself */
1965 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1967 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Take the first export off the global obd_stale_exports list, under
 * obd_stale_export_lock.  The local pointer stays NULL when the list is
 * empty.
 * NOTE(review): the condition around the debug message and the return
 * statement are not visible in this listing; presumably the export taken
 * off the list is what is returned.
 */
1970 struct obd_export *obd_stale_export_get(void)
1972 struct obd_export *exp = NULL;
1975 spin_lock(&obd_stale_export_lock);
1976 if (!list_empty(&obd_stale_exports)) {
1977 exp = list_entry(obd_stale_exports.next,
1978 struct obd_export, exp_stale_list);
1979 list_del_init(&exp->exp_stale_list);
1981 spin_unlock(&obd_stale_export_lock);
1984 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1985 atomic_read(&obd_stale_export_num));
1989 EXPORT_SYMBOL(obd_stale_export_get);
/*
 * Give back an export obtained from obd_stale_export_get().
 *
 * @exp must not be on the stale list.  When its lock hash exists and
 * still counts entries, the export is put back on obd_stale_exports: at
 * the tail when exp_bl_list is empty, at the head otherwise.  Lock order
 * is exp_bl_list_lock (bh disabled) and then obd_stale_export_lock.
 * NOTE(review): the branch keywords that decide between re-queueing and
 * the final class_export_put() are not visible in this listing.
 */
1991 void obd_stale_export_put(struct obd_export *exp)
1995 LASSERT(list_empty(&exp->exp_stale_list));
1996 if (exp->exp_lock_hash &&
1997 atomic_read(&exp->exp_lock_hash->hs_count)) {
1998 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1999 atomic_read(&obd_stale_export_num));
2001 spin_lock_bh(&exp->exp_bl_list_lock);
2002 spin_lock(&obd_stale_export_lock);
2003 /* Add to the tail if there is no blocked locks,
2004 * to the head otherwise. */
2005 if (list_empty(&exp->exp_bl_list))
2006 list_add_tail(&exp->exp_stale_list,
2007 &obd_stale_exports);
2009 list_add(&exp->exp_stale_list,
2010 &obd_stale_exports);
2012 spin_unlock(&obd_stale_export_lock);
2013 spin_unlock_bh(&exp->exp_bl_list_lock);
2015 class_export_put(exp);
2019 EXPORT_SYMBOL(obd_stale_export_put);
2022 /* Adjust the position of the export in the stale list,
2023 * i.e. move it to the head of the list if needed. */
/*
 * Move @exp to the head of obd_stale_exports when it is currently on the
 * stale list and its exp_bl_list is not empty.  Locks are taken in the
 * same order as in obd_stale_export_put(): exp_bl_list_lock with bh
 * disabled, then obd_stale_export_lock.
 *
 * @param exp  export to reposition; asserted non-NULL
 */
2025 void obd_stale_export_adjust(struct obd_export *exp)
2027 LASSERT(exp != NULL);
2028 spin_lock_bh(&exp->exp_bl_list_lock);
2029 spin_lock(&obd_stale_export_lock);
2031 if (!list_empty(&exp->exp_stale_list) &&
2032 !list_empty(&exp->exp_bl_list))
2033 list_move(&exp->exp_stale_list, &obd_stale_exports);
2035 spin_unlock(&obd_stale_export_lock);
2036 spin_unlock_bh(&exp->exp_bl_list_lock);
2038 EXPORT_SYMBOL(obd_stale_export_adjust);
2041 /* thread that destroys zombie exports/imports. */
/*
 * Main function of the zombie thread.
 *
 * Signals obd_zombie_start, records its own pid, then loops until
 * OBD_ZOMBIE_STOP is set: sleep until obd_zombie_impexp_check() reports
 * work or a stop request, cull the zombie queues, and wake the wait
 * queue for barrier callers.  Signals obd_zombie_stop on the way out.
 *
 * @param unused  not referenced by any visible line
 * NOTE(review): the return statement is not visible in this listing.
 */
2043 static int obd_zombie_impexp_thread(void *unused)
2045 unshare_fs_struct();
2046 complete(&obd_zombie_start);
2048 obd_zombie_pid = current_pid();
2050 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
2051 struct l_wait_info lwi = { 0 };
2053 l_wait_event(obd_zombie_waitq,
2054 !obd_zombie_impexp_check(NULL), &lwi);
2055 obd_zombie_impexp_cull();
2058 * Notify obd_zombie_barrier callers that queues
2061 wake_up(&obd_zombie_waitq);
2064 complete(&obd_zombie_stop);
2071 /* start the thread that destroys zombie imports/exports */
/*
 * Start the zombie thread with kthread_run() and wait until it has
 * signalled obd_zombie_start.
 * The PTR_ERR() of the task is returned on the failure path; the test
 * guarding that path and the success return are not visible in this
 * listing.
 */
2073 int obd_zombie_impexp_init(void)
2075 struct task_struct *task;
2077 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
2079 RETURN(PTR_ERR(task));
2081 wait_for_completion(&obd_zombie_start);
2085 /* stop the thread that destroys zombie imports/exports */
/*
 * Ask the zombie thread to stop: set OBD_ZOMBIE_STOP, wake the thread,
 * and wait for it to signal obd_zombie_stop.  The global stale export
 * list is asserted to be empty afterwards.
 */
2087 void obd_zombie_impexp_stop(void)
2089 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
2090 obd_zombie_impexp_notify();
2091 wait_for_completion(&obd_zombie_stop);
2092 LASSERT(list_empty(&obd_stale_exports));
2095 /***** Kernel-userspace comm helpers *******/
2097 /* Get length of entire message, including header */
/*
 * @param payload_len  size of the payload in bytes
 * @return             sizeof(struct kuc_hdr) plus @payload_len
 * NOTE(review): the sum is narrowed to int and @payload_len is not range
 * checked here.
 */
2098 int kuc_len(int payload_len)
2100 return sizeof(struct kuc_hdr) + payload_len;
2102 EXPORT_SYMBOL(kuc_len);
2104 /* Get a pointer to kuc header, given a ptr to the payload
2105 * @param p Pointer to payload area
2106 * @returns Pointer to kuc header */
/*
 * Step back one struct kuc_hdr from payload pointer @p and assert that
 * the header found there carries KUC_MAGIC.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably the header pointer is returned.
 */
2108 struct kuc_hdr * kuc_ptr(void *p)
2110 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
2111 LASSERT(lh->kuc_magic == KUC_MAGIC);
2114 EXPORT_SYMBOL(kuc_ptr);
2116 /* Alloc space for a message, and fill in header
2117 * @return Pointer to payload area */
/*
 * Build a message header for a payload of @payload_len bytes.
 *
 * @param payload_len  payload size; the total length is kuc_len() of it
 * @param transport    stored in kuc_transport
 * @param type         stored in kuc_msgtype
 * @return             pointer just past the header, i.e. the payload
 *                     area, or ERR_PTR(-ENOMEM)
 *
 * NOTE(review): the allocation itself and the test guarding the error
 * return are not visible in this listing.
 */
2119 void *kuc_alloc(int payload_len, int transport, int type)
2122 int len = kuc_len(payload_len);
2126 return ERR_PTR(-ENOMEM);
2128 lh->kuc_magic = KUC_MAGIC;
2129 lh->kuc_transport = transport;
2130 lh->kuc_msgtype = type;
2131 lh->kuc_msglen = len;
2133 return (void *)(lh + 1);
2135 EXPORT_SYMBOL(kuc_alloc);
2137 /* Takes pointer to payload area */
/*
 * Free a message given its payload pointer.
 *
 * @param p            payload pointer; the header is located through
 *                     kuc_ptr(), which asserts the magic
 * @param payload_len  payload size, used to recompute the size passed to
 *                     OBD_FREE
 */
2138 void kuc_free(void *p, int payload_len)
2140 struct kuc_hdr *lh = kuc_ptr(p);
2141 OBD_FREE(lh, kuc_len(payload_len));
2143 EXPORT_SYMBOL(kuc_free);
/*
 * One sleeper waiting for a request slot.  obd_get_request_slot() keeps
 * an instance on its stack and links it on cl_loi_read_list.
 */
2145 struct obd_request_slot_waiter {
/* link on cl_loi_read_list; unlinked by whoever grants the slot */
2146 struct list_head orsw_entry;
/* queue the waiter sleeps on */
2147 wait_queue_head_t orsw_waitq;
/*
 * Wait condition of obd_get_request_slot(): checks under
 * cl_loi_list_lock whether the entry of @orsw is no longer linked on a
 * list, which is what the wakers do before waking the waiter.
 * NOTE(review): the return statement is not visible in this listing.
 */
2151 static bool obd_request_slot_avail(struct client_obd *cli,
2152 struct obd_request_slot_waiter *orsw)
2156 spin_lock(&cli->cl_loi_list_lock);
2157 avail = !!list_empty(&orsw->orsw_entry);
2158 spin_unlock(&cli->cl_loi_list_lock);
2164 /* For network flow control, the RPC sponsor needs to acquire a credit
2165 * before sending the RPC. The credits count for a connection is defined
2166 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
2167 * the subsequent RPC sponsors need to wait until others released their
2168 * credits, or the administrator increased the "cl_max_rpcs_in_flight". */
/*
 * Acquire one request slot on @cli, sleeping if none is free.
 *
 * Fast path: when cl_r_in_flight is below cl_max_rpcs_in_flight the
 * counter is bumped under cl_loi_list_lock.  Otherwise an on-stack
 * waiter is queued at the tail of cl_loi_read_list and the caller sleeps
 * interruptibly until its entry has been unlinked by a waker.
 *
 * NOTE(review): the return statements, the rest of the wait condition
 * and the test that selects the cleanup after the wait are not visible
 * in this listing.
 */
2170 int obd_get_request_slot(struct client_obd *cli)
2172 struct obd_request_slot_waiter orsw;
2173 struct l_wait_info lwi;
2176 spin_lock(&cli->cl_loi_list_lock);
2177 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
2178 cli->cl_r_in_flight++;
2179 spin_unlock(&cli->cl_loi_list_lock);
/* slow path: queue an on-stack waiter and sleep */
2183 init_waitqueue_head(&orsw.orsw_waitq);
2184 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
2185 orsw.orsw_signaled = false;
2186 spin_unlock(&cli->cl_loi_list_lock);
2188 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2189 rc = l_wait_event(orsw.orsw_waitq,
2190 obd_request_slot_avail(cli, &orsw) ||
2194 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2195 * freed but other (such as obd_put_request_slot) is using it. */
2196 spin_lock(&cli->cl_loi_list_lock);
/* an unlinked entry means a waker already charged a slot to this
 * waiter, so that charge is given back; a still linked entry is
 * simply removed from the list */
2198 if (!orsw.orsw_signaled) {
2199 if (list_empty(&orsw.orsw_entry))
2200 cli->cl_r_in_flight--;
2202 list_del(&orsw.orsw_entry);
2206 if (orsw.orsw_signaled) {
2207 LASSERT(list_empty(&orsw.orsw_entry));
2211 spin_unlock(&cli->cl_loi_list_lock);
2215 EXPORT_SYMBOL(obd_get_request_slot);
/*
 * Release one request slot on @cli.
 *
 * cl_r_in_flight is decremented under cl_loi_list_lock.  When waiters
 * are queued and the counter is below the maximum, the first waiter is
 * unlinked, the slot is charged to it right away (the counter goes up
 * again) and the waiter is woken.
 */
2217 void obd_put_request_slot(struct client_obd *cli)
2219 struct obd_request_slot_waiter *orsw;
2221 spin_lock(&cli->cl_loi_list_lock);
2222 cli->cl_r_in_flight--;
2224 /* If there is free slot, wakeup the first waiter. */
2225 if (!list_empty(&cli->cl_loi_read_list) &&
2226 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2227 orsw = list_entry(cli->cl_loi_read_list.next,
2228 struct obd_request_slot_waiter, orsw_entry);
2229 list_del_init(&orsw->orsw_entry);
2230 cli->cl_r_in_flight++;
2231 wake_up(&orsw->orsw_waitq);
2233 spin_unlock(&cli->cl_loi_list_lock);
2235 EXPORT_SYMBOL(obd_put_request_slot);
/*
 * @return  the current cl_max_rpcs_in_flight of @cli, read without
 *          taking any lock
 */
2237 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2239 return cli->cl_max_rpcs_in_flight;
2241 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * Change the maximum number of RPCs in flight of @cli to @max.
 *
 * @param max  new limit; values below 1 or above OBD_MAX_RIF_MAX are
 *             rejected
 *
 * For a device of type LUSTRE_MDC_NAME the modify RPC limit is lowered
 * to max - 1 when it would otherwise not stay below @max.  The new limit
 * is stored under cl_loi_list_lock, and queued waiters are woken, one
 * per newly available slot, each being charged a slot before the wake-up.
 *
 * NOTE(review): the return statements, several declarations, the test
 * before the first CERROR and the computation of diff are not visible in
 * this listing.
 */
2243 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2245 struct obd_request_slot_waiter *orsw;
2252 if (max > OBD_MAX_RIF_MAX || max < 1)
2255 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2256 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2257 /* adjust max_mod_rpcs_in_flight to ensure it is always
2258 * strictly lower than max_rpcs_in_flight */
/* NOTE(review): the message below has no trailing newline, unlike the
 * other CERROR calls in this file */
2260 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2261 "because it must be higher than "
2262 "max_mod_rpcs_in_flight value",
2263 cli->cl_import->imp_obd->obd_name);
2266 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2267 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2273 spin_lock(&cli->cl_loi_list_lock);
2274 old = cli->cl_max_rpcs_in_flight;
2275 cli->cl_max_rpcs_in_flight = max;
2278 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2279 for (i = 0; i < diff; i++) {
2280 if (list_empty(&cli->cl_loi_read_list))
2283 orsw = list_entry(cli->cl_loi_read_list.next,
2284 struct obd_request_slot_waiter, orsw_entry);
2285 list_del_init(&orsw->orsw_entry);
2286 cli->cl_r_in_flight++;
2287 wake_up(&orsw->orsw_waitq);
2289 spin_unlock(&cli->cl_loi_list_lock);
2293 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/*
 * @return  the current cl_max_mod_rpcs_in_flight of @cli, read without
 *          taking any lock
 */
2295 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2297 return cli->cl_max_mod_rpcs_in_flight;
2299 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/*
 * Change the maximum number of modify RPCs in flight of @cli to @max.
 *
 * @param max  new limit.  It is checked against the range
 *             1..OBD_MAX_RIF_MAX, must stay strictly below
 *             cl_max_rpcs_in_flight, and must not exceed maxmodrpcs,
 *             which is ocd_maxmodrpcs when the connect data carry
 *             OBD_CONNECT_MULTIMODRPCS.
 *
 * The limit is stored under cl_mod_rpcs_lock and the waiters on
 * cl_mod_rpcs_waitq are woken when the limit went up.
 *
 * NOTE(review): the return statements, the value of maxmodrpcs without
 * the connect flag and the argument tail of the last CERROR are not
 * visible in this listing.
 */
2301 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2303 struct obd_connect_data *ocd;
2307 if (max > OBD_MAX_RIF_MAX || max < 1)
2310 /* cannot exceed or equal max_rpcs_in_flight */
2311 if (max >= cli->cl_max_rpcs_in_flight) {
2312 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2313 "higher or equal to max_rpcs_in_flight value (%u)\n",
2314 cli->cl_import->imp_obd->obd_name,
2315 max, cli->cl_max_rpcs_in_flight);
2319 /* cannot exceed max modify RPCs in flight supported by the server */
2320 ocd = &cli->cl_import->imp_connect_data;
2321 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2322 maxmodrpcs = ocd->ocd_maxmodrpcs;
2325 if (max > maxmodrpcs) {
2326 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2327 "higher than max_mod_rpcs_per_client value (%hu) "
2328 "returned by the server at connection\n",
2329 cli->cl_import->imp_obd->obd_name,
2334 spin_lock(&cli->cl_mod_rpcs_lock);
2336 prev = cli->cl_max_mod_rpcs_in_flight;
2337 cli->cl_max_mod_rpcs_in_flight = max;
2339 /* wakeup waiters if limit has been increased */
2340 if (cli->cl_max_mod_rpcs_in_flight > prev)
2341 wake_up(&cli->cl_mod_rpcs_waitq);
2343 spin_unlock(&cli->cl_mod_rpcs_lock);
2347 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/* Integer percentage of a relative to b, 0 when b is 0.
 * NOTE(review): the arguments are not parenthesized in the expansion, so
 * the macro is only safe with simple operands such as the plain
 * variables passed below. */
2350 #define pct(a, b) (b ? a * 100 / b : 0)
/*
 * Print the modify RPC statistics of @cli into @seq: a snapshot time,
 * the number of modify RPCs currently in flight, and one line per
 * histogram bucket with its count, its percentage and the cumulative
 * percentage.  Everything is printed with cl_mod_rpcs_lock held.
 *
 * NOTE(review): the initialisation and accumulation of mod_cum, the loop
 * exit statement and the return are not visible in this listing.
 */
2351 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2352 struct seq_file *seq)
2354 unsigned long mod_tot = 0, mod_cum;
2355 struct timespec64 now;
2358 ktime_get_real_ts64(&now);
2360 spin_lock(&cli->cl_mod_rpcs_lock);
/* NOTE(review): an unsigned conversion is paired with a signed cast in
 * the next call - confirm this is intended */
2362 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2363 (s64)now.tv_sec, now.tv_nsec);
2364 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2365 cli->cl_mod_rpcs_in_flight);
2367 seq_printf(seq, "\n\t\t\tmodify\n");
2368 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2370 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2373 for (i = 0; i < OBD_HIST_MAX; i++) {
2374 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2376 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2377 i, mod, pct(mod, mod_tot),
2378 pct(mod_cum, mod_tot));
2379 if (mod_cum == mod_tot)
2383 spin_unlock(&cli->cl_mod_rpcs_lock);
2387 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2391 /* The number of modify RPCs sent in parallel is limited
2392 * because the server has a finite number of slots per client to
2393 * store request result and ensure reply reconstruction when needed.
2394 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2395 * that takes into account server limit and cl_max_rpcs_in_flight value.
2397 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2398 * one close request is allowed above the maximum. */
/*
 * Tell whether a modify RPC slot can be taken on @cli.  The name and the
 * callers indicate that cl_mod_rpcs_lock must already be held.
 * The second parameter (close_req, its declaration line is not visible
 * in this listing) marks a close request, which is also admitted when no
 * other close request is in flight.
 * NOTE(review): the return statement is not visible in this listing.
 */
2400 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2405 /* A slot is available if
2406 * - number of modify RPCs in flight is less than the max
2407 * - it's a close RPC and no other close request is in flight
2409 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2410 (close_req && cli->cl_close_rpcs_in_flight == 0);
/*
 * Locking wrapper around obd_mod_rpc_slot_avail_locked(): takes
 * cl_mod_rpcs_lock for the duration of the check.  Used as the wait
 * condition when sleeping for a modify RPC slot.
 * NOTE(review): the second parameter line and the return statement are
 * not visible in this listing.
 */
2415 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2420 spin_lock(&cli->cl_mod_rpcs_lock);
2421 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2422 spin_unlock(&cli->cl_mod_rpcs_lock);
/*
 * Classify intent @it: the visible part of the condition matches the
 * operations IT_GETATTR, IT_LOOKUP and IT_READDIR, and IT_LAYOUT when
 * FMODE_WRITE is not set in it_flags.  Callers use the result to bypass
 * modify RPC slot accounting.
 * NOTE(review): the start of the condition and both return statements
 * are not visible in this listing.
 */
2426 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2429 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2430 it->it_op == IT_READDIR ||
2431 (it->it_op == IT_LAYOUT && !(it->it_flags & FMODE_WRITE))))
2436 /* Get a modify RPC slot from the obd client @cli according
2437 * to the kind of operation @opc that is going to be sent
2438 * and the intent @it of the operation if it applies.
2439 * If the maximum number of modify RPCs in flight is reached
2440 * the thread is put to sleep.
2441 * Returns the tag to be set in the request message. Tag 0
2442 * is reserved for non-modifying requests. */
/*
 * Reserve a modify RPC slot on @cli and pick a tag for the request.
 *
 * @param cli  client whose slot accounting is updated
 * @param opc  operation code; MDS_CLOSE is treated specially
 * @param it   intent, handed to obd_skip_mod_rpc_slot()
 *
 * With cl_mod_rpcs_lock held, when a slot is available the in-flight
 * counters are bumped, the histogram is tallied and the first clear bit
 * of cl_mod_tag_bitmap is claimed.  Otherwise the lock is dropped and
 * the caller sleeps on cl_mod_rpcs_waitq until a slot is reported free.
 *
 * NOTE(review): the return statements, the retry loop and the assignment
 * made for MDS_CLOSE are not visible in this listing.
 */
2444 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2445 struct lookup_intent *it)
2447 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2448 bool close_req = false;
2451 /* read-only metadata RPCs don't consume a slot on MDT
2452 * for reply reconstruction
2454 if (obd_skip_mod_rpc_slot(it))
2457 if (opc == MDS_CLOSE)
2461 spin_lock(&cli->cl_mod_rpcs_lock);
2462 max = cli->cl_max_mod_rpcs_in_flight;
2463 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2464 /* there is a slot available */
2465 cli->cl_mod_rpcs_in_flight++;
2467 cli->cl_close_rpcs_in_flight++;
2468 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2469 cli->cl_mod_rpcs_in_flight);
2470 /* find a free tag */
2471 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2473 LASSERT(i < OBD_MAX_RIF_MAX);
/* NOTE(review): the bit is set as a side effect of the assertion
 * argument, so this relies on LASSERT always evaluating it */
2474 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2475 spin_unlock(&cli->cl_mod_rpcs_lock);
2476 /* tag 0 is reserved for non-modify RPCs */
2479 spin_unlock(&cli->cl_mod_rpcs_lock);
2481 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2482 "opc %u, max %hu\n",
2483 cli->cl_import->imp_obd->obd_name, opc, max);
2485 l_wait_event(cli->cl_mod_rpcs_waitq,
2486 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2489 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2491 /* Put a modify RPC slot from the obd client @cli according
2492 * to the kind of operation @opc that has been sent and the
2493 * intent @it of the operation if it applies. */
/*
 * Release a modify RPC slot taken with obd_get_mod_rpc_slot().
 *
 * @param cli  client whose slot accounting is updated
 * @param opc  operation code; MDS_CLOSE is treated specially
 * @param it   intent, handed to obd_skip_mod_rpc_slot()
 * @param tag  tag of the request; bit tag - 1 of cl_mod_tag_bitmap is
 *             cleared
 *
 * The counters and the bitmap are updated under cl_mod_rpcs_lock, then
 * the waiters on cl_mod_rpcs_waitq are woken.
 * NOTE(review): the early return for skipped intents and the assignment
 * made for MDS_CLOSE are not visible in this listing.
 */
2495 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2496 struct lookup_intent *it, __u16 tag)
2498 bool close_req = false;
2500 if (obd_skip_mod_rpc_slot(it))
2503 if (opc == MDS_CLOSE)
2506 spin_lock(&cli->cl_mod_rpcs_lock);
2507 cli->cl_mod_rpcs_in_flight--;
2509 cli->cl_close_rpcs_in_flight--;
2510 /* release the tag in the bitmap */
2511 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
/* NOTE(review): the bit is cleared as a side effect of the assertion
 * argument, so this relies on LASSERT always evaluating it */
2512 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2513 spin_unlock(&cli->cl_mod_rpcs_lock);
2514 wake_up(&cli->cl_mod_rpcs_waitq);
2516 EXPORT_SYMBOL(obd_put_mod_rpc_slot);