4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/obdclass/genops.c
33 * These are the only exported functions, they provide some generic
34 * infrastructure for managing object devices
37 #define DEBUG_SUBSYSTEM S_CLASS
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
/* Read/write lock held while obd_devs[] is searched or modified. */
48 DEFINE_RWLOCK(obd_dev_lock);
/* Table of registered devices; a device's obd_minor is its index here. */
49 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab cache used by obd_device_alloc() and obd_device_free(). */
51 static struct kmem_cache *obd_device_cachep;
/* kobject type of obd_type objects; initialised further down the file. */
52 static struct kobj_type class_ktype;
/* NOTE(review): presumably runs the export/import zombie work - confirm. */
53 static struct workqueue_struct *zombie_wq;
/* Forward declarations of helpers that are called before their definition. */
55 static void obd_zombie_export_add(struct obd_export *exp);
56 static void obd_zombie_import_add(struct obd_import *imp);
57 static void print_export_data(struct obd_export *exp,
58 const char *status, int locks, int debug_level);
/*
 * Stale export bookkeeping.  obd_stale_export_num is decremented, and its
 * waiters woken, by obd_zombie_exp_cull().
 */
60 static LIST_HEAD(obd_stale_exports);
61 static DEFINE_SPINLOCK(obd_stale_export_lock);
62 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65 * support functions: we could use inter-module communication, but this
66 * is more portable to other OS's
/*
 * Allocate one obd_device from obd_device_cachep with GFP_NOFS and stamp
 * it with OBD_DEVICE_MAGIC so that later sanity checks can recognise it.
 */
68 static struct obd_device *obd_device_alloc(void)
70 struct obd_device *obd;
72 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts the magic first, logs an error when obd_namespace is still set,
 * and finalises the obd_reference lu_ref before the memory is released.
 */
79 static void obd_device_free(struct obd_device *obd)
82 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84 if (obd->obd_namespace != NULL) {
85 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86 obd, obd->obd_namespace, obd->obd_force);
89 lu_ref_fini(&obd->obd_reference);
90 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up an obd_type by name in lustre_kset.
 *
 * Only a kobject whose ktype is class_ktype is converted to its enclosing
 * obd_type.  Callers in this file release the kobject reference with
 * kobject_put() once they are done with the result.
 */
93 struct obd_type *class_search_type(const char *name)
95 struct kobject *kobj = kset_find_obj(lustre_kset, name);
97 if (kobj && kobj->ktype == &class_ktype)
98 return container_of(kobj, struct obd_type, typ_kobj);
103 EXPORT_SYMBOL(class_search_type);
/*
 * Find the obd_type registered under name and take a use reference on it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the code can request_module() a module
 * and search again; on server builds some names are first mapped to a
 * different module name (LUSTRE_LWP_NAME to LUSTRE_OSP_NAME, names that
 * start with LUSTRE_MDS_NAME to LUSTRE_MDT_NAME).
 *
 * A successful try_module_get() on the type's o_owner is paired with an
 * increment of typ_refcnt; class_put_type() undoes both.  The kobject
 * reference obtained through class_search_type() is dropped on both of the
 * paths visible here.
 */
