4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lprocfs_status.h>
44 #include <lustre_disk.h>
45 #include <lustre_kernelcomm.h>
47 static DEFINE_SPINLOCK(obd_types_lock);
48 static LIST_HEAD(obd_types);
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
52 static struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 static struct kmem_cache *import_cachep;
57 static LIST_HEAD(obd_zombie_imports);
58 static LIST_HEAD(obd_zombie_exports);
59 static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks, int debug_level);
67 static LIST_HEAD(obd_stale_exports);
68 static DEFINE_SPINLOCK(obd_stale_export_lock);
69 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
71 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
75 * support functions: we could use inter-module communication, but this
76 * is more portable to other OS's
78 static struct obd_device *obd_device_alloc(void)
80 struct obd_device *obd;
82 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
84 obd->obd_magic = OBD_DEVICE_MAGIC;
89 static void obd_device_free(struct obd_device *obd)
92 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
93 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
94 if (obd->obd_namespace != NULL) {
95 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
96 obd, obd->obd_namespace, obd->obd_force);
99 lu_ref_fini(&obd->obd_reference);
100 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
103 struct obd_type *class_search_type(const char *name)
105 struct list_head *tmp;
106 struct obd_type *type;
108 spin_lock(&obd_types_lock);
109 list_for_each(tmp, &obd_types) {
110 type = list_entry(tmp, struct obd_type, typ_chain);
111 if (strcmp(type->typ_name, name) == 0) {
112 spin_unlock(&obd_types_lock);
116 spin_unlock(&obd_types_lock);
119 EXPORT_SYMBOL(class_search_type);
121 struct obd_type *class_get_type(const char *name)
123 struct obd_type *type = class_search_type(name);
125 #ifdef HAVE_MODULE_LOADING_SUPPORT
127 const char *modname = name;
129 if (strcmp(modname, "obdfilter") == 0)
132 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
133 modname = LUSTRE_OSP_NAME;
135 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
136 modname = LUSTRE_MDT_NAME;
138 if (!request_module("%s", modname)) {
139 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
140 type = class_search_type(name);
142 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
148 spin_lock(&type->obd_type_lock);
150 try_module_get(type->typ_dt_ops->o_owner);
151 spin_unlock(&type->obd_type_lock);
156 void class_put_type(struct obd_type *type)
159 spin_lock(&type->obd_type_lock);
161 module_put(type->typ_dt_ops->o_owner);
162 spin_unlock(&type->obd_type_lock);
165 #define CLASS_MAX_NAME 1024
167 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
168 bool enable_proc, struct lprocfs_vars *vars,
169 const char *name, struct lu_device_type *ldt)
171 struct obd_type *type;
176 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
178 if (class_search_type(name)) {
179 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
184 OBD_ALLOC(type, sizeof(*type));
188 OBD_ALLOC_PTR(type->typ_dt_ops);
189 OBD_ALLOC_PTR(type->typ_md_ops);
190 OBD_ALLOC(type->typ_name, strlen(name) + 1);
192 if (type->typ_dt_ops == NULL ||
193 type->typ_md_ops == NULL ||
194 type->typ_name == NULL)
197 *(type->typ_dt_ops) = *dt_ops;
198 /* md_ops is optional */
200 *(type->typ_md_ops) = *md_ops;
201 strcpy(type->typ_name, name);
202 spin_lock_init(&type->obd_type_lock);
204 #ifdef CONFIG_PROC_FS
206 type->typ_procroot = lprocfs_register(type->typ_name,
209 if (IS_ERR(type->typ_procroot)) {
210 rc = PTR_ERR(type->typ_procroot);
211 type->typ_procroot = NULL;
216 type->typ_kobj = kobject_create_and_add(type->typ_name, lustre_kobj);
217 if (!type->typ_kobj) {
224 rc = lu_device_type_init(ldt);
229 spin_lock(&obd_types_lock);
230 list_add(&type->typ_chain, &obd_types);
231 spin_unlock(&obd_types_lock);
237 kobject_put(type->typ_kobj);
238 if (type->typ_name != NULL) {
239 #ifdef CONFIG_PROC_FS
240 if (type->typ_procroot != NULL)
241 remove_proc_subtree(type->typ_name, proc_lustre_root);
243 OBD_FREE(type->typ_name, strlen(name) + 1);
245 if (type->typ_md_ops != NULL)
246 OBD_FREE_PTR(type->typ_md_ops);
247 if (type->typ_dt_ops != NULL)
248 OBD_FREE_PTR(type->typ_dt_ops);
249 OBD_FREE(type, sizeof(*type));
252 EXPORT_SYMBOL(class_register_type);
254 int class_unregister_type(const char *name)
256 struct obd_type *type = class_search_type(name);
260 CERROR("unknown obd type\n");
264 if (type->typ_refcnt) {
265 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
266 /* This is a bad situation, let's make the best of it */
267 /* Remove ops, but leave the name for debugging */
268 OBD_FREE_PTR(type->typ_dt_ops);
269 OBD_FREE_PTR(type->typ_md_ops);
274 kobject_put(type->typ_kobj);
276 /* we do not use type->typ_procroot as for compatibility purposes
277 * other modules can share names (i.e. lod can use lov entry). so
278 * we can't reference pointer as it can get invalided when another
279 * module removes the entry */
280 #ifdef CONFIG_PROC_FS
281 if (type->typ_procroot != NULL)
282 remove_proc_subtree(type->typ_name, proc_lustre_root);
283 if (type->typ_procsym != NULL)
284 lprocfs_remove(&type->typ_procsym);
287 lu_device_type_fini(type->typ_lu);
289 spin_lock(&obd_types_lock);
290 list_del(&type->typ_chain);
291 spin_unlock(&obd_types_lock);
292 OBD_FREE(type->typ_name, strlen(name) + 1);
293 if (type->typ_dt_ops != NULL)
294 OBD_FREE_PTR(type->typ_dt_ops);
295 if (type->typ_md_ops != NULL)
296 OBD_FREE_PTR(type->typ_md_ops);
297 OBD_FREE(type, sizeof(*type));
299 } /* class_unregister_type */
300 EXPORT_SYMBOL(class_unregister_type);
303 * Create a new obd device.
305 * Find an empty slot in ::obd_devs[], create a new obd device in it.
307 * \param[in] type_name obd device type string.
308 * \param[in] name obd device name.
310 * \retval NULL if create fails, otherwise return the obd device
313 struct obd_device *class_newdev(const char *type_name, const char *name)
315 struct obd_device *result = NULL;
316 struct obd_device *newdev;
317 struct obd_type *type = NULL;
319 int new_obd_minor = 0;
322 if (strlen(name) >= MAX_OBD_NAME) {
323 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
324 RETURN(ERR_PTR(-EINVAL));
327 type = class_get_type(type_name);
329 CERROR("OBD: unknown type: %s\n", type_name);
330 RETURN(ERR_PTR(-ENODEV));
333 newdev = obd_device_alloc();
335 GOTO(out_type, result = ERR_PTR(-ENOMEM));
337 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
339 write_lock(&obd_dev_lock);
340 for (i = 0; i < class_devno_max(); i++) {
341 struct obd_device *obd = class_num2obd(i);
343 if (obd && (strcmp(name, obd->obd_name) == 0)) {
344 CERROR("Device %s already exists at %d, won't add\n",
347 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
348 "%p obd_magic %08x != %08x\n", result,
349 result->obd_magic, OBD_DEVICE_MAGIC);
350 LASSERTF(result->obd_minor == new_obd_minor,
351 "%p obd_minor %d != %d\n", result,
352 result->obd_minor, new_obd_minor);
354 obd_devs[result->obd_minor] = NULL;
355 result->obd_name[0]='\0';
357 result = ERR_PTR(-EEXIST);
360 if (!result && !obd) {
362 result->obd_minor = i;
364 result->obd_type = type;
365 strncpy(result->obd_name, name,
366 sizeof(result->obd_name) - 1);
367 obd_devs[i] = result;
370 write_unlock(&obd_dev_lock);
372 if (result == NULL && i >= class_devno_max()) {
373 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
375 GOTO(out, result = ERR_PTR(-EOVERFLOW));
381 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
382 result->obd_name, result);
386 obd_device_free(newdev);
388 class_put_type(type);
392 void class_release_dev(struct obd_device *obd)
394 struct obd_type *obd_type = obd->obd_type;
396 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
397 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
398 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
399 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
400 LASSERT(obd_type != NULL);
402 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
403 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
405 write_lock(&obd_dev_lock);
406 obd_devs[obd->obd_minor] = NULL;
407 write_unlock(&obd_dev_lock);
408 obd_device_free(obd);
410 class_put_type(obd_type);
413 int class_name2dev(const char *name)
420 read_lock(&obd_dev_lock);
421 for (i = 0; i < class_devno_max(); i++) {
422 struct obd_device *obd = class_num2obd(i);
424 if (obd && strcmp(name, obd->obd_name) == 0) {
425 /* Make sure we finished attaching before we give
426 out any references */
427 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
428 if (obd->obd_attached) {
429 read_unlock(&obd_dev_lock);
435 read_unlock(&obd_dev_lock);
440 struct obd_device *class_name2obd(const char *name)
442 int dev = class_name2dev(name);
444 if (dev < 0 || dev > class_devno_max())
446 return class_num2obd(dev);
448 EXPORT_SYMBOL(class_name2obd);
450 int class_uuid2dev(struct obd_uuid *uuid)
454 read_lock(&obd_dev_lock);
455 for (i = 0; i < class_devno_max(); i++) {
456 struct obd_device *obd = class_num2obd(i);
458 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
459 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
460 read_unlock(&obd_dev_lock);
464 read_unlock(&obd_dev_lock);
469 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
471 int dev = class_uuid2dev(uuid);
474 return class_num2obd(dev);
476 EXPORT_SYMBOL(class_uuid2obd);
479 * Get obd device from ::obd_devs[]
481 * \param num [in] array index
483 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
484 * otherwise return the obd device there.
486 struct obd_device *class_num2obd(int num)
488 struct obd_device *obd = NULL;
490 if (num < class_devno_max()) {
495 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
496 "%p obd_magic %08x != %08x\n",
497 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
498 LASSERTF(obd->obd_minor == num,
499 "%p obd_minor %0d != %0d\n",
500 obd, obd->obd_minor, num);
507 * Get obd devices count. Device in any
509 * \retval obd device count
511 int get_devices_count(void)
513 int index, max_index = class_devno_max(), dev_count = 0;
515 read_lock(&obd_dev_lock);
516 for (index = 0; index <= max_index; index++) {
517 struct obd_device *obd = class_num2obd(index);
521 read_unlock(&obd_dev_lock);
525 EXPORT_SYMBOL(get_devices_count);
527 void class_obd_list(void)
532 read_lock(&obd_dev_lock);
533 for (i = 0; i < class_devno_max(); i++) {
534 struct obd_device *obd = class_num2obd(i);
538 if (obd->obd_stopping)
540 else if (obd->obd_set_up)
542 else if (obd->obd_attached)
546 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
547 i, status, obd->obd_type->typ_name,
548 obd->obd_name, obd->obd_uuid.uuid,
549 atomic_read(&obd->obd_refcount));
551 read_unlock(&obd_dev_lock);
555 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
556 specified, then only the client with that uuid is returned,
557 otherwise any client connected to the tgt is returned. */
558 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
559 const char * typ_name,
560 struct obd_uuid *grp_uuid)
564 read_lock(&obd_dev_lock);
565 for (i = 0; i < class_devno_max(); i++) {
566 struct obd_device *obd = class_num2obd(i);
570 if ((strncmp(obd->obd_type->typ_name, typ_name,
571 strlen(typ_name)) == 0)) {
572 if (obd_uuid_equals(tgt_uuid,
573 &obd->u.cli.cl_target_uuid) &&
574 ((grp_uuid)? obd_uuid_equals(grp_uuid,
575 &obd->obd_uuid) : 1)) {
576 read_unlock(&obd_dev_lock);
581 read_unlock(&obd_dev_lock);
585 EXPORT_SYMBOL(class_find_client_obd);
587 /* Iterate the obd_device list looking devices have grp_uuid. Start
588 searching at *next, and if a device is found, the next index to look
589 at is saved in *next. If next is NULL, then the first matching device
590 will always be returned. */
591 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
597 else if (*next >= 0 && *next < class_devno_max())
602 read_lock(&obd_dev_lock);
603 for (; i < class_devno_max(); i++) {
604 struct obd_device *obd = class_num2obd(i);
608 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
611 read_unlock(&obd_dev_lock);
615 read_unlock(&obd_dev_lock);
619 EXPORT_SYMBOL(class_devices_in_group);
622 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
623 * adjust sptlrpc settings accordingly.
625 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
627 struct obd_device *obd;
631 LASSERT(namelen > 0);
633 read_lock(&obd_dev_lock);
634 for (i = 0; i < class_devno_max(); i++) {
635 obd = class_num2obd(i);
637 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
640 /* only notify mdc, osc, osp, lwp, mdt, ost
641 * because only these have a -sptlrpc llog */
642 type = obd->obd_type->typ_name;
643 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
644 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
645 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
646 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
647 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
648 strcmp(type, LUSTRE_OST_NAME) != 0)
651 if (strncmp(obd->obd_name, fsname, namelen))
654 class_incref(obd, __FUNCTION__, obd);
655 read_unlock(&obd_dev_lock);
656 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
657 sizeof(KEY_SPTLRPC_CONF),
658 KEY_SPTLRPC_CONF, 0, NULL, NULL);
660 class_decref(obd, __FUNCTION__, obd);
661 read_lock(&obd_dev_lock);
663 read_unlock(&obd_dev_lock);
666 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
668 void obd_cleanup_caches(void)
671 if (obd_device_cachep) {
672 kmem_cache_destroy(obd_device_cachep);
673 obd_device_cachep = NULL;
676 kmem_cache_destroy(obdo_cachep);
680 kmem_cache_destroy(import_cachep);
681 import_cachep = NULL;
687 int obd_init_caches(void)
692 LASSERT(obd_device_cachep == NULL);
693 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
694 sizeof(struct obd_device),
696 if (!obd_device_cachep)
697 GOTO(out, rc = -ENOMEM);
699 LASSERT(obdo_cachep == NULL);
700 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
703 GOTO(out, rc = -ENOMEM);
705 LASSERT(import_cachep == NULL);
706 import_cachep = kmem_cache_create("ll_import_cache",
707 sizeof(struct obd_import),
710 GOTO(out, rc = -ENOMEM);
714 obd_cleanup_caches();
718 /* map connection to client */
719 struct obd_export *class_conn2export(struct lustre_handle *conn)
721 struct obd_export *export;
725 CDEBUG(D_CACHE, "looking for null handle\n");
729 if (conn->cookie == -1) { /* this means assign a new connection */
730 CDEBUG(D_CACHE, "want a new connection\n");
734 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
735 export = class_handle2object(conn->cookie, NULL);
738 EXPORT_SYMBOL(class_conn2export);
740 struct obd_device *class_exp2obd(struct obd_export *exp)
746 EXPORT_SYMBOL(class_exp2obd);
748 struct obd_device *class_conn2obd(struct lustre_handle *conn)
750 struct obd_export *export;
751 export = class_conn2export(conn);
753 struct obd_device *obd = export->exp_obd;
754 class_export_put(export);
760 struct obd_import *class_exp2cliimp(struct obd_export *exp)
762 struct obd_device *obd = exp->exp_obd;
765 return obd->u.cli.cl_import;
767 EXPORT_SYMBOL(class_exp2cliimp);
769 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
771 struct obd_device *obd = class_conn2obd(conn);
774 return obd->u.cli.cl_import;
777 /* Export management functions */
778 static void class_export_destroy(struct obd_export *exp)
780 struct obd_device *obd = exp->exp_obd;
783 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
784 LASSERT(obd != NULL);
786 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
787 exp->exp_client_uuid.uuid, obd->obd_name);
789 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
790 if (exp->exp_connection)
791 ptlrpc_put_connection_superhack(exp->exp_connection);
793 LASSERT(list_empty(&exp->exp_outstanding_replies));
794 LASSERT(list_empty(&exp->exp_uncommitted_replies));
795 LASSERT(list_empty(&exp->exp_req_replay_queue));
796 LASSERT(list_empty(&exp->exp_hp_rpcs));
797 obd_destroy_export(exp);
798 class_decref(obd, "export", exp);
800 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
804 static void export_handle_addref(void *export)
806 class_export_get(export);
809 static struct portals_handle_ops export_handle_ops = {
810 .hop_addref = export_handle_addref,
814 struct obd_export *class_export_get(struct obd_export *exp)
816 atomic_inc(&exp->exp_refcount);
817 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
818 atomic_read(&exp->exp_refcount));
821 EXPORT_SYMBOL(class_export_get);
823 void class_export_put(struct obd_export *exp)
825 LASSERT(exp != NULL);
826 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
827 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
828 atomic_read(&exp->exp_refcount) - 1);
830 if (atomic_dec_and_test(&exp->exp_refcount)) {
831 LASSERT(!list_empty(&exp->exp_obd_chain));
832 LASSERT(list_empty(&exp->exp_stale_list));
833 CDEBUG(D_IOCTL, "final put %p/%s\n",
834 exp, exp->exp_client_uuid.uuid);
836 /* release nid stat refererence */
837 lprocfs_exp_cleanup(exp);
839 obd_zombie_export_add(exp);
842 EXPORT_SYMBOL(class_export_put);
844 /* Creates a new export, adds it to the hash table, and returns a
845 * pointer to it. The refcount is 2: one for the hash reference, and
846 * one for the pointer returned by this function. */
847 struct obd_export *class_new_export(struct obd_device *obd,
848 struct obd_uuid *cluuid)
850 struct obd_export *export;
851 struct cfs_hash *hash = NULL;
855 OBD_ALLOC_PTR(export);
857 return ERR_PTR(-ENOMEM);
859 export->exp_conn_cnt = 0;
860 export->exp_lock_hash = NULL;
861 export->exp_flock_hash = NULL;
862 atomic_set(&export->exp_refcount, 2);
863 atomic_set(&export->exp_rpc_count, 0);
864 atomic_set(&export->exp_cb_count, 0);
865 atomic_set(&export->exp_locks_count, 0);
866 #if LUSTRE_TRACKS_LOCK_EXP_REFS
867 INIT_LIST_HEAD(&export->exp_locks_list);
868 spin_lock_init(&export->exp_locks_list_guard);
870 atomic_set(&export->exp_replay_count, 0);
871 export->exp_obd = obd;
872 INIT_LIST_HEAD(&export->exp_outstanding_replies);
873 spin_lock_init(&export->exp_uncommitted_replies_lock);
874 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
875 INIT_LIST_HEAD(&export->exp_req_replay_queue);
876 INIT_LIST_HEAD(&export->exp_handle.h_link);
877 INIT_LIST_HEAD(&export->exp_hp_rpcs);
878 INIT_LIST_HEAD(&export->exp_reg_rpcs);
879 class_handle_hash(&export->exp_handle, &export_handle_ops);
880 export->exp_last_request_time = cfs_time_current_sec();
881 spin_lock_init(&export->exp_lock);
882 spin_lock_init(&export->exp_rpc_lock);
883 INIT_HLIST_NODE(&export->exp_uuid_hash);
884 INIT_HLIST_NODE(&export->exp_nid_hash);
885 INIT_HLIST_NODE(&export->exp_gen_hash);
886 spin_lock_init(&export->exp_bl_list_lock);
887 INIT_LIST_HEAD(&export->exp_bl_list);
888 INIT_LIST_HEAD(&export->exp_stale_list);
890 export->exp_sp_peer = LUSTRE_SP_ANY;
891 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
892 export->exp_client_uuid = *cluuid;
893 obd_init_export(export);
895 spin_lock(&obd->obd_dev_lock);
896 /* shouldn't happen, but might race */
897 if (obd->obd_stopping)
898 GOTO(exit_unlock, rc = -ENODEV);
900 hash = cfs_hash_getref(obd->obd_uuid_hash);
902 GOTO(exit_unlock, rc = -ENODEV);
903 spin_unlock(&obd->obd_dev_lock);
905 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
906 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
908 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
909 obd->obd_name, cluuid->uuid, rc);
910 GOTO(exit_err, rc = -EALREADY);
914 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
915 spin_lock(&obd->obd_dev_lock);
916 if (obd->obd_stopping) {
917 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
918 GOTO(exit_unlock, rc = -ENODEV);
921 class_incref(obd, "export", export);
922 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
923 list_add_tail(&export->exp_obd_chain_timed,
924 &export->exp_obd->obd_exports_timed);
925 export->exp_obd->obd_num_exports++;
926 spin_unlock(&obd->obd_dev_lock);
927 cfs_hash_putref(hash);
931 spin_unlock(&obd->obd_dev_lock);
934 cfs_hash_putref(hash);
935 class_handle_unhash(&export->exp_handle);
936 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
937 obd_destroy_export(export);
938 OBD_FREE_PTR(export);
941 EXPORT_SYMBOL(class_new_export);
943 void class_unlink_export(struct obd_export *exp)
945 class_handle_unhash(&exp->exp_handle);
947 spin_lock(&exp->exp_obd->obd_dev_lock);
948 /* delete an uuid-export hashitem from hashtables */
949 if (!hlist_unhashed(&exp->exp_uuid_hash))
950 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
951 &exp->exp_client_uuid,
952 &exp->exp_uuid_hash);
954 #ifdef HAVE_SERVER_SUPPORT
955 if (!hlist_unhashed(&exp->exp_gen_hash)) {
956 struct tg_export_data *ted = &exp->exp_target_data;
957 struct cfs_hash *hash;
959 /* Because obd_gen_hash will not be released until
960 * class_cleanup(), so hash should never be NULL here */
961 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
962 LASSERT(hash != NULL);
963 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
965 cfs_hash_putref(hash);
967 #endif /* HAVE_SERVER_SUPPORT */
969 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
970 list_del_init(&exp->exp_obd_chain_timed);
971 exp->exp_obd->obd_num_exports--;
972 spin_unlock(&exp->exp_obd->obd_dev_lock);
973 atomic_inc(&obd_stale_export_num);
975 /* A reference is kept by obd_stale_exports list */
976 obd_stale_export_put(exp);
978 EXPORT_SYMBOL(class_unlink_export);
980 /* Import management functions */
981 static void class_import_destroy(struct obd_import *imp)
985 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
986 imp->imp_obd->obd_name);
988 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
990 ptlrpc_put_connection_superhack(imp->imp_connection);
992 while (!list_empty(&imp->imp_conn_list)) {
993 struct obd_import_conn *imp_conn;
995 imp_conn = list_entry(imp->imp_conn_list.next,
996 struct obd_import_conn, oic_item);
997 list_del_init(&imp_conn->oic_item);
998 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
999 OBD_FREE(imp_conn, sizeof(*imp_conn));
1002 LASSERT(imp->imp_sec == NULL);
1003 class_decref(imp->imp_obd, "import", imp);
1004 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
1008 static void import_handle_addref(void *import)
1010 class_import_get(import);
1013 static struct portals_handle_ops import_handle_ops = {
1014 .hop_addref = import_handle_addref,
1018 struct obd_import *class_import_get(struct obd_import *import)
1020 atomic_inc(&import->imp_refcount);
1021 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1022 atomic_read(&import->imp_refcount),
1023 import->imp_obd->obd_name);
1026 EXPORT_SYMBOL(class_import_get);
1028 void class_import_put(struct obd_import *imp)
1032 LASSERT(list_empty(&imp->imp_zombie_chain));
1033 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1035 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1036 atomic_read(&imp->imp_refcount) - 1,
1037 imp->imp_obd->obd_name);
1039 if (atomic_dec_and_test(&imp->imp_refcount)) {
1040 CDEBUG(D_INFO, "final put import %p\n", imp);
1041 obd_zombie_import_add(imp);
1044 /* catch possible import put race */
1045 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1048 EXPORT_SYMBOL(class_import_put);
1050 static void init_imp_at(struct imp_at *at) {
1052 at_init(&at->iat_net_latency, 0, 0);
1053 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1054 /* max service estimates are tracked on the server side, so
1055 don't use the AT history here, just use the last reported
1056 val. (But keep hist for proc histogram, worst_ever) */
1057 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1062 struct obd_import *class_new_import(struct obd_device *obd)
1064 struct obd_import *imp;
1065 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1067 OBD_ALLOC(imp, sizeof(*imp));
1071 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1072 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1073 INIT_LIST_HEAD(&imp->imp_replay_list);
1074 INIT_LIST_HEAD(&imp->imp_sending_list);
1075 INIT_LIST_HEAD(&imp->imp_delayed_list);
1076 INIT_LIST_HEAD(&imp->imp_committed_list);
1077 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1078 imp->imp_known_replied_xid = 0;
1079 imp->imp_replay_cursor = &imp->imp_committed_list;
1080 spin_lock_init(&imp->imp_lock);
1081 imp->imp_last_success_conn = 0;
1082 imp->imp_state = LUSTRE_IMP_NEW;
1083 imp->imp_obd = class_incref(obd, "import", imp);
1084 mutex_init(&imp->imp_sec_mutex);
1085 init_waitqueue_head(&imp->imp_recovery_waitq);
1087 if (curr_pid_ns->child_reaper)
1088 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1090 imp->imp_sec_refpid = 1;
1092 atomic_set(&imp->imp_refcount, 2);
1093 atomic_set(&imp->imp_unregistering, 0);
1094 atomic_set(&imp->imp_inflight, 0);
1095 atomic_set(&imp->imp_replay_inflight, 0);
1096 atomic_set(&imp->imp_inval_count, 0);
1097 INIT_LIST_HEAD(&imp->imp_conn_list);
1098 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1099 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1100 init_imp_at(&imp->imp_at);
1102 /* the default magic is V2, will be used in connect RPC, and
1103 * then adjusted according to the flags in request/reply. */
1104 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1108 EXPORT_SYMBOL(class_new_import);
1110 void class_destroy_import(struct obd_import *import)
1112 LASSERT(import != NULL);
1113 LASSERT(import != LP_POISON);
1115 class_handle_unhash(&import->imp_handle);
1117 spin_lock(&import->imp_lock);
1118 import->imp_generation++;
1119 spin_unlock(&import->imp_lock);
1120 class_import_put(import);
1122 EXPORT_SYMBOL(class_destroy_import);
1124 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1126 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1128 spin_lock(&exp->exp_locks_list_guard);
1130 LASSERT(lock->l_exp_refs_nr >= 0);
1132 if (lock->l_exp_refs_target != NULL &&
1133 lock->l_exp_refs_target != exp) {
1134 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1135 exp, lock, lock->l_exp_refs_target);
1137 if ((lock->l_exp_refs_nr ++) == 0) {
1138 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1139 lock->l_exp_refs_target = exp;
1141 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1142 lock, exp, lock->l_exp_refs_nr);
1143 spin_unlock(&exp->exp_locks_list_guard);
1145 EXPORT_SYMBOL(__class_export_add_lock_ref);
1147 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1149 spin_lock(&exp->exp_locks_list_guard);
1150 LASSERT(lock->l_exp_refs_nr > 0);
1151 if (lock->l_exp_refs_target != exp) {
1152 LCONSOLE_WARN("lock %p, "
1153 "mismatching export pointers: %p, %p\n",
1154 lock, lock->l_exp_refs_target, exp);
1156 if (-- lock->l_exp_refs_nr == 0) {
1157 list_del_init(&lock->l_exp_refs_link);
1158 lock->l_exp_refs_target = NULL;
1160 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1161 lock, exp, lock->l_exp_refs_nr);
1162 spin_unlock(&exp->exp_locks_list_guard);
1164 EXPORT_SYMBOL(__class_export_del_lock_ref);
1167 /* A connection defines an export context in which preallocation can
1168 be managed. This releases the export pointer reference, and returns
1169 the export handle, so the export refcount is 1 when this function
1171 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1172 struct obd_uuid *cluuid)
1174 struct obd_export *export;
1175 LASSERT(conn != NULL);
1176 LASSERT(obd != NULL);
1177 LASSERT(cluuid != NULL);
1180 export = class_new_export(obd, cluuid);
1182 RETURN(PTR_ERR(export));
1184 conn->cookie = export->exp_handle.h_cookie;
1185 class_export_put(export);
1187 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1188 cluuid->uuid, conn->cookie);
1191 EXPORT_SYMBOL(class_connect);
1193 /* if export is involved in recovery then clean up related things */
1194 static void class_export_recovery_cleanup(struct obd_export *exp)
1196 struct obd_device *obd = exp->exp_obd;
1198 spin_lock(&obd->obd_recovery_task_lock);
1199 if (obd->obd_recovering) {
1200 if (exp->exp_in_recovery) {
1201 spin_lock(&exp->exp_lock);
1202 exp->exp_in_recovery = 0;
1203 spin_unlock(&exp->exp_lock);
1204 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1205 atomic_dec(&obd->obd_connected_clients);
1208 /* if called during recovery then should update
1209 * obd_stale_clients counter,
1210 * lightweight exports are not counted */
1211 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1212 exp->exp_obd->obd_stale_clients++;
1214 spin_unlock(&obd->obd_recovery_task_lock);
1216 spin_lock(&exp->exp_lock);
1217 /** Cleanup req replay fields */
1218 if (exp->exp_req_replay_needed) {
1219 exp->exp_req_replay_needed = 0;
1221 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1222 atomic_dec(&obd->obd_req_replay_clients);
1225 /** Cleanup lock replay data */
1226 if (exp->exp_lock_replay_needed) {
1227 exp->exp_lock_replay_needed = 0;
1229 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1230 atomic_dec(&obd->obd_lock_replay_clients);
1232 spin_unlock(&exp->exp_lock);
1235 /* This function removes 1-3 references from the export:
1236 * 1 - for export pointer passed
1237 * and if disconnect really need
1238 * 2 - removing from hash
1239 * 3 - in client_unlink_export
1240 * The export pointer passed to this function can destroyed */
1241 int class_disconnect(struct obd_export *export)
1243 int already_disconnected;
1246 if (export == NULL) {
1247 CWARN("attempting to free NULL export %p\n", export);
1251 spin_lock(&export->exp_lock);
1252 already_disconnected = export->exp_disconnected;
1253 export->exp_disconnected = 1;
1254 /* We hold references of export for uuid hash
1255 * and nid_hash and export link at least. So
1256 * it is safe to call cfs_hash_del in there. */
1257 if (!hlist_unhashed(&export->exp_nid_hash))
1258 cfs_hash_del(export->exp_obd->obd_nid_hash,
1259 &export->exp_connection->c_peer.nid,
1260 &export->exp_nid_hash);
1261 spin_unlock(&export->exp_lock);
1263 /* class_cleanup(), abort_recovery(), and class_fail_export()
1264 * all end up in here, and if any of them race we shouldn't
1265 * call extra class_export_puts(). */
1266 if (already_disconnected) {
1267 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1268 GOTO(no_disconn, already_disconnected);
1271 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1272 export->exp_handle.h_cookie);
1274 class_export_recovery_cleanup(export);
1275 class_unlink_export(export);
1277 class_export_put(export);
1280 EXPORT_SYMBOL(class_disconnect);
1282 /* Return non-zero for a fully connected export */
1283 int class_connected_export(struct obd_export *exp)
1288 spin_lock(&exp->exp_lock);
1289 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1290 spin_unlock(&exp->exp_lock);
1294 EXPORT_SYMBOL(class_connected_export);
1296 static void class_disconnect_export_list(struct list_head *list,
1297 enum obd_option flags)
1300 struct obd_export *exp;
1303 /* It's possible that an export may disconnect itself, but
1304 * nothing else will be added to this list. */
1305 while (!list_empty(list)) {
1306 exp = list_entry(list->next, struct obd_export,
1308 /* need for safe call CDEBUG after obd_disconnect */
1309 class_export_get(exp);
1311 spin_lock(&exp->exp_lock);
1312 exp->exp_flags = flags;
1313 spin_unlock(&exp->exp_lock);
1315 if (obd_uuid_equals(&exp->exp_client_uuid,
1316 &exp->exp_obd->obd_uuid)) {
1318 "exp %p export uuid == obd uuid, don't discon\n",
1320 /* Need to delete this now so we don't end up pointing
1321 * to work_list later when this export is cleaned up. */
1322 list_del_init(&exp->exp_obd_chain);
1323 class_export_put(exp);
1327 class_export_get(exp);
1328 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1329 "last request at %ld\n",
1330 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1331 exp, exp->exp_last_request_time);
1332 /* release one export reference anyway */
1333 rc = obd_disconnect(exp);
1335 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1336 obd_export_nid2str(exp), exp, rc);
1337 class_export_put(exp);
1342 void class_disconnect_exports(struct obd_device *obd)
1344 struct list_head work_list;
1347 /* Move all of the exports from obd_exports to a work list, en masse. */
1348 INIT_LIST_HEAD(&work_list);
1349 spin_lock(&obd->obd_dev_lock);
1350 list_splice_init(&obd->obd_exports, &work_list);
1351 list_splice_init(&obd->obd_delayed_exports, &work_list);
1352 spin_unlock(&obd->obd_dev_lock);
1354 if (!list_empty(&work_list)) {
1355 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1356 "disconnecting them\n", obd->obd_minor, obd);
1357 class_disconnect_export_list(&work_list,
1358 exp_flags_from_obd(obd));
1360 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1361 obd->obd_minor, obd);
1364 EXPORT_SYMBOL(class_disconnect_exports);
1366 /* Remove exports that have not completed recovery.
1368 void class_disconnect_stale_exports(struct obd_device *obd,
1369 int (*test_export)(struct obd_export *))
1371 struct list_head work_list;
1372 struct obd_export *exp, *n;
1376 INIT_LIST_HEAD(&work_list);
1377 spin_lock(&obd->obd_dev_lock);
1378 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1380 /* don't count self-export as client */
1381 if (obd_uuid_equals(&exp->exp_client_uuid,
1382 &exp->exp_obd->obd_uuid))
1385 /* don't evict clients which have no slot in last_rcvd
1386 * (e.g. lightweight connection) */
1387 if (exp->exp_target_data.ted_lr_idx == -1)
1390 spin_lock(&exp->exp_lock);
1391 if (exp->exp_failed || test_export(exp)) {
1392 spin_unlock(&exp->exp_lock);
1395 exp->exp_failed = 1;
1396 spin_unlock(&exp->exp_lock);
1398 list_move(&exp->exp_obd_chain, &work_list);
1400 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1401 obd->obd_name, exp->exp_client_uuid.uuid,
1402 exp->exp_connection == NULL ? "<unknown>" :
1403 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1404 print_export_data(exp, "EVICTING", 0, D_HA);
1406 spin_unlock(&obd->obd_dev_lock);
1409 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1410 obd->obd_name, evicted);
1412 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1413 OBD_OPT_ABORT_RECOV);
1416 EXPORT_SYMBOL(class_disconnect_stale_exports);
1418 void class_fail_export(struct obd_export *exp)
1420 int rc, already_failed;
1422 spin_lock(&exp->exp_lock);
1423 already_failed = exp->exp_failed;
1424 exp->exp_failed = 1;
1425 spin_unlock(&exp->exp_lock);
1427 if (already_failed) {
1428 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1429 exp, exp->exp_client_uuid.uuid);
1433 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1434 exp, exp->exp_client_uuid.uuid);
1436 if (obd_dump_on_timeout)
1437 libcfs_debug_dumplog();
1439 /* need for safe call CDEBUG after obd_disconnect */
1440 class_export_get(exp);
1442 /* Most callers into obd_disconnect are removing their own reference
1443 * (request, for example) in addition to the one from the hash table.
1444 * We don't have such a reference here, so make one. */
1445 class_export_get(exp);
1446 rc = obd_disconnect(exp);
1448 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1450 CDEBUG(D_HA, "disconnected export %p/%s\n",
1451 exp, exp->exp_client_uuid.uuid);
1452 class_export_put(exp);
1454 EXPORT_SYMBOL(class_fail_export);
1456 char *obd_export_nid2str(struct obd_export *exp)
1458 if (exp->exp_connection != NULL)
1459 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1463 EXPORT_SYMBOL(obd_export_nid2str);
1465 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1467 struct cfs_hash *nid_hash;
1468 struct obd_export *doomed_exp = NULL;
1469 int exports_evicted = 0;
1471 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1473 spin_lock(&obd->obd_dev_lock);
1474 /* umount has run already, so evict thread should leave
1475 * its task to umount thread now */
1476 if (obd->obd_stopping) {
1477 spin_unlock(&obd->obd_dev_lock);
1478 return exports_evicted;
1480 nid_hash = obd->obd_nid_hash;
1481 cfs_hash_getref(nid_hash);
1482 spin_unlock(&obd->obd_dev_lock);
1485 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1486 if (doomed_exp == NULL)
1489 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1490 "nid %s found, wanted nid %s, requested nid %s\n",
1491 obd_export_nid2str(doomed_exp),
1492 libcfs_nid2str(nid_key), nid);
1493 LASSERTF(doomed_exp != obd->obd_self_export,
1494 "self-export is hashed by NID?\n");
1496 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1497 "request\n", obd->obd_name,
1498 obd_uuid2str(&doomed_exp->exp_client_uuid),
1499 obd_export_nid2str(doomed_exp));
1500 class_fail_export(doomed_exp);
1501 class_export_put(doomed_exp);
1504 cfs_hash_putref(nid_hash);
1506 if (!exports_evicted)
1507 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1508 obd->obd_name, nid);
1509 return exports_evicted;
1511 EXPORT_SYMBOL(obd_export_evict_by_nid);
1513 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1515 struct cfs_hash *uuid_hash;
1516 struct obd_export *doomed_exp = NULL;
1517 struct obd_uuid doomed_uuid;
1518 int exports_evicted = 0;
1520 spin_lock(&obd->obd_dev_lock);
1521 if (obd->obd_stopping) {
1522 spin_unlock(&obd->obd_dev_lock);
1523 return exports_evicted;
1525 uuid_hash = obd->obd_uuid_hash;
1526 cfs_hash_getref(uuid_hash);
1527 spin_unlock(&obd->obd_dev_lock);
1529 obd_str2uuid(&doomed_uuid, uuid);
1530 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1531 CERROR("%s: can't evict myself\n", obd->obd_name);
1532 cfs_hash_putref(uuid_hash);
1533 return exports_evicted;
1536 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1538 if (doomed_exp == NULL) {
1539 CERROR("%s: can't disconnect %s: no exports found\n",
1540 obd->obd_name, uuid);
1542 CWARN("%s: evicting %s at adminstrative request\n",
1543 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1544 class_fail_export(doomed_exp);
1545 class_export_put(doomed_exp);
1548 cfs_hash_putref(uuid_hash);
1550 return exports_evicted;
1553 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1554 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1555 EXPORT_SYMBOL(class_export_dump_hook);
1558 static void print_export_data(struct obd_export *exp, const char *status,
1559 int locks, int debug_level)
1561 struct ptlrpc_reply_state *rs;
1562 struct ptlrpc_reply_state *first_reply = NULL;
1565 spin_lock(&exp->exp_lock);
1566 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1572 spin_unlock(&exp->exp_lock);
1574 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1575 "%p %s %llu stale:%d\n",
1576 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1577 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1578 atomic_read(&exp->exp_rpc_count),
1579 atomic_read(&exp->exp_cb_count),
1580 atomic_read(&exp->exp_locks_count),
1581 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1582 nreplies, first_reply, nreplies > 3 ? "..." : "",
1583 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1584 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1585 if (locks && class_export_dump_hook != NULL)
1586 class_export_dump_hook(exp);
1590 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1592 struct obd_export *exp;
1594 spin_lock(&obd->obd_dev_lock);
1595 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1596 print_export_data(exp, "ACTIVE", locks, debug_level);
1597 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1598 print_export_data(exp, "UNLINKED", locks, debug_level);
1599 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1600 print_export_data(exp, "DELAYED", locks, debug_level);
1601 spin_unlock(&obd->obd_dev_lock);
1602 spin_lock(&obd_zombie_impexp_lock);
1603 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1604 print_export_data(exp, "ZOMBIE", locks, debug_level);
1605 spin_unlock(&obd_zombie_impexp_lock);
1608 void obd_exports_barrier(struct obd_device *obd)
1611 LASSERT(list_empty(&obd->obd_exports));
1612 spin_lock(&obd->obd_dev_lock);
1613 while (!list_empty(&obd->obd_unlinked_exports)) {
1614 spin_unlock(&obd->obd_dev_lock);
1615 set_current_state(TASK_UNINTERRUPTIBLE);
1616 schedule_timeout(cfs_time_seconds(waited));
1617 if (waited > 5 && is_power_of_2(waited)) {
1618 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1619 "more than %d seconds. "
1620 "The obd refcount = %d. Is it stuck?\n",
1621 obd->obd_name, waited,
1622 atomic_read(&obd->obd_refcount));
1623 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1626 spin_lock(&obd->obd_dev_lock);
1628 spin_unlock(&obd->obd_dev_lock);
1630 EXPORT_SYMBOL(obd_exports_barrier);
1632 /* Total amount of zombies to be destroyed */
1633 static int zombies_count = 0;
1636 * kill zombie imports and exports
1638 void obd_zombie_impexp_cull(void)
1640 struct obd_import *import;
1641 struct obd_export *export;
1645 spin_lock(&obd_zombie_impexp_lock);
1648 if (!list_empty(&obd_zombie_imports)) {
1649 import = list_entry(obd_zombie_imports.next,
1652 list_del_init(&import->imp_zombie_chain);
1656 if (!list_empty(&obd_zombie_exports)) {
1657 export = list_entry(obd_zombie_exports.next,
1660 list_del_init(&export->exp_obd_chain);
1663 spin_unlock(&obd_zombie_impexp_lock);
1665 if (import != NULL) {
1666 class_import_destroy(import);
1667 spin_lock(&obd_zombie_impexp_lock);
1669 spin_unlock(&obd_zombie_impexp_lock);
1672 if (export != NULL) {
1673 class_export_destroy(export);
1674 spin_lock(&obd_zombie_impexp_lock);
1676 spin_unlock(&obd_zombie_impexp_lock);
1680 } while (import != NULL || export != NULL);
1684 static DECLARE_COMPLETION(obd_zombie_start);
1685 static DECLARE_COMPLETION(obd_zombie_stop);
1686 static unsigned long obd_zombie_flags;
1687 static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
1688 static pid_t obd_zombie_pid;
1691 OBD_ZOMBIE_STOP = 0x0001,
1695 * check for work for kill zombie import/export thread.
1697 static int obd_zombie_impexp_check(void *arg)
1701 spin_lock(&obd_zombie_impexp_lock);
1702 rc = (zombies_count == 0) &&
1703 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1704 spin_unlock(&obd_zombie_impexp_lock);
1710 * Add export to the obd_zombe thread and notify it.
1712 static void obd_zombie_export_add(struct obd_export *exp) {
1713 atomic_dec(&obd_stale_export_num);
1714 spin_lock(&exp->exp_obd->obd_dev_lock);
1715 LASSERT(!list_empty(&exp->exp_obd_chain));
1716 list_del_init(&exp->exp_obd_chain);
1717 spin_unlock(&exp->exp_obd->obd_dev_lock);
1718 spin_lock(&obd_zombie_impexp_lock);
1720 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1721 spin_unlock(&obd_zombie_impexp_lock);
1723 obd_zombie_impexp_notify();
1727 * Add import to the obd_zombe thread and notify it.
1729 static void obd_zombie_import_add(struct obd_import *imp) {
1730 LASSERT(imp->imp_sec == NULL);
1731 spin_lock(&obd_zombie_impexp_lock);
1732 LASSERT(list_empty(&imp->imp_zombie_chain));
1734 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1735 spin_unlock(&obd_zombie_impexp_lock);
1737 obd_zombie_impexp_notify();
1741 * notify import/export destroy thread about new zombie.
1743 static void obd_zombie_impexp_notify(void)
1746 * Make sure obd_zomebie_impexp_thread get this notification.
1747 * It is possible this signal only get by obd_zombie_barrier, and
1748 * barrier gulps this notification and sleeps away and hangs ensues
1750 wake_up_all(&obd_zombie_waitq);
1754 * check whether obd_zombie is idle
1756 static int obd_zombie_is_idle(void)
1760 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1761 spin_lock(&obd_zombie_impexp_lock);
1762 rc = (zombies_count == 0);
1763 spin_unlock(&obd_zombie_impexp_lock);
1768 * wait when obd_zombie import/export queues become empty
1770 void obd_zombie_barrier(void)
1772 struct l_wait_info lwi = { 0 };
1774 if (obd_zombie_pid == current_pid())
1775 /* don't wait for myself */
1777 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1779 EXPORT_SYMBOL(obd_zombie_barrier);
1782 struct obd_export *obd_stale_export_get(void)
1784 struct obd_export *exp = NULL;
1787 spin_lock(&obd_stale_export_lock);
1788 if (!list_empty(&obd_stale_exports)) {
1789 exp = list_entry(obd_stale_exports.next,
1790 struct obd_export, exp_stale_list);
1791 list_del_init(&exp->exp_stale_list);
1793 spin_unlock(&obd_stale_export_lock);
1796 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1797 atomic_read(&obd_stale_export_num));
1801 EXPORT_SYMBOL(obd_stale_export_get);
1803 void obd_stale_export_put(struct obd_export *exp)
1807 LASSERT(list_empty(&exp->exp_stale_list));
1808 if (exp->exp_lock_hash &&
1809 atomic_read(&exp->exp_lock_hash->hs_count)) {
1810 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1811 atomic_read(&obd_stale_export_num));
1813 spin_lock_bh(&exp->exp_bl_list_lock);
1814 spin_lock(&obd_stale_export_lock);
1815 /* Add to the tail if there is no blocked locks,
1816 * to the head otherwise. */
1817 if (list_empty(&exp->exp_bl_list))
1818 list_add_tail(&exp->exp_stale_list,
1819 &obd_stale_exports);
1821 list_add(&exp->exp_stale_list,
1822 &obd_stale_exports);
1824 spin_unlock(&obd_stale_export_lock);
1825 spin_unlock_bh(&exp->exp_bl_list_lock);
1827 class_export_put(exp);
1831 EXPORT_SYMBOL(obd_stale_export_put);
1834 * Adjust the position of the export in the stale list,
1835 * i.e. move to the head of the list if is needed.
1837 void obd_stale_export_adjust(struct obd_export *exp)
1839 LASSERT(exp != NULL);
1840 spin_lock_bh(&exp->exp_bl_list_lock);
1841 spin_lock(&obd_stale_export_lock);
1843 if (!list_empty(&exp->exp_stale_list) &&
1844 !list_empty(&exp->exp_bl_list))
1845 list_move(&exp->exp_stale_list, &obd_stale_exports);
1847 spin_unlock(&obd_stale_export_lock);
1848 spin_unlock_bh(&exp->exp_bl_list_lock);
1850 EXPORT_SYMBOL(obd_stale_export_adjust);
1853 * destroy zombie export/import thread.
1855 static int obd_zombie_impexp_thread(void *unused)
1857 unshare_fs_struct();
1858 complete(&obd_zombie_start);
1860 obd_zombie_pid = current_pid();
1862 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1863 struct l_wait_info lwi = { 0 };
1865 l_wait_event(obd_zombie_waitq,
1866 !obd_zombie_impexp_check(NULL), &lwi);
1867 obd_zombie_impexp_cull();
1870 * Notify obd_zombie_barrier callers that queues
1873 wake_up(&obd_zombie_waitq);
1876 complete(&obd_zombie_stop);
1883 * start destroy zombie import/export thread
1885 int obd_zombie_impexp_init(void)
1887 struct task_struct *task;
1889 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1891 RETURN(PTR_ERR(task));
1893 wait_for_completion(&obd_zombie_start);
1897 * stop destroy zombie import/export thread
1899 void obd_zombie_impexp_stop(void)
1901 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1902 obd_zombie_impexp_notify();
1903 wait_for_completion(&obd_zombie_stop);
1904 LASSERT(list_empty(&obd_stale_exports));
1907 /***** Kernel-userspace comm helpers *******/
1909 /* Get length of entire message, including header */
1910 int kuc_len(int payload_len)
1912 return sizeof(struct kuc_hdr) + payload_len;
1914 EXPORT_SYMBOL(kuc_len);
1916 /* Get a pointer to kuc header, given a ptr to the payload
1917 * @param p Pointer to payload area
1918 * @returns Pointer to kuc header
1920 struct kuc_hdr * kuc_ptr(void *p)
1922 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1923 LASSERT(lh->kuc_magic == KUC_MAGIC);
1926 EXPORT_SYMBOL(kuc_ptr);
1928 /* Alloc space for a message, and fill in header
1929 * @return Pointer to payload area
1931 void *kuc_alloc(int payload_len, int transport, int type)
1934 int len = kuc_len(payload_len);
1938 return ERR_PTR(-ENOMEM);
1940 lh->kuc_magic = KUC_MAGIC;
1941 lh->kuc_transport = transport;
1942 lh->kuc_msgtype = type;
1943 lh->kuc_msglen = len;
1945 return (void *)(lh + 1);
1947 EXPORT_SYMBOL(kuc_alloc);
1949 /* Takes pointer to payload area */
1950 void kuc_free(void *p, int payload_len)
1952 struct kuc_hdr *lh = kuc_ptr(p);
1953 OBD_FREE(lh, kuc_len(payload_len));
1955 EXPORT_SYMBOL(kuc_free);
1957 struct obd_request_slot_waiter {
1958 struct list_head orsw_entry;
1959 wait_queue_head_t orsw_waitq;
1963 static bool obd_request_slot_avail(struct client_obd *cli,
1964 struct obd_request_slot_waiter *orsw)
1968 spin_lock(&cli->cl_loi_list_lock);
1969 avail = !!list_empty(&orsw->orsw_entry);
1970 spin_unlock(&cli->cl_loi_list_lock);
1976 * For network flow control, the RPC sponsor needs to acquire a credit
1977 * before sending the RPC. The credits count for a connection is defined
1978 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1979 * the subsequent RPC sponsors need to wait until others released their
1980 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1982 int obd_get_request_slot(struct client_obd *cli)
1984 struct obd_request_slot_waiter orsw;
1985 struct l_wait_info lwi;
1988 spin_lock(&cli->cl_loi_list_lock);
1989 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1990 cli->cl_r_in_flight++;
1991 spin_unlock(&cli->cl_loi_list_lock);
1995 init_waitqueue_head(&orsw.orsw_waitq);
1996 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1997 orsw.orsw_signaled = false;
1998 spin_unlock(&cli->cl_loi_list_lock);
2000 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2001 rc = l_wait_event(orsw.orsw_waitq,
2002 obd_request_slot_avail(cli, &orsw) ||
2006 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2007 * freed but other (such as obd_put_request_slot) is using it. */
2008 spin_lock(&cli->cl_loi_list_lock);
2010 if (!orsw.orsw_signaled) {
2011 if (list_empty(&orsw.orsw_entry))
2012 cli->cl_r_in_flight--;
2014 list_del(&orsw.orsw_entry);
2018 if (orsw.orsw_signaled) {
2019 LASSERT(list_empty(&orsw.orsw_entry));
2023 spin_unlock(&cli->cl_loi_list_lock);
2027 EXPORT_SYMBOL(obd_get_request_slot);
2029 void obd_put_request_slot(struct client_obd *cli)
2031 struct obd_request_slot_waiter *orsw;
2033 spin_lock(&cli->cl_loi_list_lock);
2034 cli->cl_r_in_flight--;
2036 /* If there is free slot, wakeup the first waiter. */
2037 if (!list_empty(&cli->cl_loi_read_list) &&
2038 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2039 orsw = list_entry(cli->cl_loi_read_list.next,
2040 struct obd_request_slot_waiter, orsw_entry);
2041 list_del_init(&orsw->orsw_entry);
2042 cli->cl_r_in_flight++;
2043 wake_up(&orsw->orsw_waitq);
2045 spin_unlock(&cli->cl_loi_list_lock);
2047 EXPORT_SYMBOL(obd_put_request_slot);
2049 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2051 return cli->cl_max_rpcs_in_flight;
2053 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2055 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2057 struct obd_request_slot_waiter *orsw;
2064 if (max > OBD_MAX_RIF_MAX || max < 1)
2067 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2068 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2069 /* adjust max_mod_rpcs_in_flight to ensure it is always
2070 * strictly lower that max_rpcs_in_flight */
2072 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2073 "because it must be higher than "
2074 "max_mod_rpcs_in_flight value",
2075 cli->cl_import->imp_obd->obd_name);
2078 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2079 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2085 spin_lock(&cli->cl_loi_list_lock);
2086 old = cli->cl_max_rpcs_in_flight;
2087 cli->cl_max_rpcs_in_flight = max;
2090 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2091 for (i = 0; i < diff; i++) {
2092 if (list_empty(&cli->cl_loi_read_list))
2095 orsw = list_entry(cli->cl_loi_read_list.next,
2096 struct obd_request_slot_waiter, orsw_entry);
2097 list_del_init(&orsw->orsw_entry);
2098 cli->cl_r_in_flight++;
2099 wake_up(&orsw->orsw_waitq);
2101 spin_unlock(&cli->cl_loi_list_lock);
2105 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2107 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2109 return cli->cl_max_mod_rpcs_in_flight;
2111 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2113 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2115 struct obd_connect_data *ocd;
2119 if (max > OBD_MAX_RIF_MAX || max < 1)
2122 /* cannot exceed or equal max_rpcs_in_flight */
2123 if (max >= cli->cl_max_rpcs_in_flight) {
2124 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2125 "higher or equal to max_rpcs_in_flight value (%u)\n",
2126 cli->cl_import->imp_obd->obd_name,
2127 max, cli->cl_max_rpcs_in_flight);
2131 /* cannot exceed max modify RPCs in flight supported by the server */
2132 ocd = &cli->cl_import->imp_connect_data;
2133 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2134 maxmodrpcs = ocd->ocd_maxmodrpcs;
2137 if (max > maxmodrpcs) {
2138 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2139 "higher than max_mod_rpcs_per_client value (%hu) "
2140 "returned by the server at connection\n",
2141 cli->cl_import->imp_obd->obd_name,
2146 spin_lock(&cli->cl_mod_rpcs_lock);
2148 prev = cli->cl_max_mod_rpcs_in_flight;
2149 cli->cl_max_mod_rpcs_in_flight = max;
2151 /* wakeup waiters if limit has been increased */
2152 if (cli->cl_max_mod_rpcs_in_flight > prev)
2153 wake_up(&cli->cl_mod_rpcs_waitq);
2155 spin_unlock(&cli->cl_mod_rpcs_lock);
2159 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2162 #define pct(a, b) (b ? a * 100 / b : 0)
2163 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2164 struct seq_file *seq)
2166 unsigned long mod_tot = 0, mod_cum;
2167 struct timespec64 now;
2170 ktime_get_real_ts64(&now);
2172 spin_lock(&cli->cl_mod_rpcs_lock);
2174 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2175 (s64)now.tv_sec, now.tv_nsec);
2176 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2177 cli->cl_mod_rpcs_in_flight);
2179 seq_printf(seq, "\n\t\t\tmodify\n");
2180 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2182 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2185 for (i = 0; i < OBD_HIST_MAX; i++) {
2186 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2188 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2189 i, mod, pct(mod, mod_tot),
2190 pct(mod_cum, mod_tot));
2191 if (mod_cum == mod_tot)
2195 spin_unlock(&cli->cl_mod_rpcs_lock);
2199 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2203 /* The number of modify RPCs sent in parallel is limited
2204 * because the server has a finite number of slots per client to
2205 * store request result and ensure reply reconstruction when needed.
2206 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2207 * that takes into account server limit and cl_max_rpcs_in_flight
2209 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2210 * one close request is allowed above the maximum.
2212 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2217 /* A slot is available if
2218 * - number of modify RPCs in flight is less than the max
2219 * - it's a close RPC and no other close request is in flight
2221 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2222 (close_req && cli->cl_close_rpcs_in_flight == 0);
2227 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2232 spin_lock(&cli->cl_mod_rpcs_lock);
2233 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2234 spin_unlock(&cli->cl_mod_rpcs_lock);
2238 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2241 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2242 it->it_op == IT_READDIR ||
2243 (it->it_op == IT_LAYOUT && !(it->it_flags & FMODE_WRITE))))
2248 /* Get a modify RPC slot from the obd client @cli according
2249 * to the kind of operation @opc that is going to be sent
2250 * and the intent @it of the operation if it applies.
2251 * If the maximum number of modify RPCs in flight is reached
2252 * the thread is put to sleep.
2253 * Returns the tag to be set in the request message. Tag 0
2254 * is reserved for non-modifying requests.
2256 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2257 struct lookup_intent *it)
2259 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2260 bool close_req = false;
2263 /* read-only metadata RPCs don't consume a slot on MDT
2264 * for reply reconstruction
2266 if (obd_skip_mod_rpc_slot(it))
2269 if (opc == MDS_CLOSE)
2273 spin_lock(&cli->cl_mod_rpcs_lock);
2274 max = cli->cl_max_mod_rpcs_in_flight;
2275 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2276 /* there is a slot available */
2277 cli->cl_mod_rpcs_in_flight++;
2279 cli->cl_close_rpcs_in_flight++;
2280 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2281 cli->cl_mod_rpcs_in_flight);
2282 /* find a free tag */
2283 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2285 LASSERT(i < OBD_MAX_RIF_MAX);
2286 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2287 spin_unlock(&cli->cl_mod_rpcs_lock);
2288 /* tag 0 is reserved for non-modify RPCs */
2291 spin_unlock(&cli->cl_mod_rpcs_lock);
2293 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2294 "opc %u, max %hu\n",
2295 cli->cl_import->imp_obd->obd_name, opc, max);
2297 l_wait_event(cli->cl_mod_rpcs_waitq,
2298 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2301 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2303 /* Put a modify RPC slot from the obd client @cli according
2304 * to the kind of operation @opc that has been sent and the
2305 * intent @it of the operation if it applies.
2307 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2308 struct lookup_intent *it, __u16 tag)
2310 bool close_req = false;
2312 if (obd_skip_mod_rpc_slot(it))
2315 if (opc == MDS_CLOSE)
2318 spin_lock(&cli->cl_mod_rpcs_lock);
2319 cli->cl_mod_rpcs_in_flight--;
2321 cli->cl_close_rpcs_in_flight--;
2322 /* release the tag in the bitmap */
2323 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2324 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2325 spin_unlock(&cli->cl_mod_rpcs_lock);
2326 wake_up(&cli->cl_mod_rpcs_waitq);
2328 EXPORT_SYMBOL(obd_put_mod_rpc_slot);