4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/obdclass/genops.c
33 * These are the only exported functions, they provide some generic
34 * infrastructure for managing object devices
37 #define DEBUG_SUBSYSTEM S_CLASS
39 #include <linux/pid_namespace.h>
40 #include <linux/workqueue.h>
41 #include <lustre_compat.h>
42 #include <obd_class.h>
43 #include <lustre_log.h>
44 #include <lprocfs_status.h>
45 #include <lustre_disk.h>
46 #include <lustre_kernelcomm.h>
48 DEFINE_RWLOCK(obd_dev_lock);
49 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
51 static struct kmem_cache *obd_device_cachep;
52 static struct kobj_type class_ktype;
53 static struct workqueue_struct *zombie_wq;
55 static void obd_zombie_export_add(struct obd_export *exp);
56 static void obd_zombie_import_add(struct obd_import *imp);
57 static void print_export_data(struct obd_export *exp,
58 const char *status, int locks, int debug_level);
60 static LIST_HEAD(obd_stale_exports);
61 static DEFINE_SPINLOCK(obd_stale_export_lock);
62 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
65 * support functions: we could use inter-module communication, but this
66 * is more portable to other OS's
68 static struct obd_device *obd_device_alloc(void)
70 struct obd_device *obd;
72 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
74 obd->obd_magic = OBD_DEVICE_MAGIC;
79 static void obd_device_free(struct obd_device *obd)
82 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
83 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
84 if (obd->obd_namespace != NULL) {
85 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
86 obd, obd->obd_namespace, obd->obd_force);
89 lu_ref_fini(&obd->obd_reference);
90 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
93 struct obd_type *class_search_type(const char *name)
95 struct kobject *kobj = kset_find_obj(lustre_kset, name);
97 if (kobj && kobj->ktype == &class_ktype)
98 return container_of(kobj, struct obd_type, typ_kobj);
103 EXPORT_SYMBOL(class_search_type);
105 struct obd_type *class_get_type(const char *name)
107 struct obd_type *type;
109 type = class_search_type(name);
110 #ifdef HAVE_MODULE_LOADING_SUPPORT
112 const char *modname = name;
114 #ifdef HAVE_SERVER_SUPPORT
115 if (strcmp(modname, "obdfilter") == 0)
118 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
119 modname = LUSTRE_OSP_NAME;
121 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
122 modname = LUSTRE_MDT_NAME;
123 #endif /* HAVE_SERVER_SUPPORT */
125 if (!request_module("%s", modname)) {
126 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127 type = class_search_type(name);
129 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
135 if (try_module_get(type->typ_dt_ops->o_owner)) {
136 atomic_inc(&type->typ_refcnt);
137 /* class_search_type() returned a counted reference,
138 * but we don't need that count any more as
139 * we have one through typ_refcnt.
141 kobject_put(&type->typ_kobj);
143 kobject_put(&type->typ_kobj);
149 EXPORT_SYMBOL(class_get_type);
151 void class_put_type(struct obd_type *type)
154 module_put(type->typ_dt_ops->o_owner);
155 atomic_dec(&type->typ_refcnt);
157 EXPORT_SYMBOL(class_put_type);
159 static void class_sysfs_release(struct kobject *kobj)
161 struct obd_type *type = container_of(kobj, struct obd_type, typ_kobj);
163 debugfs_remove_recursive(type->typ_debugfs_entry);
164 type->typ_debugfs_entry = NULL;
167 lu_device_type_fini(type->typ_lu);
169 #ifdef CONFIG_PROC_FS
170 if (type->typ_name && type->typ_procroot)
171 remove_proc_subtree(type->typ_name, proc_lustre_root);
173 OBD_FREE(type, sizeof(*type));
176 static struct kobj_type class_ktype = {
177 .sysfs_ops = &lustre_sysfs_ops,
178 .release = class_sysfs_release,
181 #ifdef HAVE_SERVER_SUPPORT
182 struct obd_type *class_add_symlinks(const char *name, bool enable_proc)
184 struct dentry *symlink;
185 struct obd_type *type;
188 type = class_search_type(name);
190 kobject_put(&type->typ_kobj);
191 return ERR_PTR(-EEXIST);
194 OBD_ALLOC(type, sizeof(*type));
196 return ERR_PTR(-ENOMEM);
198 type->typ_kobj.kset = lustre_kset;
199 rc = kobject_init_and_add(&type->typ_kobj, &class_ktype,
200 &lustre_kset->kobj, "%s", name);
204 symlink = debugfs_create_dir(name, debugfs_lustre_root);
205 type->typ_debugfs_entry = symlink;
206 type->typ_sym_filter = true;
209 type->typ_procroot = lprocfs_register(name, proc_lustre_root,
211 if (IS_ERR(type->typ_procroot)) {
212 CERROR("%s: can't create compat proc entry: %d\n",
213 name, (int)PTR_ERR(type->typ_procroot));
214 type->typ_procroot = NULL;
220 EXPORT_SYMBOL(class_add_symlinks);
221 #endif /* HAVE_SERVER_SUPPORT */
223 #define CLASS_MAX_NAME 1024
225 int class_register_type(const struct obd_ops *dt_ops,
226 const struct md_ops *md_ops,
228 const char *name, struct lu_device_type *ldt)
230 struct obd_type *type;
235 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
237 type = class_search_type(name);
239 #ifdef HAVE_SERVER_SUPPORT
240 if (type->typ_sym_filter)
242 #endif /* HAVE_SERVER_SUPPORT */
243 kobject_put(&type->typ_kobj);
244 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
248 OBD_ALLOC(type, sizeof(*type));
252 type->typ_lu = ldt ? OBD_LU_TYPE_SETUP : NULL;
253 type->typ_kobj.kset = lustre_kset;
254 kobject_init(&type->typ_kobj, &class_ktype);
255 #ifdef HAVE_SERVER_SUPPORT
257 #endif /* HAVE_SERVER_SUPPORT */
259 type->typ_dt_ops = dt_ops;
260 type->typ_md_ops = md_ops;
262 #ifdef HAVE_SERVER_SUPPORT
263 if (type->typ_sym_filter) {
264 type->typ_sym_filter = false;
265 kobject_put(&type->typ_kobj);
269 #ifdef CONFIG_PROC_FS
270 if (enable_proc && !type->typ_procroot) {
271 type->typ_procroot = lprocfs_register(name,
274 if (IS_ERR(type->typ_procroot)) {
275 rc = PTR_ERR(type->typ_procroot);
276 type->typ_procroot = NULL;
281 type->typ_debugfs_entry = debugfs_create_dir(name, debugfs_lustre_root);
283 rc = kobject_add(&type->typ_kobj, &lustre_kset->kobj, "%s", name);
286 #ifdef HAVE_SERVER_SUPPORT
290 rc = lu_device_type_init(ldt);
291 smp_store_release(&type->typ_lu, rc ? NULL : ldt);
292 wake_up_var(&type->typ_lu);
300 kobject_put(&type->typ_kobj);
304 EXPORT_SYMBOL(class_register_type);
306 int class_unregister_type(const char *name)
308 struct obd_type *type = class_search_type(name);
313 CERROR("unknown obd type\n");
317 if (atomic_read(&type->typ_refcnt)) {
318 CERROR("type %s has refcount (%d)\n", name,
319 atomic_read(&type->typ_refcnt));
320 /* This is a bad situation, let's make the best of it */
321 /* Remove ops, but leave the name for debugging */
322 type->typ_dt_ops = NULL;
323 type->typ_md_ops = NULL;
324 GOTO(out_put, rc = -EBUSY);
327 /* Put the final ref */
328 kobject_put(&type->typ_kobj);
330 /* Put the ref returned by class_search_type() */
331 kobject_put(&type->typ_kobj);
334 } /* class_unregister_type */
335 EXPORT_SYMBOL(class_unregister_type);
338 * Create a new obd device.
340 * Allocate the new obd_device and initialize it.
342 * \param[in] type_name obd device type string.
343 * \param[in] name obd device name.
344 * \param[in] uuid obd device UUID
346 * \retval newdev pointer to created obd_device
347 * \retval ERR_PTR(errno) on error
349 struct obd_device *class_newdev(const char *type_name, const char *name,
352 struct obd_device *newdev;
353 struct obd_type *type = NULL;
356 if (strlen(name) >= MAX_OBD_NAME) {
357 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
358 RETURN(ERR_PTR(-EINVAL));
361 type = class_get_type(type_name);
363 CERROR("OBD: unknown type: %s\n", type_name);
364 RETURN(ERR_PTR(-ENODEV));
367 newdev = obd_device_alloc();
368 if (newdev == NULL) {
369 class_put_type(type);
370 RETURN(ERR_PTR(-ENOMEM));
372 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
373 strncpy(newdev->obd_name, name, sizeof(newdev->obd_name) - 1);
374 newdev->obd_type = type;
375 newdev->obd_minor = -1;
377 rwlock_init(&newdev->obd_pool_lock);
378 newdev->obd_pool_limit = 0;
379 newdev->obd_pool_slv = 0;
381 INIT_LIST_HEAD(&newdev->obd_exports);
382 newdev->obd_num_exports = 0;
383 newdev->obd_grant_check_threshold = 100;
384 INIT_LIST_HEAD(&newdev->obd_unlinked_exports);
385 INIT_LIST_HEAD(&newdev->obd_delayed_exports);
386 INIT_LIST_HEAD(&newdev->obd_exports_timed);
387 INIT_LIST_HEAD(&newdev->obd_nid_stats);
388 spin_lock_init(&newdev->obd_nid_lock);
389 spin_lock_init(&newdev->obd_dev_lock);
390 mutex_init(&newdev->obd_dev_mutex);
391 spin_lock_init(&newdev->obd_osfs_lock);
392 /* newdev->obd_osfs_age must be set to a value in the distant
393 * past to guarantee a fresh statfs is fetched on mount. */
394 newdev->obd_osfs_age = ktime_get_seconds() - 1000;
396 /* XXX belongs in setup not attach */
397 init_rwsem(&newdev->obd_observer_link_sem);
399 spin_lock_init(&newdev->obd_recovery_task_lock);
400 init_waitqueue_head(&newdev->obd_next_transno_waitq);
401 init_waitqueue_head(&newdev->obd_evict_inprogress_waitq);
402 INIT_LIST_HEAD(&newdev->obd_req_replay_queue);
403 INIT_LIST_HEAD(&newdev->obd_lock_replay_queue);
404 INIT_LIST_HEAD(&newdev->obd_final_req_queue);
405 INIT_LIST_HEAD(&newdev->obd_evict_list);
406 INIT_LIST_HEAD(&newdev->obd_lwp_list);
408 llog_group_init(&newdev->obd_olg);
409 /* Detach drops this */
410 atomic_set(&newdev->obd_refcount, 1);
411 lu_ref_init(&newdev->obd_reference);
412 lu_ref_add(&newdev->obd_reference, "newdev", newdev);
414 newdev->obd_conn_inprogress = 0;
416 strncpy(newdev->obd_uuid.uuid, uuid, UUID_MAX);
418 CDEBUG(D_IOCTL, "Allocate new device %s (%p)\n",
419 newdev->obd_name, newdev);
427 * \param[in] obd obd_device to be freed
431 void class_free_dev(struct obd_device *obd)
433 struct obd_type *obd_type = obd->obd_type;
435 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x "
436 "!= %08x\n", obd, obd->obd_magic, OBD_DEVICE_MAGIC);
437 LASSERTF(obd->obd_minor == -1 || obd_devs[obd->obd_minor] == obd,
438 "obd %p != obd_devs[%d] %p\n",
439 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
440 LASSERTF(atomic_read(&obd->obd_refcount) == 0,
441 "obd_refcount should be 0, not %d\n",
442 atomic_read(&obd->obd_refcount));
443 LASSERT(obd_type != NULL);
445 CDEBUG(D_INFO, "Release obd device %s obd_type name = %s\n",
446 obd->obd_name, obd->obd_type->typ_name);
448 CDEBUG(D_CONFIG, "finishing cleanup of obd %s (%s)\n",
449 obd->obd_name, obd->obd_uuid.uuid);
450 if (obd->obd_stopping) {
453 /* If we're not stopping, we were never set up */
454 err = obd_cleanup(obd);
456 CERROR("Cleanup %s returned %d\n",
460 obd_device_free(obd);
462 class_put_type(obd_type);
466 * Unregister obd device.
468 * Free slot in obd_dev[] used by \a obd.
470 * \param[in] new_obd obd_device to be unregistered
474 void class_unregister_device(struct obd_device *obd)
476 write_lock(&obd_dev_lock);
477 if (obd->obd_minor >= 0) {
478 LASSERT(obd_devs[obd->obd_minor] == obd);
479 obd_devs[obd->obd_minor] = NULL;
482 write_unlock(&obd_dev_lock);
486 * Register obd device.
488 * Find free slot in obd_devs[], fills it with \a new_obd.
490 * \param[in] new_obd obd_device to be registered
493 * \retval -EEXIST device with this name is registered
494 * \retval -EOVERFLOW obd_devs[] is full
496 int class_register_device(struct obd_device *new_obd)
500 int new_obd_minor = 0;
501 bool minor_assign = false;
502 bool retried = false;
505 write_lock(&obd_dev_lock);
506 for (i = 0; i < class_devno_max(); i++) {
507 struct obd_device *obd = class_num2obd(i);
510 (strcmp(new_obd->obd_name, obd->obd_name) == 0)) {
513 write_unlock(&obd_dev_lock);
515 /* the obd_device could be waited to be
516 * destroyed by the "obd_zombie_impexp_thread".
518 obd_zombie_barrier();
523 CERROR("%s: already exists, won't add\n",
525 /* in case we found a free slot before duplicate */
526 minor_assign = false;
530 if (!minor_assign && obd == NULL) {
537 new_obd->obd_minor = new_obd_minor;
538 LASSERTF(obd_devs[new_obd_minor] == NULL, "obd_devs[%d] "
539 "%p\n", new_obd_minor, obd_devs[new_obd_minor]);
540 obd_devs[new_obd_minor] = new_obd;
544 CERROR("%s: all %u/%u devices used, increase "
545 "MAX_OBD_DEVICES: rc = %d\n", new_obd->obd_name,
546 i, class_devno_max(), ret);
549 write_unlock(&obd_dev_lock);
554 static int class_name2dev_nolock(const char *name)
561 for (i = 0; i < class_devno_max(); i++) {
562 struct obd_device *obd = class_num2obd(i);
564 if (obd && strcmp(name, obd->obd_name) == 0) {
565 /* Make sure we finished attaching before we give
566 out any references */
567 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
568 if (obd->obd_attached) {
578 int class_name2dev(const char *name)
585 read_lock(&obd_dev_lock);
586 i = class_name2dev_nolock(name);
587 read_unlock(&obd_dev_lock);
591 EXPORT_SYMBOL(class_name2dev);
593 struct obd_device *class_name2obd(const char *name)
595 int dev = class_name2dev(name);
597 if (dev < 0 || dev > class_devno_max())
599 return class_num2obd(dev);
601 EXPORT_SYMBOL(class_name2obd);
603 static int class_uuid2dev_nolock(struct obd_uuid *uuid)
607 for (i = 0; i < class_devno_max(); i++) {
608 struct obd_device *obd = class_num2obd(i);
610 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
611 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
619 int class_uuid2dev(struct obd_uuid *uuid)
623 read_lock(&obd_dev_lock);
624 i = class_uuid2dev_nolock(uuid);
625 read_unlock(&obd_dev_lock);
629 EXPORT_SYMBOL(class_uuid2dev);
631 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
633 int dev = class_uuid2dev(uuid);
636 return class_num2obd(dev);
638 EXPORT_SYMBOL(class_uuid2obd);
641 * Get obd device from ::obd_devs[]
643 * \param num [in] array index
645 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
646 * otherwise return the obd device there.
648 struct obd_device *class_num2obd(int num)
650 struct obd_device *obd = NULL;
652 if (num < class_devno_max()) {
657 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
658 "%p obd_magic %08x != %08x\n",
659 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
660 LASSERTF(obd->obd_minor == num,
661 "%p obd_minor %0d != %0d\n",
662 obd, obd->obd_minor, num);
667 EXPORT_SYMBOL(class_num2obd);
670 * Find obd in obd_dev[] by name or uuid.
672 * Increment obd's refcount if found.
674 * \param[in] str obd name or uuid
676 * \retval NULL if not found
677 * \retval target pointer to found obd_device
679 struct obd_device *class_dev_by_str(const char *str)
681 struct obd_device *target = NULL;
682 struct obd_uuid tgtuuid;
685 obd_str2uuid(&tgtuuid, str);
687 read_lock(&obd_dev_lock);
688 rc = class_uuid2dev_nolock(&tgtuuid);
690 rc = class_name2dev_nolock(str);
693 target = class_num2obd(rc);
696 class_incref(target, "find", current);
697 read_unlock(&obd_dev_lock);
701 EXPORT_SYMBOL(class_dev_by_str);
704 * Get obd devices count. Device in any
706 * \retval obd device count
708 int get_devices_count(void)
710 int index, max_index = class_devno_max(), dev_count = 0;
712 read_lock(&obd_dev_lock);
713 for (index = 0; index <= max_index; index++) {
714 struct obd_device *obd = class_num2obd(index);
718 read_unlock(&obd_dev_lock);
722 EXPORT_SYMBOL(get_devices_count);
724 void class_obd_list(void)
729 read_lock(&obd_dev_lock);
730 for (i = 0; i < class_devno_max(); i++) {
731 struct obd_device *obd = class_num2obd(i);
735 if (obd->obd_stopping)
737 else if (obd->obd_set_up)
739 else if (obd->obd_attached)
743 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
744 i, status, obd->obd_type->typ_name,
745 obd->obd_name, obd->obd_uuid.uuid,
746 atomic_read(&obd->obd_refcount));
748 read_unlock(&obd_dev_lock);
751 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
752 * specified, then only the client with that uuid is returned,
753 * otherwise any client connected to the tgt is returned.
755 struct obd_device *class_find_client_obd(struct obd_uuid *tgt_uuid,
756 const char *type_name,
757 struct obd_uuid *grp_uuid)
761 read_lock(&obd_dev_lock);
762 for (i = 0; i < class_devno_max(); i++) {
763 struct obd_device *obd = class_num2obd(i);
767 if ((strncmp(obd->obd_type->typ_name, type_name,
768 strlen(type_name)) == 0)) {
769 if (obd_uuid_equals(tgt_uuid,
770 &obd->u.cli.cl_target_uuid) &&
771 ((grp_uuid)? obd_uuid_equals(grp_uuid,
772 &obd->obd_uuid) : 1)) {
773 read_unlock(&obd_dev_lock);
778 read_unlock(&obd_dev_lock);
782 EXPORT_SYMBOL(class_find_client_obd);
784 /* Iterate the obd_device list looking devices have grp_uuid. Start
785 * searching at *next, and if a device is found, the next index to look
786 * at is saved in *next. If next is NULL, then the first matching device
787 * will always be returned.
789 struct obd_device *class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
795 else if (*next >= 0 && *next < class_devno_max())
800 read_lock(&obd_dev_lock);
801 for (; i < class_devno_max(); i++) {
802 struct obd_device *obd = class_num2obd(i);
806 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
809 read_unlock(&obd_dev_lock);
813 read_unlock(&obd_dev_lock);
817 EXPORT_SYMBOL(class_devices_in_group);
820 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
821 * adjust sptlrpc settings accordingly.
823 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
825 struct obd_device *obd;
829 LASSERT(namelen > 0);
831 read_lock(&obd_dev_lock);
832 for (i = 0; i < class_devno_max(); i++) {
833 obd = class_num2obd(i);
835 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
838 /* only notify mdc, osc, osp, lwp, mdt, ost
839 * because only these have a -sptlrpc llog */
840 type = obd->obd_type->typ_name;
841 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
842 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
843 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
844 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
845 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
846 strcmp(type, LUSTRE_OST_NAME) != 0)
849 if (strncmp(obd->obd_name, fsname, namelen))
852 class_incref(obd, __FUNCTION__, obd);
853 read_unlock(&obd_dev_lock);
854 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
855 sizeof(KEY_SPTLRPC_CONF),
856 KEY_SPTLRPC_CONF, 0, NULL, NULL);
858 class_decref(obd, __FUNCTION__, obd);
859 read_lock(&obd_dev_lock);
861 read_unlock(&obd_dev_lock);
864 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
866 void obd_cleanup_caches(void)
869 if (obd_device_cachep) {
870 kmem_cache_destroy(obd_device_cachep);
871 obd_device_cachep = NULL;
877 int obd_init_caches(void)
882 LASSERT(obd_device_cachep == NULL);
883 obd_device_cachep = kmem_cache_create_usercopy("ll_obd_dev_cache",
884 sizeof(struct obd_device),
885 0, 0, 0, sizeof(struct obd_device), NULL);
886 if (!obd_device_cachep)
887 GOTO(out, rc = -ENOMEM);
891 obd_cleanup_caches();
895 static const char export_handle_owner[] = "export";
897 /* map connection to client */
898 struct obd_export *class_conn2export(struct lustre_handle *conn)
900 struct obd_export *export;
904 CDEBUG(D_CACHE, "looking for null handle\n");
908 if (conn->cookie == -1) { /* this means assign a new connection */
909 CDEBUG(D_CACHE, "want a new connection\n");
913 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
914 export = class_handle2object(conn->cookie, export_handle_owner);
917 EXPORT_SYMBOL(class_conn2export);
919 struct obd_device *class_exp2obd(struct obd_export *exp)
925 EXPORT_SYMBOL(class_exp2obd);
927 struct obd_import *class_exp2cliimp(struct obd_export *exp)
929 struct obd_device *obd = exp->exp_obd;
932 return obd->u.cli.cl_import;
934 EXPORT_SYMBOL(class_exp2cliimp);
936 /* Export management functions */
937 static void class_export_destroy(struct obd_export *exp)
939 struct obd_device *obd = exp->exp_obd;
942 LASSERT(refcount_read(&exp->exp_handle.h_ref) == 0);
943 LASSERT(obd != NULL);
945 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
946 exp->exp_client_uuid.uuid, obd->obd_name);
948 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
949 ptlrpc_connection_put(exp->exp_connection);
951 LASSERT(list_empty(&exp->exp_outstanding_replies));
952 LASSERT(list_empty(&exp->exp_uncommitted_replies));
953 LASSERT(list_empty(&exp->exp_req_replay_queue));
954 LASSERT(list_empty(&exp->exp_hp_rpcs));
955 obd_destroy_export(exp);
956 /* self export doesn't hold a reference to an obd, although it
957 * exists until freeing of the obd */
958 if (exp != obd->obd_self_export)
959 class_decref(obd, "export", exp);
961 OBD_FREE_PRE(exp, sizeof(*exp), "kfree_rcu");
962 kfree_rcu(exp, exp_handle.h_rcu);
966 struct obd_export *class_export_get(struct obd_export *exp)
968 refcount_inc(&exp->exp_handle.h_ref);
969 CDEBUG(D_INFO, "GET export %p refcount=%d\n", exp,
970 refcount_read(&exp->exp_handle.h_ref));
973 EXPORT_SYMBOL(class_export_get);
975 void class_export_put(struct obd_export *exp)
977 LASSERT(exp != NULL);
978 LASSERT(refcount_read(&exp->exp_handle.h_ref) > 0);
979 LASSERT(refcount_read(&exp->exp_handle.h_ref) < LI_POISON);
980 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
981 refcount_read(&exp->exp_handle.h_ref) - 1);
983 if (refcount_dec_and_test(&exp->exp_handle.h_ref)) {
984 struct obd_device *obd = exp->exp_obd;
986 CDEBUG(D_IOCTL, "final put %p/%s\n",
987 exp, exp->exp_client_uuid.uuid);
989 /* release nid stat refererence */
990 lprocfs_exp_cleanup(exp);
992 if (exp == obd->obd_self_export) {
993 /* self export should be destroyed without
994 * zombie thread as it doesn't hold a
995 * reference to obd and doesn't hold any
997 class_export_destroy(exp);
998 /* self export is destroyed, no class
999 * references exist and it is safe to free
1001 class_free_dev(obd);
1003 LASSERT(!list_empty(&exp->exp_obd_chain));
1004 obd_zombie_export_add(exp);
1009 EXPORT_SYMBOL(class_export_put);
1011 static void obd_zombie_exp_cull(struct work_struct *ws)
1013 struct obd_export *export;
1015 export = container_of(ws, struct obd_export, exp_zombie_work);
1016 class_export_destroy(export);
1017 LASSERT(atomic_read(&obd_stale_export_num) > 0);
1018 if (atomic_dec_and_test(&obd_stale_export_num))
1019 wake_up_var(&obd_stale_export_num);
1022 /* Creates a new export, adds it to the hash table, and returns a
1023 * pointer to it. The refcount is 2: one for the hash reference, and
1024 * one for the pointer returned by this function. */
1025 static struct obd_export *__class_new_export(struct obd_device *obd,
1026 struct obd_uuid *cluuid,
1029 struct obd_export *export;
1033 OBD_ALLOC_PTR(export);
1035 return ERR_PTR(-ENOMEM);
1037 export->exp_conn_cnt = 0;
1038 export->exp_lock_hash = NULL;
1039 export->exp_flock_hash = NULL;
1040 /* 2 = class_handle_hash + last */
1041 refcount_set(&export->exp_handle.h_ref, 2);
1042 atomic_set(&export->exp_rpc_count, 0);
1043 atomic_set(&export->exp_cb_count, 0);
1044 atomic_set(&export->exp_locks_count, 0);
1045 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1046 INIT_LIST_HEAD(&export->exp_locks_list);
1047 spin_lock_init(&export->exp_locks_list_guard);
1049 atomic_set(&export->exp_replay_count, 0);
1050 export->exp_obd = obd;
1051 INIT_LIST_HEAD(&export->exp_outstanding_replies);
1052 spin_lock_init(&export->exp_uncommitted_replies_lock);
1053 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
1054 INIT_LIST_HEAD(&export->exp_req_replay_queue);
1055 INIT_HLIST_NODE(&export->exp_handle.h_link);
1056 INIT_LIST_HEAD(&export->exp_hp_rpcs);
1057 INIT_LIST_HEAD(&export->exp_reg_rpcs);
1058 class_handle_hash(&export->exp_handle, export_handle_owner);
1059 export->exp_last_request_time = ktime_get_real_seconds();
1060 spin_lock_init(&export->exp_lock);
1061 spin_lock_init(&export->exp_rpc_lock);
1062 INIT_HLIST_NODE(&export->exp_gen_hash);
1063 spin_lock_init(&export->exp_bl_list_lock);
1064 INIT_LIST_HEAD(&export->exp_bl_list);
1065 INIT_LIST_HEAD(&export->exp_stale_list);
1066 INIT_WORK(&export->exp_zombie_work, obd_zombie_exp_cull);
1068 export->exp_sp_peer = LUSTRE_SP_ANY;
1069 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
1070 export->exp_client_uuid = *cluuid;
1071 obd_init_export(export);
1073 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
1074 export->exp_root_fid.f_seq = 0;
1075 export->exp_root_fid.f_oid = 0;
1076 export->exp_root_fid.f_ver = 0;
1078 spin_lock(&obd->obd_dev_lock);
1079 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
1080 /* shouldn't happen, but might race */
1081 if (obd->obd_stopping)
1082 GOTO(exit_unlock, rc = -ENODEV);
1084 rc = obd_uuid_add(obd, export);
1086 LCONSOLE_WARN("%s: denying duplicate export for %s: rc = %d\n",
1087 obd->obd_name, cluuid->uuid, rc);
1088 GOTO(exit_unlock, rc = -EALREADY);
1093 class_incref(obd, "export", export);
1094 list_add_tail(&export->exp_obd_chain_timed,
1095 &obd->obd_exports_timed);
1096 list_add(&export->exp_obd_chain, &obd->obd_exports);
1097 obd->obd_num_exports++;
1099 INIT_LIST_HEAD(&export->exp_obd_chain_timed);
1100 INIT_LIST_HEAD(&export->exp_obd_chain);
1102 spin_unlock(&obd->obd_dev_lock);
1106 spin_unlock(&obd->obd_dev_lock);
1107 class_handle_unhash(&export->exp_handle);
1108 obd_destroy_export(export);
1109 OBD_FREE_PTR(export);
1113 struct obd_export *class_new_export(struct obd_device *obd,
1114 struct obd_uuid *uuid)
1116 return __class_new_export(obd, uuid, false);
1118 EXPORT_SYMBOL(class_new_export);
1120 struct obd_export *class_new_export_self(struct obd_device *obd,
1121 struct obd_uuid *uuid)
1123 return __class_new_export(obd, uuid, true);
1126 void class_unlink_export(struct obd_export *exp)
1128 class_handle_unhash(&exp->exp_handle);
1130 if (exp->exp_obd->obd_self_export == exp) {
1131 class_export_put(exp);
1135 spin_lock(&exp->exp_obd->obd_dev_lock);
1136 /* delete an uuid-export hashitem from hashtables */
1137 if (exp != exp->exp_obd->obd_self_export)
1138 obd_uuid_del(exp->exp_obd, exp);
1140 #ifdef HAVE_SERVER_SUPPORT
1141 if (!hlist_unhashed(&exp->exp_gen_hash)) {
1142 struct tg_export_data *ted = &exp->exp_target_data;
1143 struct cfs_hash *hash;
1145 /* Because obd_gen_hash will not be released until
1146 * class_cleanup(), so hash should never be NULL here */
1147 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
1148 LASSERT(hash != NULL);
1149 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
1150 &exp->exp_gen_hash);
1151 cfs_hash_putref(hash);
1153 #endif /* HAVE_SERVER_SUPPORT */
1155 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
1156 list_del_init(&exp->exp_obd_chain_timed);
1157 exp->exp_obd->obd_num_exports--;
1158 spin_unlock(&exp->exp_obd->obd_dev_lock);
1160 /* A reference is kept by obd_stale_exports list */
1161 obd_stale_export_put(exp);
1163 EXPORT_SYMBOL(class_unlink_export);
1165 /* Import management functions */
1166 static void obd_zombie_import_free(struct obd_import *imp)
1168 struct obd_import_conn *imp_conn;
1171 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
1172 imp->imp_obd->obd_name);
1174 LASSERT(refcount_read(&imp->imp_refcount) == 0);
1176 ptlrpc_connection_put(imp->imp_connection);
1178 while ((imp_conn = list_first_entry_or_null(&imp->imp_conn_list,
1179 struct obd_import_conn,
1180 oic_item)) != NULL) {
1181 list_del_init(&imp_conn->oic_item);
1182 ptlrpc_connection_put(imp_conn->oic_conn);
1183 OBD_FREE(imp_conn, sizeof(*imp_conn));
1186 LASSERT(imp->imp_sec == NULL);
1187 LASSERTF(atomic_read(&imp->imp_reqs) == 0, "%s: imp_reqs = %d\n",
1188 imp->imp_obd->obd_name, atomic_read(&imp->imp_reqs));
1189 class_decref(imp->imp_obd, "import", imp);
1194 struct obd_import *class_import_get(struct obd_import *import)
1196 refcount_inc(&import->imp_refcount);
1197 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1198 refcount_read(&import->imp_refcount),
1199 import->imp_obd->obd_name);
1202 EXPORT_SYMBOL(class_import_get);
1204 void class_import_put(struct obd_import *imp)
1208 LASSERT(refcount_read(&imp->imp_refcount) > 0);
1210 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1211 refcount_read(&imp->imp_refcount) - 1,
1212 imp->imp_obd->obd_name);
1214 if (refcount_dec_and_test(&imp->imp_refcount)) {
1215 CDEBUG(D_INFO, "final put import %p\n", imp);
1216 obd_zombie_import_add(imp);
1221 EXPORT_SYMBOL(class_import_put);
1223 static void init_imp_at(struct imp_at *at) {
1225 at_init(&at->iat_net_latency, 0, 0);
1226 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1227 /* max service estimates are tracked on the server side, so
1228 don't use the AT history here, just use the last reported
1229 val. (But keep hist for proc histogram, worst_ever) */
1230 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1235 static void obd_zombie_imp_cull(struct work_struct *ws)
1237 struct obd_import *import;
1239 import = container_of(ws, struct obd_import, imp_zombie_work);
1240 obd_zombie_import_free(import);
1243 struct obd_import *class_new_import(struct obd_device *obd)
1245 struct obd_import *imp;
1246 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1248 OBD_ALLOC(imp, sizeof(*imp));
1252 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1253 INIT_LIST_HEAD(&imp->imp_replay_list);
1254 INIT_LIST_HEAD(&imp->imp_sending_list);
1255 INIT_LIST_HEAD(&imp->imp_delayed_list);
1256 INIT_LIST_HEAD(&imp->imp_committed_list);
1257 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1258 imp->imp_known_replied_xid = 0;
1259 imp->imp_replay_cursor = &imp->imp_committed_list;
1260 spin_lock_init(&imp->imp_lock);
1261 imp->imp_last_success_conn = 0;
1262 imp->imp_state = LUSTRE_IMP_NEW;
1263 imp->imp_obd = class_incref(obd, "import", imp);
1264 rwlock_init(&imp->imp_sec_lock);
1265 init_waitqueue_head(&imp->imp_recovery_waitq);
1266 INIT_WORK(&imp->imp_zombie_work, obd_zombie_imp_cull);
1268 if (curr_pid_ns && curr_pid_ns->child_reaper)
1269 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1271 imp->imp_sec_refpid = 1;
1273 refcount_set(&imp->imp_refcount, 2);
1274 atomic_set(&imp->imp_unregistering, 0);
1275 atomic_set(&imp->imp_reqs, 0);
1276 atomic_set(&imp->imp_inflight, 0);
1277 atomic_set(&imp->imp_replay_inflight, 0);
1278 init_waitqueue_head(&imp->imp_replay_waitq);
1279 atomic_set(&imp->imp_inval_count, 0);
1280 atomic_set(&imp->imp_waiting, 0);
1281 INIT_LIST_HEAD(&imp->imp_conn_list);
1282 init_imp_at(&imp->imp_at);
1284 /* the default magic is V2, will be used in connect RPC, and
1285 * then adjusted according to the flags in request/reply. */
1286 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1290 EXPORT_SYMBOL(class_new_import);
1292 void class_destroy_import(struct obd_import *import)
1294 LASSERT(import != NULL);
1295 LASSERT(import != LP_POISON);
1297 spin_lock(&import->imp_lock);
1298 import->imp_generation++;
1299 spin_unlock(&import->imp_lock);
1300 class_import_put(import);
1302 EXPORT_SYMBOL(class_destroy_import);
1304 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1306 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1308 spin_lock(&exp->exp_locks_list_guard);
1310 LASSERT(lock->l_exp_refs_nr >= 0);
1312 if (lock->l_exp_refs_target != NULL &&
1313 lock->l_exp_refs_target != exp) {
1314 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1315 exp, lock, lock->l_exp_refs_target);
1317 if ((lock->l_exp_refs_nr ++) == 0) {
1318 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1319 lock->l_exp_refs_target = exp;
1321 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1322 lock, exp, lock->l_exp_refs_nr);
1323 spin_unlock(&exp->exp_locks_list_guard);
1325 EXPORT_SYMBOL(__class_export_add_lock_ref);
1327 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1329 spin_lock(&exp->exp_locks_list_guard);
1330 LASSERT(lock->l_exp_refs_nr > 0);
1331 if (lock->l_exp_refs_target != exp) {
1332 LCONSOLE_WARN("lock %p, "
1333 "mismatching export pointers: %p, %p\n",
1334 lock, lock->l_exp_refs_target, exp);
1336 if (-- lock->l_exp_refs_nr == 0) {
1337 list_del_init(&lock->l_exp_refs_link);
1338 lock->l_exp_refs_target = NULL;
1340 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1341 lock, exp, lock->l_exp_refs_nr);
1342 spin_unlock(&exp->exp_locks_list_guard);
1344 EXPORT_SYMBOL(__class_export_del_lock_ref);
1347 /* A connection defines an export context in which preallocation can
1348 be managed. This releases the export pointer reference, and returns
1349 the export handle, so the export refcount is 1 when this function
1351 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1352 struct obd_uuid *cluuid)
1354 struct obd_export *export;
1355 LASSERT(conn != NULL);
1356 LASSERT(obd != NULL);
1357 LASSERT(cluuid != NULL);
1360 export = class_new_export(obd, cluuid);
1362 RETURN(PTR_ERR(export));
1364 conn->cookie = export->exp_handle.h_cookie;
1365 class_export_put(export);
1367 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1368 cluuid->uuid, conn->cookie);
1371 EXPORT_SYMBOL(class_connect);
1373 /* if export is involved in recovery then clean up related things */
1374 static void class_export_recovery_cleanup(struct obd_export *exp)
1376 struct obd_device *obd = exp->exp_obd;
1378 spin_lock(&obd->obd_recovery_task_lock);
1379 if (obd->obd_recovering) {
1380 if (exp->exp_in_recovery) {
1381 spin_lock(&exp->exp_lock);
1382 exp->exp_in_recovery = 0;
1383 spin_unlock(&exp->exp_lock);
1384 LASSERT(atomic_read(&(obd)->obd_connected_clients) > 0);
1385 atomic_dec(&obd->obd_connected_clients);
1388 /* if called during recovery then should update
1389 * obd_stale_clients counter,
1390 * lightweight exports are not counted */
1391 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1392 exp->exp_obd->obd_stale_clients++;
1394 spin_unlock(&obd->obd_recovery_task_lock);
1396 spin_lock(&exp->exp_lock);
1397 /** Cleanup req replay fields */
1398 if (exp->exp_req_replay_needed) {
1399 exp->exp_req_replay_needed = 0;
1401 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1402 atomic_dec(&obd->obd_req_replay_clients);
1405 /** Cleanup lock replay data */
1406 if (exp->exp_lock_replay_needed) {
1407 exp->exp_lock_replay_needed = 0;
1409 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1410 atomic_dec(&obd->obd_lock_replay_clients);
1412 spin_unlock(&exp->exp_lock);
1415 /* This function removes 1-3 references from the export:
1416 * 1 - for export pointer passed
1417 * and if disconnect really need
1418 * 2 - removing from hash
1419 * 3 - in client_unlink_export
1420 * The export pointer passed to this function can destroyed */
1421 int class_disconnect(struct obd_export *export)
1423 int already_disconnected;
1426 if (export == NULL) {
1427 CWARN("attempting to free NULL export %p\n", export);
1431 spin_lock(&export->exp_lock);
1432 already_disconnected = export->exp_disconnected;
1433 export->exp_disconnected = 1;
1434 #ifdef HAVE_SERVER_SUPPORT
1435 /* We hold references of export for uuid hash
1436 * and nid_hash and export link at least. So
1437 * it is safe to call rh*table_remove_fast in
1440 obd_nid_del(export->exp_obd, export);
1441 #endif /* HAVE_SERVER_SUPPORT */
1442 spin_unlock(&export->exp_lock);
1444 /* class_cleanup(), abort_recovery(), and class_fail_export()
1445 * all end up in here, and if any of them race we shouldn't
1446 * call extra class_export_puts(). */
1447 if (already_disconnected)
1448 GOTO(no_disconn, already_disconnected);
1450 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1451 export->exp_handle.h_cookie);
1453 class_export_recovery_cleanup(export);
1454 class_unlink_export(export);
1456 class_export_put(export);
1459 EXPORT_SYMBOL(class_disconnect);
1461 /* Return non-zero for a fully connected export */
1462 int class_connected_export(struct obd_export *exp)
1467 spin_lock(&exp->exp_lock);
1468 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1469 spin_unlock(&exp->exp_lock);
1473 EXPORT_SYMBOL(class_connected_export);
1475 static void class_disconnect_export_list(struct list_head *list,
1476 enum obd_option flags)
1479 struct obd_export *exp;
1482 /* It's possible that an export may disconnect itself, but
1483 * nothing else will be added to this list.
1485 while ((exp = list_first_entry_or_null(list, struct obd_export,
1486 exp_obd_chain)) != NULL) {
1487 /* need for safe call CDEBUG after obd_disconnect */
1488 class_export_get(exp);
1490 spin_lock(&exp->exp_lock);
1491 exp->exp_flags = flags;
1492 spin_unlock(&exp->exp_lock);
1494 if (obd_uuid_equals(&exp->exp_client_uuid,
1495 &exp->exp_obd->obd_uuid)) {
1497 "exp %p export uuid == obd uuid, don't discon\n",
1499 /* Need to delete this now so we don't end up pointing
1500 * to work_list later when this export is cleaned up. */
1501 list_del_init(&exp->exp_obd_chain);
1502 class_export_put(exp);
1506 class_export_get(exp);
1507 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1508 "last request at %lld\n",
1509 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1510 exp, exp->exp_last_request_time);
1511 /* release one export reference anyway */
1512 rc = obd_disconnect(exp);
1514 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1515 obd_export_nid2str(exp), exp, rc);
1516 class_export_put(exp);
1521 void class_disconnect_exports(struct obd_device *obd)
1523 LIST_HEAD(work_list);
1526 /* Move all of the exports from obd_exports to a work list, en masse. */
1527 spin_lock(&obd->obd_dev_lock);
1528 list_splice_init(&obd->obd_exports, &work_list);
1529 list_splice_init(&obd->obd_delayed_exports, &work_list);
1530 spin_unlock(&obd->obd_dev_lock);
1532 if (!list_empty(&work_list)) {
1533 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1534 "disconnecting them\n", obd->obd_minor, obd);
1535 class_disconnect_export_list(&work_list,
1536 exp_flags_from_obd(obd));
1538 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1539 obd->obd_minor, obd);
1542 EXPORT_SYMBOL(class_disconnect_exports);
1544 /* Remove exports that have not completed recovery.
1546 void class_disconnect_stale_exports(struct obd_device *obd,
1547 int (*test_export)(struct obd_export *))
1549 LIST_HEAD(work_list);
1550 struct obd_export *exp, *n;
1554 spin_lock(&obd->obd_dev_lock);
1555 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1557 /* don't count self-export as client */
1558 if (obd_uuid_equals(&exp->exp_client_uuid,
1559 &exp->exp_obd->obd_uuid))
1562 /* don't evict clients which have no slot in last_rcvd
1563 * (e.g. lightweight connection) */
1564 if (exp->exp_target_data.ted_lr_idx == -1)
1567 spin_lock(&exp->exp_lock);
1568 if (exp->exp_failed || test_export(exp)) {
1569 spin_unlock(&exp->exp_lock);
1572 exp->exp_failed = 1;
1573 atomic_inc(&exp->exp_obd->obd_eviction_count);
1574 spin_unlock(&exp->exp_lock);
1576 list_move(&exp->exp_obd_chain, &work_list);
1578 CWARN("%s: disconnect stale client %s@%s\n",
1579 obd->obd_name, exp->exp_client_uuid.uuid,
1580 obd_export_nid2str(exp));
1581 print_export_data(exp, "EVICTING", 0, D_HA);
1583 spin_unlock(&obd->obd_dev_lock);
1586 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1587 obd->obd_name, evicted);
1589 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1590 OBD_OPT_ABORT_RECOV);
1593 EXPORT_SYMBOL(class_disconnect_stale_exports);
1595 void class_fail_export(struct obd_export *exp)
1597 int rc, already_failed;
1599 spin_lock(&exp->exp_lock);
1600 already_failed = exp->exp_failed;
1601 exp->exp_failed = 1;
1602 spin_unlock(&exp->exp_lock);
1604 if (already_failed) {
1605 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1606 exp, exp->exp_client_uuid.uuid);
1610 atomic_inc(&exp->exp_obd->obd_eviction_count);
1612 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1613 exp, exp->exp_client_uuid.uuid);
1615 if (obd_dump_on_timeout)
1616 libcfs_debug_dumplog();
1618 /* need for safe call CDEBUG after obd_disconnect */
1619 class_export_get(exp);
1621 /* Most callers into obd_disconnect are removing their own reference
1622 * (request, for example) in addition to the one from the hash table.
1623 * We don't have such a reference here, so make one. */
1624 class_export_get(exp);
1625 rc = obd_disconnect(exp);
1627 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1629 CDEBUG(D_HA, "disconnected export %p/%s\n",
1630 exp, exp->exp_client_uuid.uuid);
1631 class_export_put(exp);
1633 EXPORT_SYMBOL(class_fail_export);
1635 #ifdef HAVE_SERVER_SUPPORT
1637 static int take_first(struct obd_export *exp, void *data)
1639 struct obd_export **expp = data;
1642 /* already have one */
1644 if (exp->exp_failed)
1645 /* Don't want this one */
1647 if (!refcount_inc_not_zero(&exp->exp_handle.h_ref))
1648 /* Cannot get a ref on this one */
1654 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1656 struct lnet_nid nid_key;
1657 struct obd_export *doomed_exp;
1658 int exports_evicted = 0;
1660 libcfs_strnid(&nid_key, nid);
1662 spin_lock(&obd->obd_dev_lock);
1663 /* umount has run already, so evict thread should leave
1664 * its task to umount thread now */
1665 if (obd->obd_stopping) {
1666 spin_unlock(&obd->obd_dev_lock);
1667 return exports_evicted;
1669 spin_unlock(&obd->obd_dev_lock);
1672 while (obd_nid_export_for_each(obd, &nid_key,
1673 take_first, &doomed_exp) > 0) {
1675 LASSERTF(doomed_exp != obd->obd_self_export,
1676 "self-export is hashed by NID?\n");
1678 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative request\n",
1680 obd_uuid2str(&doomed_exp->exp_client_uuid),
1681 obd_export_nid2str(doomed_exp));
1683 class_fail_export(doomed_exp);
1684 class_export_put(doomed_exp);
1689 if (!exports_evicted)
1691 "%s: can't disconnect NID '%s': no exports found\n",
1692 obd->obd_name, nid);
1693 return exports_evicted;
1695 EXPORT_SYMBOL(obd_export_evict_by_nid);
1697 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1699 struct obd_export *doomed_exp = NULL;
1700 struct obd_uuid doomed_uuid;
1701 int exports_evicted = 0;
1703 spin_lock(&obd->obd_dev_lock);
1704 if (obd->obd_stopping) {
1705 spin_unlock(&obd->obd_dev_lock);
1706 return exports_evicted;
1708 spin_unlock(&obd->obd_dev_lock);
1710 obd_str2uuid(&doomed_uuid, uuid);
1711 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1712 CERROR("%s: can't evict myself\n", obd->obd_name);
1713 return exports_evicted;
1716 doomed_exp = obd_uuid_lookup(obd, &doomed_uuid);
1717 if (doomed_exp == NULL) {
1718 CERROR("%s: can't disconnect %s: no exports found\n",
1719 obd->obd_name, uuid);
1721 CWARN("%s: evicting %s at adminstrative request\n",
1722 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1723 class_fail_export(doomed_exp);
1724 class_export_put(doomed_exp);
1725 obd_uuid_del(obd, doomed_exp);
1729 return exports_evicted;
1731 #endif /* HAVE_SERVER_SUPPORT */
1733 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1734 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1735 EXPORT_SYMBOL(class_export_dump_hook);
1738 static void print_export_data(struct obd_export *exp, const char *status,
1739 int locks, int debug_level)
1741 struct ptlrpc_reply_state *rs;
1742 struct ptlrpc_reply_state *first_reply = NULL;
1745 spin_lock(&exp->exp_lock);
1746 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1752 spin_unlock(&exp->exp_lock);
1754 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1755 "%p %s %llu stale:%d\n",
1756 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1757 obd_export_nid2str(exp),
1758 refcount_read(&exp->exp_handle.h_ref),
1759 atomic_read(&exp->exp_rpc_count),
1760 atomic_read(&exp->exp_cb_count),
1761 atomic_read(&exp->exp_locks_count),
1762 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1763 nreplies, first_reply, nreplies > 3 ? "..." : "",
1764 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1765 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1766 if (locks && class_export_dump_hook != NULL)
1767 class_export_dump_hook(exp);
1771 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1773 struct obd_export *exp;
1775 spin_lock(&obd->obd_dev_lock);
1776 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1777 print_export_data(exp, "ACTIVE", locks, debug_level);
1778 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1779 print_export_data(exp, "UNLINKED", locks, debug_level);
1780 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1781 print_export_data(exp, "DELAYED", locks, debug_level);
1782 spin_unlock(&obd->obd_dev_lock);
1785 void obd_exports_barrier(struct obd_device *obd)
1788 LASSERT(list_empty(&obd->obd_exports));
1789 spin_lock(&obd->obd_dev_lock);
1790 while (!list_empty(&obd->obd_unlinked_exports)) {
1791 spin_unlock(&obd->obd_dev_lock);
1792 schedule_timeout_uninterruptible(cfs_time_seconds(waited));
1793 if (waited > 5 && is_power_of_2(waited)) {
1794 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1795 "more than %d seconds. "
1796 "The obd refcount = %d. Is it stuck?\n",
1797 obd->obd_name, waited,
1798 atomic_read(&obd->obd_refcount));
1799 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1802 spin_lock(&obd->obd_dev_lock);
1804 spin_unlock(&obd->obd_dev_lock);
1806 EXPORT_SYMBOL(obd_exports_barrier);
1809 * Add export to the obd_zombe thread and notify it.
1811 static void obd_zombie_export_add(struct obd_export *exp) {
1812 atomic_inc(&obd_stale_export_num);
1813 spin_lock(&exp->exp_obd->obd_dev_lock);
1814 LASSERT(!list_empty(&exp->exp_obd_chain));
1815 list_del_init(&exp->exp_obd_chain);
1816 spin_unlock(&exp->exp_obd->obd_dev_lock);
1817 queue_work(zombie_wq, &exp->exp_zombie_work);
1821 * Add import to the obd_zombe thread and notify it.
1823 static void obd_zombie_import_add(struct obd_import *imp) {
1824 LASSERT(imp->imp_sec == NULL);
1826 queue_work(zombie_wq, &imp->imp_zombie_work);
1830 * wait when obd_zombie import/export queues become empty
1832 void obd_zombie_barrier(void)
1834 wait_var_event(&obd_stale_export_num,
1835 atomic_read(&obd_stale_export_num) == 0);
1836 flush_workqueue(zombie_wq);
1838 EXPORT_SYMBOL(obd_zombie_barrier);
1841 struct obd_export *obd_stale_export_get(void)
1843 struct obd_export *exp = NULL;
1846 spin_lock(&obd_stale_export_lock);
1847 if (!list_empty(&obd_stale_exports)) {
1848 exp = list_first_entry(&obd_stale_exports,
1849 struct obd_export, exp_stale_list);
1850 list_del_init(&exp->exp_stale_list);
1852 spin_unlock(&obd_stale_export_lock);
1855 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1856 atomic_read(&obd_stale_export_num));
1860 EXPORT_SYMBOL(obd_stale_export_get);
1862 void obd_stale_export_put(struct obd_export *exp)
1866 LASSERT(list_empty(&exp->exp_stale_list));
1867 if (exp->exp_lock_hash &&
1868 atomic_read(&exp->exp_lock_hash->hs_count)) {
1869 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1870 atomic_read(&obd_stale_export_num));
1872 spin_lock_bh(&exp->exp_bl_list_lock);
1873 spin_lock(&obd_stale_export_lock);
1874 /* Add to the tail if there is no blocked locks,
1875 * to the head otherwise. */
1876 if (list_empty(&exp->exp_bl_list))
1877 list_add_tail(&exp->exp_stale_list,
1878 &obd_stale_exports);
1880 list_add(&exp->exp_stale_list,
1881 &obd_stale_exports);
1883 spin_unlock(&obd_stale_export_lock);
1884 spin_unlock_bh(&exp->exp_bl_list_lock);
1886 class_export_put(exp);
1890 EXPORT_SYMBOL(obd_stale_export_put);
1893 * Adjust the position of the export in the stale list,
1894 * i.e. move to the head of the list if is needed.
1896 void obd_stale_export_adjust(struct obd_export *exp)
1898 LASSERT(exp != NULL);
1899 spin_lock_bh(&exp->exp_bl_list_lock);
1900 spin_lock(&obd_stale_export_lock);
1902 if (!list_empty(&exp->exp_stale_list) &&
1903 !list_empty(&exp->exp_bl_list))
1904 list_move(&exp->exp_stale_list, &obd_stale_exports);
1906 spin_unlock(&obd_stale_export_lock);
1907 spin_unlock_bh(&exp->exp_bl_list_lock);
1909 EXPORT_SYMBOL(obd_stale_export_adjust);
1912 * start destroy zombie import/export thread
1914 int obd_zombie_impexp_init(void)
1916 zombie_wq = cfs_cpt_bind_workqueue("obd_zombid", cfs_cpt_tab,
1918 cfs_cpt_number(cfs_cpt_tab));
1920 return IS_ERR(zombie_wq) ? PTR_ERR(zombie_wq) : 0;
1924 * stop destroy zombie import/export thread
1926 void obd_zombie_impexp_stop(void)
1928 destroy_workqueue(zombie_wq);
1929 LASSERT(list_empty(&obd_stale_exports));
1932 /***** Kernel-userspace comm helpers *******/
1934 /* Get length of entire message, including header */
1935 int kuc_len(int payload_len)
1937 return sizeof(struct kuc_hdr) + payload_len;
1939 EXPORT_SYMBOL(kuc_len);
1941 /* Get a pointer to kuc header, given a ptr to the payload
1942 * @param p Pointer to payload area
1943 * @returns Pointer to kuc header
1945 struct kuc_hdr * kuc_ptr(void *p)
1947 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1948 LASSERT(lh->kuc_magic == KUC_MAGIC);
1951 EXPORT_SYMBOL(kuc_ptr);
1953 /* Alloc space for a message, and fill in header
1954 * @return Pointer to payload area
1956 void *kuc_alloc(int payload_len, int transport, int type)
1959 int len = kuc_len(payload_len);
1963 return ERR_PTR(-ENOMEM);
1965 lh->kuc_magic = KUC_MAGIC;
1966 lh->kuc_transport = transport;
1967 lh->kuc_msgtype = type;
1968 lh->kuc_msglen = len;
1970 return (void *)(lh + 1);
1972 EXPORT_SYMBOL(kuc_alloc);
1974 /* Takes pointer to payload area */
1975 void kuc_free(void *p, int payload_len)
1977 struct kuc_hdr *lh = kuc_ptr(p);
1978 OBD_FREE(lh, kuc_len(payload_len));
1980 EXPORT_SYMBOL(kuc_free);
1982 struct obd_request_slot_waiter {
1983 struct list_head orsw_entry;
1984 wait_queue_head_t orsw_waitq;
1988 static bool obd_request_slot_avail(struct client_obd *cli,
1989 struct obd_request_slot_waiter *orsw)
1993 spin_lock(&cli->cl_loi_list_lock);
1994 avail = !!list_empty(&orsw->orsw_entry);
1995 spin_unlock(&cli->cl_loi_list_lock);
2001 * For network flow control, the RPC sponsor needs to acquire a credit
2002 * before sending the RPC. The credits count for a connection is defined
2003 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
2004 * the subsequent RPC sponsors need to wait until others released their
2005 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
2007 int obd_get_request_slot(struct client_obd *cli)
2009 struct obd_request_slot_waiter orsw;
2012 spin_lock(&cli->cl_loi_list_lock);
2013 if (cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight) {
2014 cli->cl_rpcs_in_flight++;
2015 spin_unlock(&cli->cl_loi_list_lock);
2019 init_waitqueue_head(&orsw.orsw_waitq);
2020 list_add_tail(&orsw.orsw_entry, &cli->cl_flight_waiters);
2021 orsw.orsw_signaled = false;
2022 spin_unlock(&cli->cl_loi_list_lock);
2024 rc = l_wait_event_abortable(orsw.orsw_waitq,
2025 obd_request_slot_avail(cli, &orsw) ||
2026 orsw.orsw_signaled);
2028 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2029 * freed but other (such as obd_put_request_slot) is using it. */
2030 spin_lock(&cli->cl_loi_list_lock);
2032 if (!orsw.orsw_signaled) {
2033 if (list_empty(&orsw.orsw_entry))
2034 cli->cl_rpcs_in_flight--;
2036 list_del(&orsw.orsw_entry);
2041 if (orsw.orsw_signaled) {
2042 LASSERT(list_empty(&orsw.orsw_entry));
2046 spin_unlock(&cli->cl_loi_list_lock);
2050 EXPORT_SYMBOL(obd_get_request_slot);
2052 void obd_put_request_slot(struct client_obd *cli)
2054 struct obd_request_slot_waiter *orsw;
2056 spin_lock(&cli->cl_loi_list_lock);
2057 cli->cl_rpcs_in_flight--;
2059 /* If there is free slot, wakeup the first waiter. */
2060 if (!list_empty(&cli->cl_flight_waiters) &&
2061 likely(cli->cl_rpcs_in_flight < cli->cl_max_rpcs_in_flight)) {
2062 orsw = list_first_entry(&cli->cl_flight_waiters,
2063 struct obd_request_slot_waiter,
2065 list_del_init(&orsw->orsw_entry);
2066 cli->cl_rpcs_in_flight++;
2067 wake_up(&orsw->orsw_waitq);
2069 spin_unlock(&cli->cl_loi_list_lock);
2071 EXPORT_SYMBOL(obd_put_request_slot);
2073 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2075 return cli->cl_max_rpcs_in_flight;
2077 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2079 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2081 struct obd_request_slot_waiter *orsw;
2087 if (max > OBD_MAX_RIF_MAX || max < 1)
2090 CDEBUG(D_INFO, "%s: max = %u max_mod = %u rif = %u\n",
2091 cli->cl_import->imp_obd->obd_name, max,
2092 cli->cl_max_mod_rpcs_in_flight, cli->cl_max_rpcs_in_flight);
2094 if (strcmp(cli->cl_import->imp_obd->obd_type->typ_name,
2095 LUSTRE_MDC_NAME) == 0) {
2096 /* adjust max_mod_rpcs_in_flight to ensure it is always
2097 * strictly lower that max_rpcs_in_flight */
2099 CERROR("%s: cannot set mdc.*.max_rpcs_in_flight=1\n",
2100 cli->cl_import->imp_obd->obd_name);
2103 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2104 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2110 spin_lock(&cli->cl_loi_list_lock);
2111 old = cli->cl_max_rpcs_in_flight;
2112 cli->cl_max_rpcs_in_flight = max;
2113 client_adjust_max_dirty(cli);
2117 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2118 for (i = 0; i < diff; i++) {
2119 orsw = list_first_entry_or_null(&cli->cl_loi_read_list,
2120 struct obd_request_slot_waiter,
2125 list_del_init(&orsw->orsw_entry);
2126 cli->cl_rpcs_in_flight++;
2127 wake_up(&orsw->orsw_waitq);
2129 spin_unlock(&cli->cl_loi_list_lock);
2133 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2135 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2137 return cli->cl_max_mod_rpcs_in_flight;
2139 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2141 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2143 struct obd_connect_data *ocd;
2147 if (max > OBD_MAX_RIF_MAX || max < 1)
2150 ocd = &cli->cl_import->imp_connect_data;
2151 CDEBUG(D_INFO, "%s: max = %hu flags = %llx, max_mod = %u rif = %u\n",
2152 cli->cl_import->imp_obd->obd_name, max, ocd->ocd_connect_flags,
2153 ocd->ocd_maxmodrpcs, cli->cl_max_rpcs_in_flight);
2155 if (max == OBD_MAX_RIF_MAX)
2156 max = OBD_MAX_RIF_MAX - 1;
2158 /* Cannot exceed or equal max_rpcs_in_flight. If we are asked to
2159 * increase this value, also bump up max_rpcs_in_flight to match.
2161 if (max >= cli->cl_max_rpcs_in_flight) {
2163 "%s: increasing max_rpcs_in_flight=%u to allow larger max_mod_rpcs_in_flight=%u\n",
2164 cli->cl_import->imp_obd->obd_name, max + 1, max);
2165 obd_set_max_rpcs_in_flight(cli, max + 1);
2168 /* cannot exceed max modify RPCs in flight supported by the server,
2169 * but verify ocd_connect_flags is at least initialized first. If
2170 * not, allow it and fix value later in ptlrpc_connect_set_flags().
2172 if (!ocd->ocd_connect_flags) {
2173 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2174 } else if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS) {
2175 maxmodrpcs = ocd->ocd_maxmodrpcs;
2176 if (maxmodrpcs == 0) { /* connection not finished yet */
2177 maxmodrpcs = cli->cl_max_rpcs_in_flight - 1;
2179 "%s: partial connect, assume maxmodrpcs=%hu\n",
2180 cli->cl_import->imp_obd->obd_name, maxmodrpcs);
2185 if (max > maxmodrpcs) {
2186 CERROR("%s: can't set max_mod_rpcs_in_flight=%hu higher than mdt.*.max_mod_rpcs_in_flight=%hu returned by the MDT server at connection.\n",
2187 cli->cl_import->imp_obd->obd_name,
2192 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2194 prev = cli->cl_max_mod_rpcs_in_flight;
2195 cli->cl_max_mod_rpcs_in_flight = max;
2197 /* wakeup waiters if limit has been increased */
2198 if (cli->cl_max_mod_rpcs_in_flight > prev)
2199 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2201 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2205 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2207 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2208 struct seq_file *seq)
2210 unsigned long mod_tot = 0, mod_cum;
2213 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2214 lprocfs_stats_header(seq, ktime_get_real(), cli->cl_mod_rpcs_init, 25,
2216 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2217 cli->cl_mod_rpcs_in_flight);
2219 seq_printf(seq, "\n\t\t\tmodify\n");
2220 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2222 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2225 for (i = 0; i < OBD_HIST_MAX; i++) {
2226 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2229 seq_printf(seq, "%d:\t\t%10lu %3u %3u\n",
2230 i, mod, pct(mod, mod_tot),
2231 pct(mod_cum, mod_tot));
2232 if (mod_cum == mod_tot)
2236 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2240 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2242 /* The number of modify RPCs sent in parallel is limited
2243 * because the server has a finite number of slots per client to
2244 * store request result and ensure reply reconstruction when needed.
2245 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2246 * that takes into account server limit and cl_max_rpcs_in_flight
2248 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2249 * one close request is allowed above the maximum.
2252 struct client_obd *cli;
2255 wait_queue_entry_t wqe;
2257 static int claim_mod_rpc_function(wait_queue_entry_t *wq_entry,
2258 unsigned int mode, int flags, void *key)
2260 struct mod_waiter *w = container_of(wq_entry, struct mod_waiter, wqe);
2261 struct client_obd *cli = w->cli;
2262 bool close_req = w->close_req;
2266 /* As woken_wake_function() doesn't remove us from the wait_queue,
2267 * we use own flag to ensure we're called just once.
2272 /* A slot is available if
2273 * - number of modify RPCs in flight is less than the max
2274 * - it's a close RPC and no other close request is in flight
2276 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2277 (close_req && cli->cl_close_rpcs_in_flight == 0);
2279 cli->cl_mod_rpcs_in_flight++;
2281 cli->cl_close_rpcs_in_flight++;
2282 ret = woken_wake_function(wq_entry, mode, flags, key);
2284 } else if (cli->cl_close_rpcs_in_flight)
2285 /* No other waiter could be woken */
2287 else if (key == NULL)
2288 /* This was not a wakeup from a close completion, so there is no
2289 * point seeing if there are close waiters to be woken
2293 /* There might be be a close we could wake, keep looking */
2298 /* Get a modify RPC slot from the obd client @cli according
2299 * to the kind of operation @opc that is going to be sent
2300 * and the intent @it of the operation if it applies.
2301 * If the maximum number of modify RPCs in flight is reached
2302 * the thread is put to sleep.
2303 * Returns the tag to be set in the request message. Tag 0
2304 * is reserved for non-modifying requests.
2306 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc)
2308 struct mod_waiter wait = {
2310 .close_req = (opc == MDS_CLOSE),
2315 init_wait(&wait.wqe);
2316 wait.wqe.func = claim_mod_rpc_function;
2318 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2319 __add_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2320 /* This wakeup will only succeed if the maximums haven't
2321 * been reached. If that happens, WQ_FLAG_WOKEN will be cleared
2322 * and there will be no need to wait.
2324 wake_up_locked(&cli->cl_mod_rpcs_waitq);
2325 /* XXX: handle spurious wakeups (from unknown yet source */
2326 while (wait.woken == false) {
2327 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2328 wait_woken(&wait.wqe, TASK_UNINTERRUPTIBLE,
2329 MAX_SCHEDULE_TIMEOUT);
2330 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2332 __remove_wait_queue(&cli->cl_mod_rpcs_waitq, &wait.wqe);
2334 max = cli->cl_max_mod_rpcs_in_flight;
2335 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2336 cli->cl_mod_rpcs_in_flight);
2337 /* find a free tag */
2338 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2340 LASSERT(i < OBD_MAX_RIF_MAX);
2341 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2342 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2343 /* tag 0 is reserved for non-modify RPCs */
2346 "%s: modify RPC slot %u is allocated opc %u, max %hu\n",
2347 cli->cl_import->imp_obd->obd_name,
2352 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2354 /* Put a modify RPC slot from the obd client @cli according
2355 * to the kind of operation @opc that has been sent.
2357 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc, __u16 tag)
2359 bool close_req = false;
2364 if (opc == MDS_CLOSE)
2367 spin_lock_irq(&cli->cl_mod_rpcs_waitq.lock);
2368 cli->cl_mod_rpcs_in_flight--;
2370 cli->cl_close_rpcs_in_flight--;
2371 /* release the tag in the bitmap */
2372 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2373 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2374 __wake_up_locked_key(&cli->cl_mod_rpcs_waitq, TASK_NORMAL,
2376 spin_unlock_irq(&cli->cl_mod_rpcs_waitq.lock);
2378 EXPORT_SYMBOL(obd_put_mod_rpc_slot);