105 struct obd_type *class_get_type(const char *name)
107 struct obd_type *type;
109 type = class_search_type(name);
110 #ifdef HAVE_MODULE_LOADING_SUPPORT
112 const char *modname = name;
114 #ifdef HAVE_SERVER_SUPPORT
115 if (strcmp(modname, "obdfilter") == 0)
118 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119 modname = LUSTRE_OSP_NAME;
121 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122 modname = LUSTRE_MDT_NAME;
123 #endif /* HAVE_SERVER_SUPPORT */
125 if (!request_module("%s", modname)) {
126 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127 type = class_search_type(name);
129 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135 if (try_module_get(type->typ_dt_ops->o_owner)) {
136 atomic_inc(&type->typ_refcnt);
137 /* class_search_type() returned a counted reference,
138 * but we don't need that count any more as
139 * we have one through typ_refcnt.
141 kobject_put(&type->typ_kobj);
143 kobject_put(&type->typ_kobj);
149 EXPORT_SYMBOL(class_get_type);
/*
 * Release a use reference taken by class_get_type(): drop the module
 * reference on the type's o_owner and decrement typ_refcnt.
 */
151 void class_put_type(struct obd_type *type)
154 module_put(type->typ_dt_ops->o_owner);
155 atomic_dec(&type->typ_refcnt);
157 EXPORT_SYMBOL(class_put_type);
/*
 * kobject release callback for an obd_type (installed through class_ktype).
 *
 * Removes the type's debugfs tree, finalises its lu_device_type, removes
 * the proc subtree when the type has a name and a proc root, and finally
 * frees the obd_type itself.
 */
159 static void class_sysfs_release(struct kobject *kobj)
161 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
163 debugfs_remove_recursive(type->typ_debugfs_entry);
164 type->typ_debugfs_entry = NULL;
167 lu_device_type_fini(type->typ_lu);
169 #ifdef CONFIG_PROC_FS
170 if (type->typ_name && type->typ_procroot)
171 remove_proc_subtree(type->typ_name, proc_lustre_root);
173 OBD_FREE(type, sizeof(*type));
/*
 * kobject type shared by all obd_type objects: sysfs attribute access goes
 * through lustre_sysfs_ops and the last kobject_put() ends up in
 * class_sysfs_release().
 */
176 static struct kobj_type class_ktype = {
177 .sysfs_ops = &lustre_sysfs_ops,
178 .release = class_sysfs_release,
181 #ifdef HAVE_SERVER_SUPPORT
/*
 * Create a placeholder obd_type called name (server builds only).
 *
 * Returns ERR_PTR(-EEXIST) when a type of that name already exists and
 * ERR_PTR(-ENOMEM) when the allocation fails.  Otherwise the new type's
 * kobject is added to lustre_kset, a debugfs directory is created for it
 * and typ_sym_filter is set so that class_register_type() can recognise
 * the placeholder.  A failure to register the compat proc entry is only
 * logged; typ_procroot is reset to NULL in that case.
 *
 * NOTE(review): the condition under which enable_proc gates the proc
 * registration is not visible in this excerpt.
 */
182 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
184 struct dentry *symlink;
185 struct obd_type *type;
188 type = class_search_type(name);
190 kobject_put(&type->typ_kobj);
191 return ERR_PTR(-EEXIST);
194 OBD_ALLOC(type, sizeof(*type));
196 return ERR_PTR(-ENOMEM);
198 type->typ_kobj.kset = lustre_kset;
199 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
200 &lustre_kset->kobj, "%s", name);
204 symlink = debugfs_create_dir(name, debugfs_lustre_root);
205 type->typ_debugfs_entry = symlink;
206 type->typ_sym_filter = true;
209 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
211 if (IS_ERR(type->typ_procroot)) {
212 CERROR("%s: can't create compat proc entry: %d\n",
213 name, (int)PTR_ERR(type->typ_procroot));
214 type->typ_procroot = NULL;
220 EXPORT_SYMBOL(class_add_symlinks);
221 #endif /* HAVE_SERVER_SUPPORT */
/* Upper bound (exclusive) on the length of a type name, see the LASSERT. */
223 #define CLASS_MAX_NAME 1024
/*
 * Register an obd type called name with the given dt/md operation tables
 * and an optional lu_device_type.
 *
 * Steps visible here: assert that the name is shorter than CLASS_MAX_NAME;
 * look for an existing type (server builds test typ_sym_filter on it,
 * otherwise the lookup reference is dropped and "already registered" is
 * logged); allocate a new obd_type and kobject_init() it; store the ops
 * tables; optionally register a proc directory; create the debugfs
 * directory; kobject_add() the type; and, for an ldt, run
 * lu_device_type_init() and publish the outcome in typ_lu with
 * smp_store_release() followed by wake_up_var().
 *
 * NOTE(review): the branch structure, labels and return values are not
 * fully visible in this excerpt - confirm against the complete source.
 */
225 int class_register_type(const struct obd_ops *dt_ops,
226 const struct md_ops *md_ops,
228 const char *name, struct lu_device_type *ldt)
230 struct obd_type *type;
235 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
237 type = class_search_type(name);
239 #ifdef HAVE_SERVER_SUPPORT
240 if (type->typ_sym_filter)
242 #endif /* HAVE_SERVER_SUPPORT */
243 kobject_put(&type->typ_kobj);
244 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
248 OBD_ALLOC(type, sizeof(*type));
252 type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
253 type->typ_kobj.kset = lustre_kset;
254 kobject_init(&type->typ_kobj, &class_ktype);
255 #ifdef HAVE_SERVER_SUPPORT
257 #endif /* HAVE_SERVER_SUPPORT */
259 type->typ_dt_ops = dt_ops;
260 type->typ_md_ops = md_ops;
262 #ifdef HAVE_SERVER_SUPPORT
263 if (type->typ_sym_filter) {
264 type->typ_sym_filter = false;
265 kobject_put(&type->typ_kobj);
269 #ifdef CONFIG_PROC_FS
270 if (enable_proc && !type->typ_procroot) {
271 type->typ_procroot = lprocfs_register(name,
274 if (IS_ERR(type->typ_procroot)) {
275 rc = PTR_ERR(type->typ_procroot);
276 type->typ_procroot = NULL;
281 type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
283 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
286 #ifdef HAVE_SERVER_SUPPORT
290 rc = lu_device_type_init(ldt);
291 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
292 wake_up_var(&type->typ_lu);
300 kobject_put(&type->typ_kobj);
304 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called name.
 *
 * An unknown name is reported with CERROR.  When typ_refcnt is still
 * non-zero the type cannot go away: its dt/md ops pointers are cleared, the
 * name is kept for debugging, and the function leaves through out_put with
 * rc = -EBUSY.  Otherwise two kobject references are dropped: the long-lived
 * one and the one returned by class_search_type().
 */
306 int class_unregister_type(const char *name)
308 struct obd_type *type = class_search_type(name);
313 CERROR("unknown obd type\n");
317 if (atomic_read(&type->typ_refcnt)) {
318 CERROR("type %s has refcount (%d)\n", name,
319 atomic_read(&type->typ_refcnt));
320 /* This is a bad situation, let's make the best of it */
321 /* Remove ops, but leave the name for debugging */
322 type->typ_dt_ops = NULL;
323 type->typ_md_ops = NULL;
324 GOTO(out_put, rc = -EBUSY);
327 /* Put the final ref */
328 kobject_put(&type->typ_kobj);
330 /* Put the ref returned by class_search_type() */
331 kobject_put(&type->typ_kobj);
334 } /* class_unregister_type */
335 EXPORT_SYMBOL(class_unregister_type);
338 * Create a new obd device.
340 * Allocate the new obd_device and initialize it.
342 * \param[in] type_name obd device type string.
343 * \param[in] name obd device name.
344 * \param[in] uuid obd device UUID
346 * \retval newdev pointer to created obd_device
347 * \retval ERR_PTR(errno) on error
349 struct obd_device *class_newdev(const char *type_name, const char *name,
352 struct obd_device *newdev;
353 struct obd_type *type = NULL;
356 if (strlen(name) >= MAX_OBD_NAME) {
357 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
358 RETURN(ERR_PTR(-EINVAL));
361 type = class_get_type(type_name);
363 CERROR("OBD: unknown type: %s\n", type_name);
364 RETURN(ERR_PTR(-ENODEV));
367 newdev = obd_device_alloc();
368 if (newdev == NULL) {
369 class_put_type(type);
370 RETURN(ERR_PTR(-ENOMEM));
372 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
373 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
374 newdev->obd_type = type;
375 newdev->obd_minor = -1;
377 rwlock_init(&newdev->obd_pool_lock);
378 newdev->obd_pool_limit = 0;
379 newdev->obd_pool_slv = 0;
381 INIT_LIST_HEAD(&newdev->obd_exports);
382 newdev->obd_num_exports = 0;
383 newdev->obd_grant_check_threshold = 100;
384 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
385 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
386 INIT_LIST_HEAD(&newdev->obd_exports_timed);
387 INIT_LIST_HEAD(&newdev->obd_nid_stats);
388 spin_lock_init(&newdev->obd_nid_lock);
389 spin_lock_init(&newdev->obd_dev_lock);
390 mutex_init(&newdev->obd_dev_mutex);
391 spin_lock_init(&newdev->obd_osfs_lock);
392 /* newdev->obd_osfs_age must be set to a value in the distant
393 * past to guarantee a fresh statfs is fetched on mount. */
394 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
396 /* XXX belongs in setup not attach */
397 init_rwsem(&newdev->obd_observer_link_sem);
399 spin_lock_init(&newdev->obd_recovery_task_lock);
400 init_waitqueue_head(&newdev->obd_next_transno_waitq);
401 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
402 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
403 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
404 INIT_LIST_HEAD(&newdev->obd_evict_list);
405 INIT_LIST_HEAD(&newdev->obd_lwp_list);
407 llog_group_init(&newdev->obd_olg);
408 /* Detach drops this */
409 atomic_set(&newdev->obd_refcount, 1);
410 lu_ref_init(&newdev->obd_reference);
411 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
413 newdev->obd_conn_inprogress = 0;
415 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
417 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
418 newdev->obd_name, newdev);
426 * \param[in] obd obd_device to be freed
/*
 * Final teardown of an obd_device.
 *
 * Asserts that the magic is intact, that the device either has no slot
 * (obd_minor == -1) or still owns its obd_devs[] slot, that obd_refcount
 * has reached 0 and that a type is attached.  obd_cleanup() is run only
 * for a device that is stopping; a failure there is logged.  The device is
 * then freed and the type reference it held is released.
 */
430 void class_free_dev(struct obd_device *obd)
432 struct obd_type *obd_type = obd->obd_type;
434 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
435 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
436 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
437 "obd %p != obd_devs[%d] %p\n",
438 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
439 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
440 "obd_refcount should be 0, not %d\n",
441 atomic_read(&obd->obd_refcount));
442 LASSERT(obd_type != NULL);
444 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
445 obd->obd_name, obd->obd_type->typ_name);
447 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
448 obd->obd_name, obd->obd_uuid.uuid);
449 if (obd->obd_stopping) {
452 /* If we're not stopping, we were never set up */
453 err = obd_cleanup(obd);
455 CERROR("Cleanup %s returned %d\n",
/* obd_type was read before the device is freed, so this use is safe */
459 obd_device_free(obd);
461 class_put_type(obd_type);
465 * Unregister obd device.
467 * Free slot in obd_devs[] used by \a obd.
469 * \param[in] obd obd_device to be unregistered
/*
 * Remove obd from the device table: under the obd_dev_lock write lock,
 * a device with a non-negative obd_minor must own that obd_devs[] slot,
 * which is then cleared.
 */
473 void class_unregister_device(struct obd_device *obd)
475 write_lock(&obd_dev_lock);
476 if (obd->obd_minor >= 0) {
477 LASSERT(obd_devs[obd->obd_minor] == obd);
478 obd_devs[obd->obd_minor] = NULL;
481 write_unlock(&obd_dev_lock);
485 * Register obd device.
487 * Find free slot in obd_devs[], fills it with \a new_obd.
489 * \param[in] new_obd obd_device to be registered
492 * \retval -EEXIST device with this name is registered
493 * \retval -EOVERFLOW obd_devs[] is full
/*
 * Enter new_obd into obd_devs[].
 *
 * With obd_dev_lock write-held, every slot is scanned.  A slot whose device
 * has the same name is a duplicate: the code can drop the lock and call
 * obd_zombie_barrier() (the "retried" flag suggests this is done once -
 * NOTE(review): confirm), otherwise it logs "already exists" and discards
 * any free slot remembered so far.  The first empty slot seen is remembered
 * through minor_assign/new_obd_minor.  On success the slot index becomes
 * new_obd->obd_minor and the slot is filled; when no slot is free an error
 * naming MAX_OBD_DEVICES is logged.
 */
495 int class_register_device(struct obd_device *new_obd)
499 int new_obd_minor = 0;
500 bool minor_assign = false;
501 bool retried = false;
504 write_lock(&obd_dev_lock);
505 for (i = 0; i < class_devno_max(); i++) {
506 struct obd_device *obd = class_num2obd(i);
509 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
512 write_unlock(&obd_dev_lock);
514 /* the obd_device could be waited to be
515 * destroyed by the "obd_zombie_impexp_thread".
517 obd_zombie_barrier();
522 CERROR("%s: already exists, won't add\n",
524 /* in case we found a free slot before duplicate */
525 minor_assign = false;
529 if (!minor_assign && obd == NULL) {
536 new_obd->obd_minor = new_obd_minor;
537 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
538 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
539 obd_devs[new_obd_minor] = new_obd;
543 CERROR("%s: all %u/%u devices used, increase "
544 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
545 i, class_devno_max(), ret);
548 write_unlock(&obd_dev_lock);
/*
 * Scan obd_devs[] for a device whose obd_name equals name.  The function
 * takes no lock itself; class_name2dev() and class_dev_by_str() call it
 * with obd_dev_lock read-held.  A name match is only honoured for a device
 * that has obd_attached set.
 */
553 static int class_name2dev_nolock(const char *name)
560 for (i = 0; i < class_devno_max(); i++) {
561 struct obd_device *obd = class_num2obd(i);
563 if (obd && strcmp(name, obd->obd_name) == 0) {
564 /* Make sure we finished attaching before we give
565 out any references */
566 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
567 if (obd->obd_attached) {
/*
 * Locked wrapper around class_name2dev_nolock(): performs the lookup with
 * obd_dev_lock read-held.
 */
577 int class_name2dev(const char *name)
584 read_lock(&obd_dev_lock);
585 i = class_name2dev_nolock(name);
586 read_unlock(&obd_dev_lock);
590 EXPORT_SYMBOL(class_name2dev);
592 struct obd_device *class_name2obd(const char *name)
594 int dev = class_name2dev(name);
596 if (dev < 0 || dev > class_devno_max())
598 return class_num2obd(dev);
600 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] for a device whose obd_uuid equals uuid.  The function
 * takes no lock itself; class_uuid2dev() and class_dev_by_str() call it
 * with obd_dev_lock read-held.
 */
602 static int class_uuid2dev_nolock(struct obd_uuid *uuid)
606 for (i = 0; i < class_devno_max(); i++) {
607 struct obd_device *obd = class_num2obd(i);
609 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
610 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/*
 * Locked wrapper around class_uuid2dev_nolock(): performs the lookup with
 * obd_dev_lock read-held.
 */
618 int class_uuid2dev(struct obd_uuid *uuid)
622 read_lock(&obd_dev_lock);
623 i = class_uuid2dev_nolock(uuid);
624 read_unlock(&obd_dev_lock);
628 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Look up a device by UUID: resolve the UUID to a slot index with
 * class_uuid2dev() and hand that index to class_num2obd().
 */
630 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
632 int dev = class_uuid2dev(uuid);
635 return class_num2obd(dev);
637 EXPORT_SYMBOL(class_uuid2obd);
640 * Get obd device from ::obd_devs[]
642 * \param num [in] array index
644 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
645 * otherwise return the obd device there.
647 struct obd_device *class_num2obd(int num)
649 struct obd_device *obd = NULL;
651 if (num < class_devno_max()) {
656 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
657 "%p obd_magic %08x != %08x\n",
658 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
659 LASSERTF(obd->obd_minor == num,
660 "%p obd_minor %0d != %0d\n",
661 obd, obd->obd_minor, num);
666 EXPORT_SYMBOL(class_num2obd);
669 * Find obd in obd_devs[] by name or uuid.
671 * Increment obd's refcount if found.
673 * \param[in] str obd name or uuid
675 * \retval NULL if not found
676 * \retval target pointer to found obd_device
/*
 * Find a device from a string that may be either its UUID or its name.
 *
 * The string is first converted to an obd_uuid and looked up by UUID; a
 * lookup by name is also available as a second attempt.  Both lookups and
 * the class_incref() on the result happen under one read lock of
 * obd_dev_lock, so the caller receives a referenced device.
 */
678 struct obd_device *class_dev_by_str(const char *str)
680 struct obd_device *target = NULL;
681 struct obd_uuid tgtuuid;
684 obd_str2uuid(&tgtuuid, str);
686 read_lock(&obd_dev_lock);
687 rc = class_uuid2dev_nolock(&tgtuuid);
689 rc = class_name2dev_nolock(str);
692 target = class_num2obd(rc);
695 class_incref(target, "find", current);
696 read_unlock(&obd_dev_lock);
700 EXPORT_SYMBOL(class_dev_by_str);
703 * Get obd devices count. Device in any
705 * \retval obd device count
707 int get_devices_count(void)
709 int index, max_index = class_devno_max(), dev_count = 0;
711 read_lock(&obd_dev_lock);
712 for (index = 0; index <= max_index; index++) {
713 struct obd_device *obd = class_num2obd(index);
717 read_unlock(&obd_dev_lock);
721 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device in obd_devs[]: slot index, a status
 * string chosen from obd_stopping / obd_set_up / obd_attached (tested in
 * that order), type name, device name, UUID and current obd_refcount.
 * The walk is done with obd_dev_lock read-held.
 */
723 void class_obd_list(void)
728 read_lock(&obd_dev_lock);
729 for (i = 0; i < class_devno_max(); i++) {
730 struct obd_device *obd = class_num2obd(i);
734 if (obd->obd_stopping)
736 else if (obd->obd_set_up)
738 else if (obd->obd_attached)
742 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
743 i, status, obd->obd_type->typ_name,
744 obd->obd_name, obd->obd_uuid.uuid,
745 atomic_read(&obd->obd_refcount));
747 read_unlock(&obd_dev_lock);
750 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
751 * specified, then only the client with that uuid is returned,
752 * otherwise any client connected to the tgt is returned.
/*
 * Find a client device of the given type that targets tgt_uuid.
 *
 * The type comparison is a prefix match: only strlen(type_name) bytes of
 * the device's type name are compared.  When grp_uuid is non-NULL the
 * device's own UUID must also equal it.  obd_dev_lock is read-held during
 * the scan and released before a match is handed back.
 */
754 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
755 const char *type_name,
756 struct obd_uuid *grp_uuid)
760 read_lock(&obd_dev_lock);
761 for (i = 0; i < class_devno_max(); i++) {
762 struct obd_device *obd = class_num2obd(i);
766 if ((strncmp(obd->obd_type->typ_name, type_name,
767 strlen(type_name)) == 0)) {
768 if (obd_uuid_equals(tgt_uuid,
769 &obd->u.cli.cl_target_uuid) &&
770 ((grp_uuid)? obd_uuid_equals(grp_uuid,
771 &obd->obd_uuid) : 1)) {
772 read_unlock(&obd_dev_lock);
777 read_unlock(&obd_dev_lock);
781 EXPORT_SYMBOL(class_find_client_obd);
783 /* Iterate the obd_device list looking devices have grp_uuid. Start
784 * searching at *next, and if a device is found, the next index to look
785 * at is saved in *next. If next is NULL, then the first matching device
786 * will always be returned.
/*
 * Resumable scan of obd_devs[] for devices whose UUID equals grp_uuid.
 *
 * A *next value inside [0, class_devno_max()) is accepted as the starting
 * slot.  The scan runs with obd_dev_lock read-held; the lock is released
 * both when a match is found and when the table is exhausted.
 */
788 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
794 else if (*next >= 0 && *next < class_devno_max())
799 read_lock(&obd_dev_lock);
800 for (; i < class_devno_max(); i++) {
801 struct obd_device *obd = class_num2obd(i);
805 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
808 read_unlock(&obd_dev_lock);
812 read_unlock(&obd_dev_lock);
816 EXPORT_SYMBOL(class_devices_in_group);
819 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
820 * adjust sptlrpc settings accordingly.
/*
 * Tell every relevant device of filesystem fsname that its sptlrpc
 * configuration changed.
 *
 * A device is notified when it is set up and not stopping, its type is one
 * of mdc/osc/osp/lwp/mdt/ost, and the first namelen bytes of its name equal
 * fsname.  The notification is an obd_set_info_async() call with
 * KEY_SPTLRPC_CONF on the device's self export.
 */
822 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
824 struct obd_device *obd;
828 LASSERT(namelen > 0);
830 read_lock(&obd_dev_lock);
831 for (i = 0; i < class_devno_max(); i++) {
832 obd = class_num2obd(i);
834 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
837 /* only notify mdc, osc, osp, lwp, mdt, ost
838 * because only these have a -sptlrpc llog */
839 type = obd->obd_type->typ_name;
840 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
841 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
842 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
843 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
844 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
845 strcmp(type, LUSTRE_OST_NAME) != 0)
848 if (strncmp(obd->obd_name, fsname, namelen))
/* pin the device, then drop the table lock around the notification */
851 class_incref(obd, __FUNCTION__, obd);
852 read_unlock(&obd_dev_lock);
853 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
854 sizeof(KEY_SPTLRPC_CONF),
855 KEY_SPTLRPC_CONF, 0, NULL, NULL);
857 class_decref(obd, __FUNCTION__, obd);
858 read_lock(&obd_dev_lock);
860 read_unlock(&obd_dev_lock);
863 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the obd_device slab cache if it exists and reset the pointer so
 * that a later obd_init_caches() passes its NULL assertion.
 */
865 void obd_cleanup_caches(void)
868 if (obd_device_cachep) {
869 kmem_cache_destroy(obd_device_cachep);
870 obd_device_cachep = NULL;
/*
 * Create the "ll_obd_dev_cache" slab cache that obd_device objects are
 * allocated from.  The cache must not exist yet.  The whole object is
 * declared as the usercopy region (offset 0, size of struct obd_device).
 * A creation failure goes to the out label with rc = -ENOMEM, where
 * obd_cleanup_caches() is called.
 */
876 int obd_init_caches(void)
881 LASSERT(obd_device_cachep == NULL);
882 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
883 sizeof(struct obd_device),
884 0, 0, 0, sizeof(struct obd_device), NULL);
885 if (!obd_device_cachep)
886 GOTO(out, rc = -ENOMEM);
890 obd_cleanup_caches();
/*
 * Owner tag passed to class_handle_hash() and class_handle2object() for
 * export handles.
 */
894 static const char export_handle_owner[] = "export";
896 /* map connection to client */
/*
 * Translate a connection handle into its export by looking the cookie up
 * with class_handle2object().  A NULL handle and the cookie value -1
 * ("assign a new connection") are recognised separately and only logged
 * in the lines visible here.
 */
897 struct obd_export *class_conn2export(struct lustre_handle *conn)
899 struct obd_export *export;
903 CDEBUG(D_CACHE, "looking for null handle\n");
907 if (conn->cookie == -1) { /* this means assign a new connection */
908 CDEBUG(D_CACHE, "want a new connection\n");
912 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
913 export = class_handle2object(conn->cookie, export_handle_owner);
916 EXPORT_SYMBOL(class_conn2export);
/*
 * NOTE(review): the body of this function is not visible in this excerpt;
 * presumably it maps an export to its obd_device - confirm.
 */
918 struct obd_device *class_exp2obd(struct obd_export *exp)
924 EXPORT_SYMBOL(class_exp2obd);
/*
 * Return the client import (u.cli.cl_import) of the device that owns exp.
 * No reference is taken in the lines visible here.
 */
926 struct obd_import *class_exp2cliimp(struct obd_export *exp)
928 struct obd_device *obd = exp->exp_obd;
931 return obd->u.cli.cl_import;
933 EXPORT_SYMBOL(class_exp2cliimp);
935 /* Export management functions */
/*
 * Destroy an export whose handle refcount has dropped to 0.
 *
 * Releases the export's connection, asserts that its reply, replay and
 * high-priority RPC lists are empty, calls obd_destroy_export(), drops the
 * device reference (every export except the self export holds one) and
 * frees the memory through kfree_rcu().
 */
936 static void class_export_destroy(struct obd_export *exp)
938 struct obd_device *obd = exp->exp_obd;
941 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
942 LASSERT(obd != NULL);
944 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
945 exp->exp_client_uuid.uuid, obd->obd_name);
947 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
948 ptlrpc_connection_put(exp->exp_connection);
950 LASSERT(list_empty(&exp->exp_outstanding_replies));
951 LASSERT(list_empty(&exp->exp_uncommitted_replies));
952 LASSERT(list_empty(&exp->exp_req_replay_queue));
953 LASSERT(list_empty(&exp->exp_hp_rpcs));
954 obd_destroy_export(exp);
955 /* self export doesn't hold a reference to an obd, although it
956 * exists until freeing of the obd */
957 if (exp != obd->obd_self_export)
958 class_decref(obd, "export", exp);
960 OBD_FREE_PRE(exp, sizeof(*exp), "kfree_rcu");
961 kfree_rcu(exp, exp_handle.h_rcu);
/*
 * Take an additional reference on exp (exp_handle.h_ref) and log the new
 * count.
 */
965 struct obd_export *class_export_get(struct obd_export *exp)
967 refcount_inc(&exp->exp_handle.h_ref);
968 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
969 refcount_read(&exp->exp_handle.h_ref));
972 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on exp.
 *
 * On the final put the export's nid statistics are cleaned up first.  The
 * device's self export is then destroyed synchronously and the device
 * itself is freed with class_free_dev(); any other export must still be
 * linked on an exp_obd_chain list and is handed to obd_zombie_export_add().
 */
974 void class_export_put(struct obd_export *exp)
976 LASSERT(exp != NULL);
977 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
978 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
979 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
980 refcount_read(&exp->exp_handle.h_ref) - 1);
982 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
983 struct obd_device *obd = exp->exp_obd;
985 CDEBUG(D_IOCTL, "final put %p/%s\n",
986 exp, exp->exp_client_uuid.uuid);
988 /* release nid stat reference */
989 lprocfs_exp_cleanup(exp);
991 if (exp == obd->obd_self_export) {
992 /* self export should be destroyed without
993 * zombie thread as it doesn't hold a
994 * reference to obd and doesn't hold any
996 class_export_destroy(exp);
997 /* self export is destroyed, no class
998 * references exist and it is safe to free
1000 class_free_dev(obd);
1002 LASSERT(!list_empty(&exp->exp_obd_chain));
1003 obd_zombie_export_add(exp);
1008 EXPORT_SYMBOL(class_export_put);
/*
 * Work item handler (installed with INIT_WORK in __class_new_export):
 * destroys the export that embeds ws, then decrements
 * obd_stale_export_num and wakes waiters on it when it reaches zero.
 */
1010 static void obd_zombie_exp_cull(struct work_struct *ws)
1012 struct obd_export *export;
1014 export = container_of(ws, struct obd_export, exp_zombie_work);
1015 class_export_destroy(export);
1016 LASSERT(atomic_read(&obd_stale_export_num) > 0);
1017 if (atomic_dec_and_test(&obd_stale_export_num))
1018 wake_up_var(&obd_stale_export_num);
1021 /* Creates a new export, adds it to the hash table, and returns a
1022 * pointer to it. The refcount is 2: one for the hash reference, and
1023 * one for the pointer returned by this function. */
/*
 * Common constructor behind class_new_export() (third argument false) and
 * class_new_export_self() (third argument true).
 *
 * Allocates the export, initialises its counters, lists, locks and zombie
 * work item, hashes its handle and lets the device initialise it via
 * obd_init_export().  Under obd->obd_dev_lock a client whose UUID differs
 * from the device UUID is refused with -ENODEV when the device is stopping,
 * and is entered in the UUID table with obd_uuid_add(); a failure there is
 * reported as a duplicate and leaves with -EALREADY.  The exit_unlock path
 * unhashes the handle, calls obd_destroy_export() and frees the export.
 *
 * NOTE(review): which of the two list set-ups below (linking into the
 * device's export lists with a device reference, or plain
 * INIT_LIST_HEAD) belongs to the self export is decided by a condition
 * that is not visible in this excerpt.
 */
1024 static struct obd_export *__class_new_export(struct obd_device *obd,
1025 struct obd_uuid *cluuid,
1028 struct obd_export *export;
1032 OBD_ALLOC_PTR(export);
1034 return ERR_PTR(-ENOMEM);
1036 export->exp_conn_cnt = 0;
1037 export->exp_lock_hash = NULL;
1038 export->exp_flock_hash = NULL;
1039 /* 2 = class_handle_hash + last */
1040 refcount_set(&export->exp_handle.h_ref, 2);
1041 atomic_set(&export->exp_rpc_count, 0);
1042 atomic_set(&export->exp_cb_count, 0);
1043 atomic_set(&export->exp_locks_count, 0);
1044 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1045 INIT_LIST_HEAD(&export->exp_locks_list);
1046 spin_lock_init(&export->exp_locks_list_guard);
1048 atomic_set(&export->exp_replay_count, 0);
1049 export->exp_obd = obd;
1050 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1051 spin_lock_init(&export->exp_uncommitted_replies_lock);
1052 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1053 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1054 INIT_HLIST_NODE(&export->exp_handle.h_link);
1055 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1056 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1057 class_handle_hash(&export->exp_handle, export_handle_owner);
1058 export->exp_last_request_time = ktime_get_real_seconds();
1059 spin_lock_init(&export->exp_lock);
1060 spin_lock_init(&export->exp_rpc_lock);
1061 INIT_HLIST_NODE(&export->exp_gen_hash);
1062 spin_lock_init(&export->exp_bl_list_lock);
1063 INIT_LIST_HEAD(&export->exp_bl_list);
1064 INIT_LIST_HEAD(&export->exp_stale_list);
1065 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1067 export->exp_sp_peer = LUSTRE_SP_ANY;
1068 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1069 export->exp_client_uuid = *cluuid;
1070 obd_init_export(export);
1072 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1073 export->exp_root_fid.f_seq = 0;
1074 export->exp_root_fid.f_oid = 0;
1075 export->exp_root_fid.f_ver = 0;
1077 spin_lock(&obd->obd_dev_lock);
1078 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1079 /* shouldn't happen, but might race */
1080 if (obd->obd_stopping)
1081 GOTO(exit_unlock, rc = -ENODEV);
1083 rc = obd_uuid_add(obd, export);
1085 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1086 obd->obd_name, cluuid->uuid, rc);
1087 GOTO(exit_unlock, rc = -EALREADY);
1092 class_incref(obd, "export", export);
1093 list_add_tail(&export->exp_obd_chain_timed,
1094 &obd->obd_exports_timed);
1095 list_add(&export->exp_obd_chain, &obd->obd_exports);
1096 obd->obd_num_exports++;
1098 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1099 INIT_LIST_HEAD(&export->exp_obd_chain);
1101 spin_unlock(&obd->obd_dev_lock);
1105 spin_unlock(&obd->obd_dev_lock);
1106 class_handle_unhash(&export->exp_handle);
1107 obd_destroy_export(export);
1108 OBD_FREE_PTR(export);
/*
 * Create an ordinary export on obd for client uuid; thin wrapper that
 * calls __class_new_export() with its last argument set to false.
 */
1112 struct obd_export *class_new_export(struct obd_device *obd,
1113 struct obd_uuid *uuid)
1115 return __class_new_export(obd, uuid, false);
1117 EXPORT_SYMBOL(class_new_export);
/*
 * Create the self export of obd; thin wrapper that calls
 * __class_new_export() with its last argument set to true.
 */
1119 struct obd_export *class_new_export_self(struct obd_device *obd,
1120 struct obd_uuid *uuid)
1122 return __class_new_export(obd, uuid, true);
/*
 * Detach an export from its device.
 *
 * The handle is unhashed first.  For the device's self export only a
 * reference is dropped.  For any other export, under obd_dev_lock: the
 * UUID table entry is removed, on server builds a hashed exp_gen_hash
 * entry is removed from obd_gen_hash, the export is moved to
 * obd_unlinked_exports, taken off the timed list, and obd_num_exports is
 * decremented.  Finally obd_stale_export_put() is called on it.
 */
1125 void class_unlink_export(struct obd_export *exp)
1127 class_handle_unhash(&exp->exp_handle);
1129 if (exp->exp_obd->obd_self_export == exp) {
1130 class_export_put(exp);
1134 spin_lock(&exp->exp_obd->obd_dev_lock);
1135 /* delete an uuid-export hashitem from hashtables */
1136 if (exp != exp->exp_obd->obd_self_export)
1137 obd_uuid_del(exp->exp_obd, exp);
1139 #ifdef HAVE_SERVER_SUPPORT
1140 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1141 struct tg_export_data *ted = &exp->exp_target_data;
1142 struct cfs_hash *hash;
1144 /* Because obd_gen_hash will not be released until
1145 * class_cleanup(), so hash should never be NULL here */
1146 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1147 LASSERT(hash != NULL);
1148 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1149 &exp->exp_gen_hash);
1150 cfs_hash_putref(hash);
1152 #endif /* HAVE_SERVER_SUPPORT */
1154 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1155 list_del_init(&exp->exp_obd_chain_timed);
1156 exp->exp_obd->obd_num_exports--;
1157 spin_unlock(&exp->exp_obd->obd_dev_lock);
1159 /* A reference is kept by obd_stale_exports list */
1160 obd_stale_export_put(exp);
1162 EXPORT_SYMBOL(class_unlink_export);
1164 /* Import management functions */
/*
 * Release everything an import with refcount 0 still owns: its current
 * connection, every obd_import_conn left on imp_conn_list (connection put
 * and entry freed), and the device reference taken in class_new_import().
 * The security context must already be gone and no requests may remain.
 */
1165 static void obd_zombie_import_free(struct obd_import *imp)
1167 struct obd_import_conn *imp_conn;
1170 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1171 imp->imp_obd->obd_name);
1173 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1175 ptlrpc_connection_put(imp->imp_connection);
1177 while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1178 struct obd_import_conn,
1179 oic_item)) != NULL) {
1180 list_del_init(&imp_conn->oic_item);
1181 ptlrpc_connection_put(imp_conn->oic_conn);
1182 OBD_FREE(imp_conn, sizeof(*imp_conn));
1185 LASSERT(imp->imp_sec == NULL);
1186 LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1187 imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1188 class_decref(imp->imp_obd, "import", imp);
/*
 * Take an additional reference on import (imp_refcount) and log the new
 * count together with the owning device name.
 */
1193 struct obd_import *class_import_get(struct obd_import *import)
1195 refcount_inc(&import->imp_refcount);
1196 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1197 refcount_read(&import->imp_refcount),
1198 import->imp_obd->obd_name);
1201 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on imp.  The import is not freed here: on the final
 * put it is passed to obd_zombie_import_add().
 */
1203 void class_import_put(struct obd_import *imp)
1207 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1209 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1210 refcount_read(&imp->imp_refcount) - 1,
1211 imp->imp_obd->obd_name);
1213 if (refcount_dec_and_test(&imp->imp_refcount)) {
1214 CDEBUG(D_INFO, "final put import %p\n", imp);
1215 obd_zombie_import_add(imp);
1220 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.
 */
1222 static void init_imp_at(struct imp_at *at) {
1224 at_init(&at->iat_net_latency, 0, 0);
1225 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1226 /* max service estimates are tracked on the server side, so
1227 don't use the AT history here, just use the last reported
1228 val. (But keep hist for proc histogram, worst_ever) */
1229 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Work item handler (installed with INIT_WORK in class_new_import): frees
 * the import that embeds ws through obd_zombie_import_free().
 */
1234 static void obd_zombie_imp_cull(struct work_struct *ws)
1236 struct obd_import *import;
1238 import = container_of(ws, struct obd_import, imp_zombie_work);
1239 obd_zombie_import_free(import);
/*
 * Allocate and initialise an import for obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with imp_refcount 2 and holds a
 * device reference taken with class_incref(); obd_zombie_import_free()
 * drops that reference.  imp_sec_refpid is the pid of the current pid
 * namespace's child_reaper when one is available; 1 is the alternative
 * value assigned below.
 */
1242 struct obd_import *class_new_import(struct obd_device *obd)
1244 struct obd_import *imp;
1245 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1247 OBD_ALLOC(imp, sizeof(*imp));
1251 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1252 INIT_LIST_HEAD(&imp->imp_replay_list);
1253 INIT_LIST_HEAD(&imp->imp_sending_list);
1254 INIT_LIST_HEAD(&imp->imp_delayed_list);
1255 INIT_LIST_HEAD(&imp->imp_committed_list);
1256 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1257 imp->imp_known_replied_xid = 0;
1258 imp->imp_replay_cursor = &imp->imp_committed_list;
1259 spin_lock_init(&imp->imp_lock);
1260 imp->imp_last_success_conn = 0;
1261 imp->imp_state = LUSTRE_IMP_NEW;
1262 imp->imp_obd = class_incref(obd, "import", imp);
1263 rwlock_init(&imp->imp_sec_lock);
1264 init_waitqueue_head(&imp->imp_recovery_waitq);
1265 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1267 if (curr_pid_ns && curr_pid_ns->child_reaper)
1268 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1270 imp->imp_sec_refpid = 1;
1272 refcount_set(&imp->imp_refcount, 2);
1273 atomic_set(&imp->imp_unregistering, 0);
1274 atomic_set(&imp->imp_reqs, 0);
1275 atomic_set(&imp->imp_inflight, 0);
1276 atomic_set(&imp->imp_replay_inflight, 0);
1277 init_waitqueue_head(&imp->imp_replay_waitq);
1278 atomic_set(&imp->imp_inval_count, 0);
1279 atomic_set(&imp->imp_waiting, 0);
1280 INIT_LIST_HEAD(&imp->imp_conn_list);
1281 init_imp_at(&imp->imp_at);
1283 /* the default magic is V2, will be used in connect RPC, and
1284 * then adjusted according to the flags in request/reply. */
1285 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1289 EXPORT_SYMBOL(class_new_import);
/*
 * Retire an import: bump imp_generation under imp_lock, then drop one
 * reference with class_import_put().
 */
1291 void class_destroy_import(struct obd_import *import)
1293 LASSERT(import != NULL);
1294 LASSERT(import != LP_POISON);
1296 spin_lock(&import->imp_lock);
1297 import->imp_generation++;
1298 spin_unlock(&import->imp_lock);
1299 class_import_put(import);
1301 EXPORT_SYMBOL(class_destroy_import);
1303 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking of which export a lock is referenced from.
 *
 * Under exp_locks_list_guard: warn if the lock is already tracked for a
 * different export, and on the first reference link the lock into
 * exp->exp_locks_list and record exp as its target.  l_exp_refs_nr counts
 * the references.
 */
1305 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1307 spin_lock(&exp->exp_locks_list_guard);
1309 LASSERT(lock->l_exp_refs_nr >= 0);
1311 if (lock->l_exp_refs_target != NULL &&
1312 lock->l_exp_refs_target != exp) {
1313 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1314 exp, lock, lock->l_exp_refs_target);
1316 if ((lock->l_exp_refs_nr ++) == 0) {
1317 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1318 lock->l_exp_refs_target = exp;
1320 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1321 lock, exp, lock->l_exp_refs_nr);
1322 spin_unlock(&exp->exp_locks_list_guard);
1324 EXPORT_SYMBOL(__class_export_add_lock_ref);
1326 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1328 spin_lock(&exp->exp_locks_list_guard);
1329 LASSERT(lock->l_exp_refs_nr > 0);
1330 if (lock->l_exp_refs_target != exp) {
1331 LCONSOLE_WARN("lock %p, "
1332 "mismatching export pointers: %p, %p\n",
1333 lock, lock->l_exp_refs_target, exp);
1335 if (-- lock->l_exp_refs_nr == 0) {
1336 list_del_init(&lock->l_exp_refs_link);
1337 lock->l_exp_refs_target = NULL;
1339 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1340 lock, exp, lock->l_exp_refs_nr);
1341 spin_unlock(&exp->exp_locks_list_guard);
1343 EXPORT_SYMBOL(__class_export_del_lock_ref);
1346 /* A connection defines an export context in which preallocation can
1347 be managed. This releases the export pointer reference, and returns
1348 the export handle, so the export refcount is 1 when this function
1350 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1351 struct obd_uuid *cluuid)
1353 struct obd_export *export;
1354 LASSERT(conn != NULL);
1355 LASSERT(obd != NULL);
1356 LASSERT(cluuid != NULL);
1359 export = class_new_export(obd, cluuid);
1361 RETURN(PTR_ERR(export));
1363 conn->cookie = export->exp_handle.h_cookie;
1364 class_export_put(export);
1366 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1367 cluuid->uuid, conn->cookie);
1370 EXPORT_SYMBOL(class_connect);
1372 /* if export is involved in recovery then clean up related things */
1373 static void class_export_recovery_cleanup(struct obd_export *exp)
1375 struct obd_device *obd = exp->exp_obd;
1377 spin_lock(&obd->obd_recovery_task_lock);
1378 if (obd->obd_recovering) {
1379 if (exp->exp_in_recovery) {
1380 spin_lock(&exp->exp_lock);
1381 exp->exp_in_recovery = 0;
1382 spin_unlock(&exp->exp_lock);
1383 LASSERT(atomic_read(&(obd)->obd_connected_clients) > 0);
1384 atomic_dec(&obd->obd_connected_clients);
1387 /* if called during recovery then should update
1388 * obd_stale_clients counter,
1389 * lightweight exports are not counted */
1390 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1391 exp->exp_obd->obd_stale_clients++;
1393 spin_unlock(&obd->obd_recovery_task_lock);
1395 spin_lock(&exp->exp_lock);
1396 /** Cleanup req replay fields */
1397 if (exp->exp_req_replay_needed) {
1398 exp->exp_req_replay_needed = 0;
1400 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1401 atomic_dec(&obd->obd_req_replay_clients);
1404 /** Cleanup lock replay data */
1405 if (exp->exp_lock_replay_needed) {
1406 exp->exp_lock_replay_needed = 0;
1408 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1409 atomic_dec(&obd->obd_lock_replay_clients);
1411 spin_unlock(&exp->exp_lock);
1414 /* This function removes 1-3 references from the export:
1415 * 1 - for export pointer passed
1416 * and if disconnect really need
1417 * 2 - removing from hash
1418 * 3 - in client_unlink_export
1419 * The export pointer passed to this function can destroyed */
1420 int class_disconnect(struct obd_export *export)
1422 int already_disconnected;
1425 if (export == NULL) {
1426 CWARN("attempting to free NULL export %p\n", export);
1430 spin_lock(&export->exp_lock);
1431 already_disconnected = export->exp_disconnected;
1432 export->exp_disconnected = 1;
1433 #ifdef HAVE_SERVER_SUPPORT
1434 /* We hold references of export for uuid hash
1435 * and nid_hash and export link at least. So
1436 * it is safe to call rh*table_remove_fast in
1439 obd_nid_del(export->exp_obd, export);
1440 #endif /* HAVE_SERVER_SUPPORT */
1441 spin_unlock(&export->exp_lock);
1443 /* class_cleanup(), abort_recovery(), and class_fail_export()
1444 * all end up in here, and if any of them race we shouldn't
1445 * call extra class_export_puts(). */
1446 if (already_disconnected)
1447 GOTO(no_disconn, already_disconnected);
1449 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1450 export->exp_handle.h_cookie);
1452 class_export_recovery_cleanup(export);
1453 class_unlink_export(export);
1455 class_export_put(export);
1458 EXPORT_SYMBOL(class_disconnect);
1460 /* Return non-zero for a fully connected export */
1461 int class_connected_export(struct obd_export *exp)
1466 spin_lock(&exp->exp_lock);
1467 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1468 spin_unlock(&exp->exp_lock);
1472 EXPORT_SYMBOL(class_connected_export);
1474 static void class_disconnect_export_list(struct list_head *list,
1475 enum obd_option flags)
1478 struct obd_export *exp;
1481 /* It's possible that an export may disconnect itself, but
1482 * nothing else will be added to this list.
1484 while ((exp = list_first_entry_or_null(list, struct obd_export,
1485 exp_obd_chain)) != NULL) {
1486 /* need for safe call CDEBUG after obd_disconnect */
1487 class_export_get(exp);
1489 spin_lock(&exp->exp_lock);
1490 exp->exp_flags = flags;
1491 spin_unlock(&exp->exp_lock);
1493 if (obd_uuid_equals(&exp->exp_client_uuid,
1494 &exp->exp_obd->obd_uuid)) {
1496 "exp %p export uuid == obd uuid, don't discon\n",
1498 /* Need to delete this now so we don't end up pointing
1499 * to work_list later when this export is cleaned up. */
1500 list_del_init(&exp->exp_obd_chain);
1501 class_export_put(exp);
1505 class_export_get(exp);
1506 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1507 "last request at %lld\n",
1508 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1509 exp, exp->exp_last_request_time);
1510 /* release one export reference anyway */
1511 rc = obd_disconnect(exp);
1513 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1514 obd_export_nid2str(exp), exp, rc);
1515 class_export_put(exp);
1520 void class_disconnect_exports(struct obd_device *obd)
1522 LIST_HEAD(work_list);
1525 /* Move all of the exports from obd_exports to a work list, en masse. */
1526 spin_lock(&obd->obd_dev_lock);
1527 list_splice_init(&obd->obd_exports, &work_list);
1528 list_splice_init(&obd->obd_delayed_exports, &work_list);
1529 spin_unlock(&obd->obd_dev_lock);
1531 if (!list_empty(&work_list)) {
1532 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1533 "disconnecting them\n", obd->obd_minor, obd);
1534 class_disconnect_export_list(&work_list,
1535 exp_flags_from_obd(obd));
1537 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1538 obd->obd_minor, obd);
1541 EXPORT_SYMBOL(class_disconnect_exports);
1543 /* Remove exports that have not completed recovery.
1545 void class_disconnect_stale_exports(struct obd_device *obd,
1546 int (*test_export)(struct obd_export *))
1548 LIST_HEAD(work_list);
1549 struct obd_export *exp, *n;
1553 spin_lock(&obd->obd_dev_lock);
1554 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1556 /* don't count self-export as client */
1557 if (obd_uuid_equals(&exp->exp_client_uuid,
1558 &exp->exp_obd->obd_uuid))
1561 /* don't evict clients which have no slot in last_rcvd
1562 * (e.g. lightweight connection) */
1563 if (exp->exp_target_data.ted_lr_idx == -1)
1566 spin_lock(&exp->exp_lock);
1567 if (exp->exp_failed || test_export(exp)) {
1568 spin_unlock(&exp->exp_lock);
1571 exp->exp_failed = 1;
1572 atomic_inc(&exp->exp_obd->obd_eviction_count);
1573 spin_unlock(&exp->exp_lock);
1575 list_move(&exp->exp_obd_chain, &work_list);
1577 CWARN("%s: disconnect stale client %s@%s\n",
1578 obd->obd_name, exp->exp_client_uuid.uuid,
1579 obd_export_nid2str(exp));
1580 print_export_data(exp, "EVICTING", 0, D_HA);
1582 spin_unlock(&obd->obd_dev_lock);
1585 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1586 obd->obd_name, evicted);
1588 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1589 OBD_OPT_ABORT_RECOV);
1592 EXPORT_SYMBOL(class_disconnect_stale_exports);
1594 void class_fail_export(struct obd_export *exp)
1596 int rc, already_failed;
1598 spin_lock(&exp->exp_lock);
1599 already_failed = exp->exp_failed;
1600 exp->exp_failed = 1;
1601 spin_unlock(&exp->exp_lock);
1603 if (already_failed) {
1604 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1605 exp, exp->exp_client_uuid.uuid);
1609 atomic_inc(&exp->exp_obd->obd_eviction_count);
1611 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1612 exp, exp->exp_client_uuid.uuid);
1614 if (obd_dump_on_timeout)
1615 libcfs_debug_dumplog();
1617 /* need for safe call CDEBUG after obd_disconnect */
1618 class_export_get(exp);
1620 /* Most callers into obd_disconnect are removing their own reference
1621 * (request, for example) in addition to the one from the hash table.
1622 * We don't have such a reference here, so make one. */
1623 class_export_get(exp);
1624 rc = obd_disconnect(exp);
1626 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1628 CDEBUG(D_HA, "disconnected export %p/%s\n",
1629 exp, exp->exp_client_uuid.uuid);
1630 class_export_put(exp);
1632 EXPORT_SYMBOL(class_fail_export);
1634 #ifdef HAVE_SERVER_SUPPORT
1636 static int take_first(struct obd_export *exp, void *data)
1638 struct obd_export **expp = data;
1641 /* already have one */
1643 if (exp->exp_failed)
1644 /* Don't want this one */
1646 if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1647 /* Cannot get a ref on this one */
1653 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1655 struct lnet_nid nid_key;
1656 struct obd_export *doomed_exp;
1657 int exports_evicted = 0;
1659 libcfs_strnid(&nid_key, nid);
1661 spin_lock(&obd->obd_dev_lock);
1662 /* umount has run already, so evict thread should leave
1663 * its task to umount thread now */
1664 if (obd->obd_stopping) {
1665 spin_unlock(&obd->obd_dev_lock);
1666 return exports_evicted;
1668 spin_unlock(&obd->obd_dev_lock);
1671 while (obd_nid_export_for_each(obd, &nid_key,
1672 take_first, &doomed_exp) > 0) {
1674 LASSERTF(doomed_exp != obd->obd_self_export,
1675 "self-export is hashed by NID?\n");
1677 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1679 obd_uuid2str(&doomed_exp->exp_client_uuid),
1680 obd_export_nid2str(doomed_exp));
1682 class_fail_export(doomed_exp);
1683 class_export_put(doomed_exp);
1688 if (!exports_evicted)
1690 "%s: can't disconnect NID '%s': no exports found\n",
1691 obd->obd_name, nid);
1692 return exports_evicted;
1694 EXPORT_SYMBOL(obd_export_evict_by_nid);
1696 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1698 struct obd_export *doomed_exp = NULL;
1699 struct obd_uuid doomed_uuid;
1700 int exports_evicted = 0;
1702 spin_lock(&obd->obd_dev_lock);
1703 if (obd->obd_stopping) {
1704 spin_unlock(&obd->obd_dev_lock);
1705 return exports_evicted;
1707 spin_unlock(&obd->obd_dev_lock);
1709 obd_str2uuid(&doomed_uuid, uuid);
1710 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1711 CERROR("%s: can't evict myself\n", obd->obd_name);
1712 return exports_evicted;
1715 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1716 if (doomed_exp == NULL) {
1717 CERROR("%s: can't disconnect %s: no exports found\n",
1718 obd->obd_name, uuid);
1720 CWARN("%s: evicting %s at adminstrative request\n",
1721 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1722 class_fail_export(doomed_exp);
1723 class_export_put(doomed_exp);
1724 obd_uuid_del(obd, doomed_exp);
1728 return exports_evicted;
1730 #endif /* HAVE_SERVER_SUPPORT */
1732 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback, NULL by default.  print_export_data()
 * invokes it when its 'locks' argument is non-zero and the hook is set. */
1733 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1734 EXPORT_SYMBOL(class_export_dump_hook);
1737 static void print_export_data(struct obd_export *exp, const char *status,
1738 int locks, int debug_level)
1740 struct ptlrpc_reply_state *rs;
1741 struct ptlrpc_reply_state *first_reply = NULL;
1744 spin_lock(&exp->exp_lock);
1745 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1751 spin_unlock(&exp->exp_lock);
1753 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1754 "%p %s %llu stale:%d\n",
1755 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1756 obd_export_nid2str(exp),
1757 refcount_read(&exp->exp_handle.h_ref),
1758 atomic_read(&exp->exp_rpc_count),
1759 atomic_read(&exp->exp_cb_count),
1760 atomic_read(&exp->exp_locks_count),
1761 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1762 nreplies, first_reply, nreplies > 3 ? "..." : "",
1763 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1764 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1765 if (locks && class_export_dump_hook != NULL)
1766 class_export_dump_hook(exp);
1770 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1772 struct obd_export *exp;
1774 spin_lock(&obd->obd_dev_lock);
1775 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1776 print_export_data(exp, "ACTIVE", locks, debug_level);
1777 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1778 print_export_data(exp, "UNLINKED", locks, debug_level);
1779 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1780 print_export_data(exp, "DELAYED", locks, debug_level);
1781 spin_unlock(&obd->obd_dev_lock);
1784 void obd_exports_barrier(struct obd_device *obd)
1787 LASSERT(list_empty(&obd->obd_exports));
1788 spin_lock(&obd->obd_dev_lock);
1789 while (!list_empty(&obd->obd_unlinked_exports)) {
1790 spin_unlock(&obd->obd_dev_lock);
1791 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1792 if (waited > 5 && is_power_of_2(waited)) {
1793 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1794 "more than %d seconds. "
1795 "The obd refcount = %d. Is it stuck?\n",
1796 obd->obd_name, waited,
1797 atomic_read(&obd->obd_refcount));
1798 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1801 spin_lock(&obd->obd_dev_lock);
1803 spin_unlock(&obd->obd_dev_lock);
1805 EXPORT_SYMBOL(obd_exports_barrier);
1808 * Add export to the obd_zombe thread and notify it.
1810 static void obd_zombie_export_add(struct obd_export *exp) {
1811 atomic_inc(&obd_stale_export_num);
1812 spin_lock(&exp->exp_obd->obd_dev_lock);
1813 LASSERT(!list_empty(&exp->exp_obd_chain));
1814 list_del_init(&exp->exp_obd_chain);
1815 spin_unlock(&exp->exp_obd->obd_dev_lock);
1816 queue_work(zombie_wq, &exp->exp_zombie_work);
1820 * Add import to the obd_zombe thread and notify it.
1822 static void obd_zombie_import_add(struct obd_import *imp) {
1823 LASSERT(imp->imp_sec == NULL);
1825 queue_work(zombie_wq, &imp->imp_zombie_work);
1829 * wait when obd_zombie import/export queues become empty
1831 void obd_zombie_barrier(void)
1833 wait_var_event(&obd_stale_export_num,
1834 atomic_read(&obd_stale_export_num) == 0);
1835 flush_workqueue(zombie_wq);
1837 EXPORT_SYMBOL(obd_zombie_barrier);
1840 struct obd_export *obd_stale_export_get(void)
1842 struct obd_export *exp = NULL;
1845 spin_lock(&obd_stale_export_lock);
1846 if (!list_empty(&obd_stale_exports)) {
1847 exp = list_first_entry(&obd_stale_exports,
1848 struct obd_export, exp_stale_list);
1849 list_del_init(&exp->exp_stale_list);
1851 spin_unlock(&obd_stale_export_lock);
1854 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1855 atomic_read(&obd_stale_export_num));
1859 EXPORT_SYMBOL(obd_stale_export_get);
1861 void obd_stale_export_put(struct obd_export *exp)
1865 LASSERT(list_empty(&exp->exp_stale_list));
1866 if (exp->exp_lock_hash &&
1867 atomic_read(&exp->exp_lock_hash->hs_count)) {
1868 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1869 atomic_read(&obd_stale_export_num));
1871 spin_lock_bh(&exp->exp_bl_list_lock);
1872 spin_lock(&obd_stale_export_lock);
1873 /* Add to the tail if there is no blocked locks,
1874 * to the head otherwise. */
1875 if (list_empty(&exp->exp_bl_list))
1876 list_add_tail(&exp->exp_stale_list,
1877 &obd_stale_exports);
1879 list_add(&exp->exp_stale_list,
1880 &obd_stale_exports);
1882 spin_unlock(&obd_stale_export_lock);
1883 spin_unlock_bh(&exp->exp_bl_list_lock);
1885 class_export_put(exp);
1889 EXPORT_SYMBOL(obd_stale_export_put);
1892 * Adjust the position of the export in the stale list,
1893 * i.e. move to the head of the list if is needed.
1895 void obd_stale_export_adjust(struct obd_export *exp)
1897 LASSERT(exp != NULL);
1898 spin_lock_bh(&exp->exp_bl_list_lock);
1899 spin_lock(&obd_stale_export_lock);
1901 if (!list_empty(&exp->exp_stale_list) &&
1902 !list_empty(&exp->exp_bl_list))
1903 list_move(&exp->exp_stale_list, &obd_stale_exports);
1905 spin_unlock(&obd_stale_export_lock);
1906 spin_unlock_bh(&exp->exp_bl_list_lock);
1908 EXPORT_SYMBOL(obd_stale_export_adjust);
1911 * start destroy zombie import/export thread
1913 int obd_zombie_impexp_init(void)
1915 zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1917 cfs_cpt_number(cfs_cpt_tab));
1919 return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1923 * stop destroy zombie import/export thread
1925 void obd_zombie_impexp_stop(void)
1927 destroy_workqueue(zombie_wq);
1928 LASSERT(list_empty(&obd_stale_exports));
1931 /***** Kernel-userspace comm helpers *******/
1933 /* Get length of entire message, including header */
1934 int kuc_len(int payload_len)
1936 return sizeof(struct kuc_hdr) + payload_len;
1938 EXPORT_SYMBOL(kuc_len);
1940 /* Get a pointer to kuc header, given a ptr to the payload
1941 * @param p Pointer to payload area
1942 * @returns Pointer to kuc header
1944 struct kuc_hdr * kuc_ptr(void *p)
1946 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1947 LASSERT(lh->kuc_magic == KUC_MAGIC);
1950 EXPORT_SYMBOL(kuc_ptr);
1952 /* Alloc space for a message, and fill in header
1953 * @return Pointer to payload area
1955 void *kuc_alloc(int payload_len, int transport, int type)
1958 int len = kuc_len(payload_len);
1962 return ERR_PTR(-ENOMEM);
1964 lh->kuc_magic = KUC_MAGIC;
1965 lh->kuc_transport = transport;
1966 lh->kuc_msgtype = type;
1967 lh->kuc_msglen = len;
1969 return (void *)(lh + 1);
1971 EXPORT_SYMBOL(kuc_alloc);
/**
 * Free a message allocated with kuc_alloc().
 *
 * \param[in] p			pointer to the payload area
 * \param[in] payload_len	payload length used to compute the size of
 *				the allocation being freed
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int total = kuc_len(payload_len);

	OBD_FREE(hdr, total);
}
EXPORT_SYMBOL(kuc_free);
/* Waiter record used by obd_get_request_slot(), which declares it on its
 * own stack: orsw_entry links the waiter onto cli->cl_flight_waiters and
 * orsw_waitq is the wait queue the waiting thread sleeps on. */
1981 struct obd_request_slot_waiter {
1982 struct list_head orsw_entry;
1983 wait_queue_head_t orsw_waitq;
1987 static bool obd_request_slot_avail(struct client_obd *cli,
1988 struct obd_request_slot_waiter *orsw)
1992 spin_lock(&cli->cl_loi_list_lock);
1993 avail = !!list_empty(&orsw->orsw_entry);
1994 spin_unlock(&cli->cl_loi_list_lock);
2000 * For network flow control, the RPC sponsor needs to acquire a credit
2001 * before sending the RPC. The credits count for a connection is defined
2002 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2003 * the subsequent RPC sponsors need to wait until others released their
2004 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2006 int obd_get_request_slot(struct client_obd *cli)
2008 struct obd_request_slot_waiter orsw;
2011 spin_lock(&cli->cl_loi_list_lock);
2012 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2013 cli->cl_rpcs_in_flight++;
2014 spin_unlock(&cli->cl_loi_list_lock);
2018 init_waitqueue_head(&orsw.orsw_waitq);
2019 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2020 orsw.orsw_signaled = false;
2021 spin_unlock(&cli->cl_loi_list_lock);
2023 rc = l_wait_event_abortable(orsw.orsw_waitq,
2024 obd_request_slot_avail(cli, &orsw) ||
2025 orsw.orsw_signaled);
2027 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2028 * freed but other (such as obd_put_request_slot) is using it. */
2029 spin_lock(&cli->cl_loi_list_lock);
2031 if (!orsw.orsw_signaled) {
2032 if (list_empty(&orsw.orsw_entry))
2033 cli->cl_rpcs_in_flight--;
2035 list_del(&orsw.orsw_entry);
2040 if (orsw.orsw_signaled) {
2041 LASSERT(list_empty(&orsw.orsw_entry));
2045 spin_unlock(&cli->cl_loi_list_lock);
2049 EXPORT_SYMBOL(obd_get_request_slot);
2051 void obd_put_request_slot(struct client_obd *cli)
2053 struct obd_request_slot_waiter *orsw;
2055 spin_lock(&cli->cl_loi_list_lock);
2056 cli->cl_rpcs_in_flight--;
2058 /* If there is free slot, wakeup the first waiter. */
2059 if (!list_empty(&cli->cl_flight_waiters) &&
2060 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2061 orsw = list_first_entry(&cli->cl_flight_waiters,
2062 struct obd_request_slot_waiter,
2064 list_del_init(&orsw->orsw_entry);
2065 cli->cl_rpcs_in_flight++;
2066 wake_up(&orsw->orsw_waitq);
2068 spin_unlock(&cli->cl_loi_list_lock);
2070 EXPORT_SYMBOL(obd_put_request_slot);
2072 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2074 return cli->cl_max_rpcs_in_flight;
2076 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2078 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2080 struct obd_request_slot_waiter *orsw;
2086 if (max > OBD_MAX_RIF_MAX || max < 1)
2089 CDEBUG(D_INFO, "%s: max = %u max_mod = %u rif = %u\n",
2090 cli->cl_import->imp_obd->obd_name, max,
2091 cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2093 if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2094 LUSTRE_MDC_NAME) == 0) {
2095 /* adjust max_mod_rpcs_in_flight to ensure it is always
2096 * strictly lower that max_rpcs_in_flight */
2098 CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2099 cli->cl_import->imp_obd->obd_name);
2102 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2103 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2109 spin_lock(&cli->cl_loi_list_lock);
2110 old = cli->cl_max_rpcs_in_flight;
2111 cli->cl_max_rpcs_in_flight = max;
2112 client_adjust_max_dirty(cli);
2116 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2117 for (i = 0; i < diff; i++) {
2118 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2119 struct obd_request_slot_waiter,
2124 list_del_init(&orsw->orsw_entry);
2125 cli->cl_rpcs_in_flight++;
2126 wake_up(&orsw->orsw_waitq);
2128 spin_unlock(&cli->cl_loi_list_lock);
2132 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2134 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2136 return cli->cl_max_mod_rpcs_in_flight;
2138 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/* Change the modify-RPC limit (cl_max_mod_rpcs_in_flight) of 'cli'.
 *
 * Visible steps: 'max' is range-checked against 1..OBD_MAX_RIF_MAX and a
 * value equal to OBD_MAX_RIF_MAX is reduced by one.  If 'max' is not below
 * cl_max_rpcs_in_flight, obd_set_max_rpcs_in_flight(cli, max + 1) is
 * called first; its return value is not examined.  A ceiling 'maxmodrpcs'
 * is then derived from the import's connect data: cl_max_rpcs_in_flight - 1
 * when no connect flags are set or when OBD_CONNECT_MULTIMODRPCS is set
 * but ocd_maxmodrpcs is still 0, ocd_maxmodrpcs otherwise.  A request
 * above that ceiling is reported with CERROR.  The new limit is stored
 * with cl_mod_rpcs_waitq.lock held, and waiters on cl_mod_rpcs_waitq are
 * woken if the limit grew.
 *
 * NOTE(review): the return statements and the branch taken when
 * OBD_CONNECT_MULTIMODRPCS is not set are not present in this extract;
 * confirm them against the complete file. */
2140 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2142 struct obd_connect_data *ocd;
2146 if (max > OBD_MAX_RIF_MAX || max < 1)
2149 ocd = &cli->cl_import->imp_connect_data;
2150 CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2151 cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2152 ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2154 if (max == OBD_MAX_RIF_MAX)
2155 max = OBD_MAX_RIF_MAX - 1;
2157 /* Cannot exceed or equal max_rpcs_in_flight. If we are asked to
2158 * increase this value, also bump up max_rpcs_in_flight to match.
2160 if (max >= cli->cl_max_rpcs_in_flight) {
2162 "%s: increasing max_rpcs_in_flight=%u to allow larger max_mod_rpcs_in_flight=%u\n",
2163 cli->cl_import->imp_obd->obd_name, max + 1, max);
2164 obd_set_max_rpcs_in_flight(cli, max + 1);
2167 /* cannot exceed max modify RPCs in flight supported by the server,
2168 * but verify ocd_connect_flags is at least initialized first. If
2169 * not, allow it and fix value later in ptlrpc_connect_set_flags().
2171 if (!ocd->ocd_connect_flags) {
2172 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2173 } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2174 maxmodrpcs = ocd->ocd_maxmodrpcs;
2175 if (maxmodrpcs == 0) { /* connection not finished yet */
2176 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2178 "%s: partial connect, assume maxmodrpcs=%hu\n",
2179 cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2184 if (max > maxmodrpcs) {
2185 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than mdt.*.max_mod_rpcs_in_flight=%hu returned by the MDT server at connection.\n",
2186 cli->cl_import->imp_obd->obd_name,
2191 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2193 prev = cli->cl_max_mod_rpcs_in_flight;
2194 cli->cl_max_mod_rpcs_in_flight = max;
2196 /* wakeup waiters if limit has been increased */
2197 if (cli->cl_max_mod_rpcs_in_flight > prev)
2198 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2200 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2204 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/* Print the modify-RPC statistics of 'cli' into the seq_file 'seq'.
 *
 * With cl_mod_rpcs_waitq.lock held, the function writes a statistics
 * header, the current cl_mod_rpcs_in_flight value, and one row for each
 * bucket of the cl_mod_rpcs_hist histogram (up to OBD_HIST_MAX buckets)
 * showing the bucket count, its percentage of the histogram total and a
 * cumulative percentage.  The loop compares the running total mod_cum
 * with the histogram sum mod_tot after each row.
 *
 * NOTE(review): the statements that update mod_cum, the action taken when
 * mod_cum == mod_tot, and the return statement are not present in this
 * extract; confirm them against the complete file. */
2206 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2207 struct seq_file *seq)
2209 unsigned long mod_tot = 0, mod_cum;
2212 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2213 lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
2215 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2216 cli->cl_mod_rpcs_in_flight);
2218 seq_printf(seq, "\n\t\t\tmodify\n");
2219 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2221 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2224 for (i = 0; i < OBD_HIST_MAX; i++) {
2225 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2228 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2229 i, mod, pct(mod, mod_tot),
2230 pct(mod_cum, mod_tot));
2231 if (mod_cum == mod_tot)
2235 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2239 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2241 /* The number of modify RPCs sent in parallel is limited
2242 * because the server has a finite number of slots per client to
2243 * store request result and ensure reply reconstruction when needed.
2244 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2245 * that takes into account server limit and cl_max_rpcs_in_flight
2247 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2248 * one close request is allowed above the maximum.
/* Members of struct mod_waiter, the per-waiter record that
 * obd_get_mod_rpc_slot() places on cli->cl_mod_rpcs_waitq: 'cli' is the
 * client the slot is requested from and 'wqe' is the wait queue entry
 * from which claim_mod_rpc_function() recovers the record with
 * container_of().
 * NOTE(review): the struct header and the close_req/woken members used by
 * those functions are not present in this extract. */
2251 struct client_obd *cli;
2254 wait_queue_entry_t wqe;
/* Wake function installed on the wait queue entry of a struct mod_waiter
 * by obd_get_mod_rpc_slot().
 *
 * It decides whether the waiter may take a modify RPC slot: a slot is
 * available when cl_mod_rpcs_in_flight is below
 * cl_max_mod_rpcs_in_flight, or when the waiter is sending a close
 * request and no other close RPC is in flight.  On the visible success
 * path cl_mod_rpcs_in_flight and cl_close_rpcs_in_flight are incremented
 * and woken_wake_function() is called; its result is stored in 'ret'.
 * The remaining branches distinguish "a close RPC is already in flight",
 * "key is NULL" and the fall-through case.
 *
 * NOTE(review): the conditions guarding the two increments and the values
 * returned by each branch are not present in this extract; confirm them
 * against the complete file. */
2256 static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
2257 unsigned int mode, int flags, void *key)
2259 struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
2260 struct client_obd *cli = w->cli;
2261 bool close_req = w->close_req;
2265 /* As woken_wake_function() doesn't remove us from the wait_queue,
2266 * we use own flag to ensure we're called just once.
2271 /* A slot is available if
2272 * - number of modify RPCs in flight is less than the max
2273 * - it's a close RPC and no other close request is in flight
2275 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2276 (close_req && cli->cl_close_rpcs_in_flight == 0);
2278 cli->cl_mod_rpcs_in_flight++;
2280 cli->cl_close_rpcs_in_flight++;
2281 ret = woken_wake_function(wq_entry, mode, flags, key);
2283 } else if (cli->cl_close_rpcs_in_flight)
2284 /* No other waiter could be woken */
2286 else if (key == NULL)
2287 /* This was not a wakeup from a close completion, so there is no
2288 * point seeing if there are close waiters to be woken
2292 /* There might be be a close we could wake, keep looking */
2297 /* Get a modify RPC slot from the obd client @cli according
2298 * to the kind of operation @opc that is going to be sent
2299 * and the intent @it of the operation if it applies.
2300 * If the maximum number of modify RPCs in flight is reached
2301 * the thread is put to sleep.
2302 * Returns the tag to be set in the request message. Tag 0
2303 * is reserved for non-modifying requests.
/* Obtain a modify RPC slot on 'cli' for an operation with opcode 'opc'.
 *
 * The caller is queued on cl_mod_rpcs_waitq with
 * claim_mod_rpc_function() as its wake function, and a wakeup is issued
 * at once so that a free slot is claimed without sleeping.  While
 * wait.woken is false the thread sleeps in wait_woken() with
 * TASK_UNINTERRUPTIBLE and no timeout, dropping the wait queue lock
 * around the sleep.  After the entry is removed from the queue, the
 * in-flight count is tallied into cl_mod_rpcs_hist and a free bit is
 * found and set in cl_mod_tag_bitmap; both LASSERTs guard that step.
 * MDS_CLOSE requests are marked via close_req.
 *
 * NOTE(review): the remaining initialisers of 'wait', the bound passed to
 * find_first_zero_bit() and the return statement are not present in this
 * extract; confirm them against the complete file. */
2305 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2307 struct mod_waiter wait = {
2309 .close_req = (opc == MDS_CLOSE),
2314 init_wait(&wait.wqe);
2315 wait.wqe.func = claim_mod_rpc_function;
2317 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2318 __add_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2319 /* This wakeup will only succeed if the maximums haven't
2320 * been reached. If that happens, WQ_FLAG_WOKEN will be cleared
2321 * and there will be no need to wait.
2323 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2324 /* XXX: handle spurious wakeups (from unknown yet source */
2325 while (wait.woken == false) {
2326 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2327 wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
2328 MAX_SCHEDULE_TIMEOUT);
2329 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2331 __remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2333 max = cli->cl_max_mod_rpcs_in_flight;
2334 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2335 cli->cl_mod_rpcs_in_flight);
2336 /* find a free tag */
2337 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2339 LASSERT(i < OBD_MAX_RIF_MAX);
2340 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2341 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2342 /* tag 0 is reserved for non-modify RPCs */
2345 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2346 cli->cl_import->imp_obd->obd_name,
2351 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2353 /* Put a modify RPC slot from the obd client @cli according
2354 * to the kind of operation @opc that has been sent.
/* Release the modify RPC slot identified by 'tag' on 'cli' for an
 * operation with opcode 'opc'.
 *
 * With cl_mod_rpcs_waitq.lock held, cl_mod_rpcs_in_flight is decremented,
 * bit (tag - 1) of cl_mod_tag_bitmap is cleared (asserted to have been
 * set) and waiters on cl_mod_rpcs_waitq are woken through
 * __wake_up_locked_key().  MDS_CLOSE is the only opcode tested.
 *
 * NOTE(review): the statement executed for MDS_CLOSE, the condition
 * guarding the cl_close_rpcs_in_flight decrement and the key passed to
 * __wake_up_locked_key() are not present in this extract; confirm them
 * against the complete file. */
2356 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2358 bool close_req = false;
2363 if (opc == MDS_CLOSE)
2366 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2367 cli->cl_mod_rpcs_in_flight--;
2369 cli->cl_close_rpcs_in_flight--;
2370 /* release the tag in the bitmap */
2371 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2372 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2373 __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2375 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2377 EXPORT_SYMBOL(obd_put_mod_rpc_slot);