4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lprocfs_status.h>
44 #include <lustre_disk.h>
45 #include <lustre_kernelcomm.h>
47 spinlock_t obd_types_lock;
49 static struct kmem_cache *obd_device_cachep;
50 struct kmem_cache *obdo_cachep;
51 EXPORT_SYMBOL(obdo_cachep);
52 static struct kmem_cache *import_cachep;
54 static struct list_head obd_zombie_imports;
55 static struct list_head obd_zombie_exports;
56 static spinlock_t obd_zombie_impexp_lock;
58 static void obd_zombie_impexp_notify(void);
59 static void obd_zombie_export_add(struct obd_export *exp);
60 static void obd_zombie_import_add(struct obd_import *imp);
61 static void print_export_data(struct obd_export *exp,
62 const char *status, int locks, int debug_level);
64 struct list_head obd_stale_exports;
65 spinlock_t obd_stale_export_lock;
66 atomic_t obd_stale_export_num;
68 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
69 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
72 * support functions: we could use inter-module communication, but this
73 * is more portable to other OS's
75 static struct obd_device *obd_device_alloc(void)
77 struct obd_device *obd;
79 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
81 obd->obd_magic = OBD_DEVICE_MAGIC;
86 static void obd_device_free(struct obd_device *obd)
89 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
90 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
91 if (obd->obd_namespace != NULL) {
92 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
93 obd, obd->obd_namespace, obd->obd_force);
96 lu_ref_fini(&obd->obd_reference);
97 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
100 struct obd_type *class_search_type(const char *name)
102 struct list_head *tmp;
103 struct obd_type *type;
105 spin_lock(&obd_types_lock);
106 list_for_each(tmp, &obd_types) {
107 type = list_entry(tmp, struct obd_type, typ_chain);
108 if (strcmp(type->typ_name, name) == 0) {
109 spin_unlock(&obd_types_lock);
113 spin_unlock(&obd_types_lock);
116 EXPORT_SYMBOL(class_search_type);
118 struct obd_type *class_get_type(const char *name)
120 struct obd_type *type = class_search_type(name);
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
124 const char *modname = name;
126 if (strcmp(modname, "obdfilter") == 0)
129 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
130 modname = LUSTRE_OSP_NAME;
132 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
133 modname = LUSTRE_MDT_NAME;
135 if (!request_module("%s", modname)) {
136 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
137 type = class_search_type(name);
139 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
145 spin_lock(&type->obd_type_lock);
147 try_module_get(type->typ_dt_ops->o_owner);
148 spin_unlock(&type->obd_type_lock);
153 void class_put_type(struct obd_type *type)
156 spin_lock(&type->obd_type_lock);
158 module_put(type->typ_dt_ops->o_owner);
159 spin_unlock(&type->obd_type_lock);
162 #define CLASS_MAX_NAME 1024
164 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
165 bool enable_proc, struct lprocfs_vars *vars,
166 const char *name, struct lu_device_type *ldt)
168 struct obd_type *type;
173 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
175 if (class_search_type(name)) {
176 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
181 OBD_ALLOC(type, sizeof(*type));
185 OBD_ALLOC_PTR(type->typ_dt_ops);
186 OBD_ALLOC_PTR(type->typ_md_ops);
187 OBD_ALLOC(type->typ_name, strlen(name) + 1);
189 if (type->typ_dt_ops == NULL ||
190 type->typ_md_ops == NULL ||
191 type->typ_name == NULL)
194 *(type->typ_dt_ops) = *dt_ops;
195 /* md_ops is optional */
197 *(type->typ_md_ops) = *md_ops;
198 strcpy(type->typ_name, name);
199 spin_lock_init(&type->obd_type_lock);
201 #ifdef CONFIG_PROC_FS
203 type->typ_procroot = lprocfs_register(type->typ_name,
206 if (IS_ERR(type->typ_procroot)) {
207 rc = PTR_ERR(type->typ_procroot);
208 type->typ_procroot = NULL;
215 rc = lu_device_type_init(ldt);
220 spin_lock(&obd_types_lock);
221 list_add(&type->typ_chain, &obd_types);
222 spin_unlock(&obd_types_lock);
227 if (type->typ_name != NULL) {
228 #ifdef CONFIG_PROC_FS
229 if (type->typ_procroot != NULL)
230 remove_proc_subtree(type->typ_name, proc_lustre_root);
232 OBD_FREE(type->typ_name, strlen(name) + 1);
234 if (type->typ_md_ops != NULL)
235 OBD_FREE_PTR(type->typ_md_ops);
236 if (type->typ_dt_ops != NULL)
237 OBD_FREE_PTR(type->typ_dt_ops);
238 OBD_FREE(type, sizeof(*type));
241 EXPORT_SYMBOL(class_register_type);
243 int class_unregister_type(const char *name)
245 struct obd_type *type = class_search_type(name);
249 CERROR("unknown obd type\n");
253 if (type->typ_refcnt) {
254 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
255 /* This is a bad situation, let's make the best of it */
256 /* Remove ops, but leave the name for debugging */
257 OBD_FREE_PTR(type->typ_dt_ops);
258 OBD_FREE_PTR(type->typ_md_ops);
262 /* we do not use type->typ_procroot as for compatibility purposes
263 * other modules can share names (i.e. lod can use lov entry). so
264 * we can't reference pointer as it can get invalided when another
265 * module removes the entry */
266 #ifdef CONFIG_PROC_FS
267 if (type->typ_procroot != NULL)
268 remove_proc_subtree(type->typ_name, proc_lustre_root);
269 if (type->typ_procsym != NULL)
270 lprocfs_remove(&type->typ_procsym);
273 lu_device_type_fini(type->typ_lu);
275 spin_lock(&obd_types_lock);
276 list_del(&type->typ_chain);
277 spin_unlock(&obd_types_lock);
278 OBD_FREE(type->typ_name, strlen(name) + 1);
279 if (type->typ_dt_ops != NULL)
280 OBD_FREE_PTR(type->typ_dt_ops);
281 if (type->typ_md_ops != NULL)
282 OBD_FREE_PTR(type->typ_md_ops);
283 OBD_FREE(type, sizeof(*type));
285 } /* class_unregister_type */
286 EXPORT_SYMBOL(class_unregister_type);
289 * Create a new obd device.
291 * Find an empty slot in ::obd_devs[], create a new obd device in it.
293 * \param[in] type_name obd device type string.
294 * \param[in] name obd device name.
296 * \retval NULL if create fails, otherwise return the obd device
299 struct obd_device *class_newdev(const char *type_name, const char *name)
301 struct obd_device *result = NULL;
302 struct obd_device *newdev;
303 struct obd_type *type = NULL;
305 int new_obd_minor = 0;
308 if (strlen(name) >= MAX_OBD_NAME) {
309 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
310 RETURN(ERR_PTR(-EINVAL));
313 type = class_get_type(type_name);
315 CERROR("OBD: unknown type: %s\n", type_name);
316 RETURN(ERR_PTR(-ENODEV));
319 newdev = obd_device_alloc();
321 GOTO(out_type, result = ERR_PTR(-ENOMEM));
323 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
325 write_lock(&obd_dev_lock);
326 for (i = 0; i < class_devno_max(); i++) {
327 struct obd_device *obd = class_num2obd(i);
329 if (obd && (strcmp(name, obd->obd_name) == 0)) {
330 CERROR("Device %s already exists at %d, won't add\n",
333 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
334 "%p obd_magic %08x != %08x\n", result,
335 result->obd_magic, OBD_DEVICE_MAGIC);
336 LASSERTF(result->obd_minor == new_obd_minor,
337 "%p obd_minor %d != %d\n", result,
338 result->obd_minor, new_obd_minor);
340 obd_devs[result->obd_minor] = NULL;
341 result->obd_name[0]='\0';
343 result = ERR_PTR(-EEXIST);
346 if (!result && !obd) {
348 result->obd_minor = i;
350 result->obd_type = type;
351 strncpy(result->obd_name, name,
352 sizeof(result->obd_name) - 1);
353 obd_devs[i] = result;
356 write_unlock(&obd_dev_lock);
358 if (result == NULL && i >= class_devno_max()) {
359 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
361 GOTO(out, result = ERR_PTR(-EOVERFLOW));
367 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
368 result->obd_name, result);
372 obd_device_free(newdev);
374 class_put_type(type);
378 void class_release_dev(struct obd_device *obd)
380 struct obd_type *obd_type = obd->obd_type;
382 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
383 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
384 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
385 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
386 LASSERT(obd_type != NULL);
388 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
389 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
391 write_lock(&obd_dev_lock);
392 obd_devs[obd->obd_minor] = NULL;
393 write_unlock(&obd_dev_lock);
394 obd_device_free(obd);
396 class_put_type(obd_type);
399 int class_name2dev(const char *name)
406 read_lock(&obd_dev_lock);
407 for (i = 0; i < class_devno_max(); i++) {
408 struct obd_device *obd = class_num2obd(i);
410 if (obd && strcmp(name, obd->obd_name) == 0) {
411 /* Make sure we finished attaching before we give
412 out any references */
413 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
414 if (obd->obd_attached) {
415 read_unlock(&obd_dev_lock);
421 read_unlock(&obd_dev_lock);
426 struct obd_device *class_name2obd(const char *name)
428 int dev = class_name2dev(name);
430 if (dev < 0 || dev > class_devno_max())
432 return class_num2obd(dev);
434 EXPORT_SYMBOL(class_name2obd);
436 int class_uuid2dev(struct obd_uuid *uuid)
440 read_lock(&obd_dev_lock);
441 for (i = 0; i < class_devno_max(); i++) {
442 struct obd_device *obd = class_num2obd(i);
444 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
445 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
446 read_unlock(&obd_dev_lock);
450 read_unlock(&obd_dev_lock);
455 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
457 int dev = class_uuid2dev(uuid);
460 return class_num2obd(dev);
462 EXPORT_SYMBOL(class_uuid2obd);
465 * Get obd device from ::obd_devs[]
467 * \param num [in] array index
469 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
470 * otherwise return the obd device there.
472 struct obd_device *class_num2obd(int num)
474 struct obd_device *obd = NULL;
476 if (num < class_devno_max()) {
481 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
482 "%p obd_magic %08x != %08x\n",
483 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
484 LASSERTF(obd->obd_minor == num,
485 "%p obd_minor %0d != %0d\n",
486 obd, obd->obd_minor, num);
493 * Get obd devices count. Device in any
495 * \retval obd device count
497 int get_devices_count(void)
499 int index, max_index = class_devno_max(), dev_count = 0;
501 read_lock(&obd_dev_lock);
502 for (index = 0; index <= max_index; index++) {
503 struct obd_device *obd = class_num2obd(index);
507 read_unlock(&obd_dev_lock);
511 EXPORT_SYMBOL(get_devices_count);
513 void class_obd_list(void)
518 read_lock(&obd_dev_lock);
519 for (i = 0; i < class_devno_max(); i++) {
520 struct obd_device *obd = class_num2obd(i);
524 if (obd->obd_stopping)
526 else if (obd->obd_set_up)
528 else if (obd->obd_attached)
532 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
533 i, status, obd->obd_type->typ_name,
534 obd->obd_name, obd->obd_uuid.uuid,
535 atomic_read(&obd->obd_refcount));
537 read_unlock(&obd_dev_lock);
541 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
542 specified, then only the client with that uuid is returned,
543 otherwise any client connected to the tgt is returned. */
544 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
545 const char * typ_name,
546 struct obd_uuid *grp_uuid)
550 read_lock(&obd_dev_lock);
551 for (i = 0; i < class_devno_max(); i++) {
552 struct obd_device *obd = class_num2obd(i);
556 if ((strncmp(obd->obd_type->typ_name, typ_name,
557 strlen(typ_name)) == 0)) {
558 if (obd_uuid_equals(tgt_uuid,
559 &obd->u.cli.cl_target_uuid) &&
560 ((grp_uuid)? obd_uuid_equals(grp_uuid,
561 &obd->obd_uuid) : 1)) {
562 read_unlock(&obd_dev_lock);
567 read_unlock(&obd_dev_lock);
571 EXPORT_SYMBOL(class_find_client_obd);
573 /* Iterate the obd_device list looking devices have grp_uuid. Start
574 searching at *next, and if a device is found, the next index to look
575 at is saved in *next. If next is NULL, then the first matching device
576 will always be returned. */
577 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
583 else if (*next >= 0 && *next < class_devno_max())
588 read_lock(&obd_dev_lock);
589 for (; i < class_devno_max(); i++) {
590 struct obd_device *obd = class_num2obd(i);
594 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
597 read_unlock(&obd_dev_lock);
601 read_unlock(&obd_dev_lock);
605 EXPORT_SYMBOL(class_devices_in_group);
608 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
609 * adjust sptlrpc settings accordingly.
611 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
613 struct obd_device *obd;
617 LASSERT(namelen > 0);
619 read_lock(&obd_dev_lock);
620 for (i = 0; i < class_devno_max(); i++) {
621 obd = class_num2obd(i);
623 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
626 /* only notify mdc, osc, osp, lwp, mdt, ost
627 * because only these have a -sptlrpc llog */
628 type = obd->obd_type->typ_name;
629 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
630 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
631 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
632 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
633 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
634 strcmp(type, LUSTRE_OST_NAME) != 0)
637 if (strncmp(obd->obd_name, fsname, namelen))
640 class_incref(obd, __FUNCTION__, obd);
641 read_unlock(&obd_dev_lock);
642 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
643 sizeof(KEY_SPTLRPC_CONF),
644 KEY_SPTLRPC_CONF, 0, NULL, NULL);
646 class_decref(obd, __FUNCTION__, obd);
647 read_lock(&obd_dev_lock);
649 read_unlock(&obd_dev_lock);
652 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
654 void obd_cleanup_caches(void)
657 if (obd_device_cachep) {
658 kmem_cache_destroy(obd_device_cachep);
659 obd_device_cachep = NULL;
662 kmem_cache_destroy(obdo_cachep);
666 kmem_cache_destroy(import_cachep);
667 import_cachep = NULL;
673 int obd_init_caches(void)
678 LASSERT(obd_device_cachep == NULL);
679 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
680 sizeof(struct obd_device),
682 if (!obd_device_cachep)
683 GOTO(out, rc = -ENOMEM);
685 LASSERT(obdo_cachep == NULL);
686 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
689 GOTO(out, rc = -ENOMEM);
691 LASSERT(import_cachep == NULL);
692 import_cachep = kmem_cache_create("ll_import_cache",
693 sizeof(struct obd_import),
696 GOTO(out, rc = -ENOMEM);
700 obd_cleanup_caches();
704 /* map connection to client */
705 struct obd_export *class_conn2export(struct lustre_handle *conn)
707 struct obd_export *export;
711 CDEBUG(D_CACHE, "looking for null handle\n");
715 if (conn->cookie == -1) { /* this means assign a new connection */
716 CDEBUG(D_CACHE, "want a new connection\n");
720 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
721 export = class_handle2object(conn->cookie, NULL);
724 EXPORT_SYMBOL(class_conn2export);
726 struct obd_device *class_exp2obd(struct obd_export *exp)
732 EXPORT_SYMBOL(class_exp2obd);
734 struct obd_device *class_conn2obd(struct lustre_handle *conn)
736 struct obd_export *export;
737 export = class_conn2export(conn);
739 struct obd_device *obd = export->exp_obd;
740 class_export_put(export);
746 struct obd_import *class_exp2cliimp(struct obd_export *exp)
748 struct obd_device *obd = exp->exp_obd;
751 return obd->u.cli.cl_import;
753 EXPORT_SYMBOL(class_exp2cliimp);
755 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
757 struct obd_device *obd = class_conn2obd(conn);
760 return obd->u.cli.cl_import;
763 /* Export management functions */
764 static void class_export_destroy(struct obd_export *exp)
766 struct obd_device *obd = exp->exp_obd;
769 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
770 LASSERT(obd != NULL);
772 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
773 exp->exp_client_uuid.uuid, obd->obd_name);
775 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
776 if (exp->exp_connection)
777 ptlrpc_put_connection_superhack(exp->exp_connection);
779 LASSERT(list_empty(&exp->exp_outstanding_replies));
780 LASSERT(list_empty(&exp->exp_uncommitted_replies));
781 LASSERT(list_empty(&exp->exp_req_replay_queue));
782 LASSERT(list_empty(&exp->exp_hp_rpcs));
783 obd_destroy_export(exp);
784 class_decref(obd, "export", exp);
786 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
790 static void export_handle_addref(void *export)
792 class_export_get(export);
795 static struct portals_handle_ops export_handle_ops = {
796 .hop_addref = export_handle_addref,
800 struct obd_export *class_export_get(struct obd_export *exp)
802 atomic_inc(&exp->exp_refcount);
803 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
804 atomic_read(&exp->exp_refcount));
807 EXPORT_SYMBOL(class_export_get);
809 void class_export_put(struct obd_export *exp)
811 LASSERT(exp != NULL);
812 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
813 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
814 atomic_read(&exp->exp_refcount) - 1);
816 if (atomic_dec_and_test(&exp->exp_refcount)) {
817 LASSERT(!list_empty(&exp->exp_obd_chain));
818 LASSERT(list_empty(&exp->exp_stale_list));
819 CDEBUG(D_IOCTL, "final put %p/%s\n",
820 exp, exp->exp_client_uuid.uuid);
822 /* release nid stat refererence */
823 lprocfs_exp_cleanup(exp);
825 obd_zombie_export_add(exp);
828 EXPORT_SYMBOL(class_export_put);
830 /* Creates a new export, adds it to the hash table, and returns a
831 * pointer to it. The refcount is 2: one for the hash reference, and
832 * one for the pointer returned by this function. */
833 struct obd_export *class_new_export(struct obd_device *obd,
834 struct obd_uuid *cluuid)
836 struct obd_export *export;
837 struct cfs_hash *hash = NULL;
841 OBD_ALLOC_PTR(export);
843 return ERR_PTR(-ENOMEM);
845 export->exp_conn_cnt = 0;
846 export->exp_lock_hash = NULL;
847 export->exp_flock_hash = NULL;
848 atomic_set(&export->exp_refcount, 2);
849 atomic_set(&export->exp_rpc_count, 0);
850 atomic_set(&export->exp_cb_count, 0);
851 atomic_set(&export->exp_locks_count, 0);
852 #if LUSTRE_TRACKS_LOCK_EXP_REFS
853 INIT_LIST_HEAD(&export->exp_locks_list);
854 spin_lock_init(&export->exp_locks_list_guard);
856 atomic_set(&export->exp_replay_count, 0);
857 export->exp_obd = obd;
858 INIT_LIST_HEAD(&export->exp_outstanding_replies);
859 spin_lock_init(&export->exp_uncommitted_replies_lock);
860 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
861 INIT_LIST_HEAD(&export->exp_req_replay_queue);
862 INIT_LIST_HEAD(&export->exp_handle.h_link);
863 INIT_LIST_HEAD(&export->exp_hp_rpcs);
864 INIT_LIST_HEAD(&export->exp_reg_rpcs);
865 class_handle_hash(&export->exp_handle, &export_handle_ops);
866 export->exp_last_request_time = cfs_time_current_sec();
867 spin_lock_init(&export->exp_lock);
868 spin_lock_init(&export->exp_rpc_lock);
869 INIT_HLIST_NODE(&export->exp_uuid_hash);
870 INIT_HLIST_NODE(&export->exp_nid_hash);
871 INIT_HLIST_NODE(&export->exp_gen_hash);
872 spin_lock_init(&export->exp_bl_list_lock);
873 INIT_LIST_HEAD(&export->exp_bl_list);
874 INIT_LIST_HEAD(&export->exp_stale_list);
876 export->exp_sp_peer = LUSTRE_SP_ANY;
877 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
878 export->exp_client_uuid = *cluuid;
879 obd_init_export(export);
881 spin_lock(&obd->obd_dev_lock);
882 /* shouldn't happen, but might race */
883 if (obd->obd_stopping)
884 GOTO(exit_unlock, rc = -ENODEV);
886 hash = cfs_hash_getref(obd->obd_uuid_hash);
888 GOTO(exit_unlock, rc = -ENODEV);
889 spin_unlock(&obd->obd_dev_lock);
891 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
892 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
894 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
895 obd->obd_name, cluuid->uuid, rc);
896 GOTO(exit_err, rc = -EALREADY);
900 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
901 spin_lock(&obd->obd_dev_lock);
902 if (obd->obd_stopping) {
903 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
904 GOTO(exit_unlock, rc = -ENODEV);
907 class_incref(obd, "export", export);
908 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
909 list_add_tail(&export->exp_obd_chain_timed,
910 &export->exp_obd->obd_exports_timed);
911 export->exp_obd->obd_num_exports++;
912 spin_unlock(&obd->obd_dev_lock);
913 cfs_hash_putref(hash);
917 spin_unlock(&obd->obd_dev_lock);
920 cfs_hash_putref(hash);
921 class_handle_unhash(&export->exp_handle);
922 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
923 obd_destroy_export(export);
924 OBD_FREE_PTR(export);
927 EXPORT_SYMBOL(class_new_export);
929 void class_unlink_export(struct obd_export *exp)
931 class_handle_unhash(&exp->exp_handle);
933 spin_lock(&exp->exp_obd->obd_dev_lock);
934 /* delete an uuid-export hashitem from hashtables */
935 if (!hlist_unhashed(&exp->exp_uuid_hash))
936 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
937 &exp->exp_client_uuid,
938 &exp->exp_uuid_hash);
940 if (!hlist_unhashed(&exp->exp_gen_hash)) {
941 struct tg_export_data *ted = &exp->exp_target_data;
942 struct cfs_hash *hash;
944 /* Because obd_gen_hash will not be released until
945 * class_cleanup(), so hash should never be NULL here */
946 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
947 LASSERT(hash != NULL);
948 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
950 cfs_hash_putref(hash);
953 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
954 list_del_init(&exp->exp_obd_chain_timed);
955 exp->exp_obd->obd_num_exports--;
956 spin_unlock(&exp->exp_obd->obd_dev_lock);
957 atomic_inc(&obd_stale_export_num);
959 /* A reference is kept by obd_stale_exports list */
960 obd_stale_export_put(exp);
962 EXPORT_SYMBOL(class_unlink_export);
964 /* Import management functions */
965 static void class_import_destroy(struct obd_import *imp)
969 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
970 imp->imp_obd->obd_name);
972 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
974 ptlrpc_put_connection_superhack(imp->imp_connection);
976 while (!list_empty(&imp->imp_conn_list)) {
977 struct obd_import_conn *imp_conn;
979 imp_conn = list_entry(imp->imp_conn_list.next,
980 struct obd_import_conn, oic_item);
981 list_del_init(&imp_conn->oic_item);
982 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
983 OBD_FREE(imp_conn, sizeof(*imp_conn));
986 LASSERT(imp->imp_sec == NULL);
987 class_decref(imp->imp_obd, "import", imp);
988 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
992 static void import_handle_addref(void *import)
994 class_import_get(import);
997 static struct portals_handle_ops import_handle_ops = {
998 .hop_addref = import_handle_addref,
1002 struct obd_import *class_import_get(struct obd_import *import)
1004 atomic_inc(&import->imp_refcount);
1005 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1006 atomic_read(&import->imp_refcount),
1007 import->imp_obd->obd_name);
1010 EXPORT_SYMBOL(class_import_get);
1012 void class_import_put(struct obd_import *imp)
1016 LASSERT(list_empty(&imp->imp_zombie_chain));
1017 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1019 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1020 atomic_read(&imp->imp_refcount) - 1,
1021 imp->imp_obd->obd_name);
1023 if (atomic_dec_and_test(&imp->imp_refcount)) {
1024 CDEBUG(D_INFO, "final put import %p\n", imp);
1025 obd_zombie_import_add(imp);
1028 /* catch possible import put race */
1029 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1032 EXPORT_SYMBOL(class_import_put);
1034 static void init_imp_at(struct imp_at *at) {
1036 at_init(&at->iat_net_latency, 0, 0);
1037 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1038 /* max service estimates are tracked on the server side, so
1039 don't use the AT history here, just use the last reported
1040 val. (But keep hist for proc histogram, worst_ever) */
1041 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1046 struct obd_import *class_new_import(struct obd_device *obd)
1048 struct obd_import *imp;
1049 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1051 OBD_ALLOC(imp, sizeof(*imp));
1055 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1056 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1057 INIT_LIST_HEAD(&imp->imp_replay_list);
1058 INIT_LIST_HEAD(&imp->imp_sending_list);
1059 INIT_LIST_HEAD(&imp->imp_delayed_list);
1060 INIT_LIST_HEAD(&imp->imp_committed_list);
1061 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1062 imp->imp_known_replied_xid = 0;
1063 imp->imp_replay_cursor = &imp->imp_committed_list;
1064 spin_lock_init(&imp->imp_lock);
1065 imp->imp_last_success_conn = 0;
1066 imp->imp_state = LUSTRE_IMP_NEW;
1067 imp->imp_obd = class_incref(obd, "import", imp);
1068 mutex_init(&imp->imp_sec_mutex);
1069 init_waitqueue_head(&imp->imp_recovery_waitq);
1071 if (curr_pid_ns->child_reaper)
1072 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1074 imp->imp_sec_refpid = 1;
1076 atomic_set(&imp->imp_refcount, 2);
1077 atomic_set(&imp->imp_unregistering, 0);
1078 atomic_set(&imp->imp_inflight, 0);
1079 atomic_set(&imp->imp_replay_inflight, 0);
1080 atomic_set(&imp->imp_inval_count, 0);
1081 INIT_LIST_HEAD(&imp->imp_conn_list);
1082 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1083 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1084 init_imp_at(&imp->imp_at);
1086 /* the default magic is V2, will be used in connect RPC, and
1087 * then adjusted according to the flags in request/reply. */
1088 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1092 EXPORT_SYMBOL(class_new_import);
1094 void class_destroy_import(struct obd_import *import)
1096 LASSERT(import != NULL);
1097 LASSERT(import != LP_POISON);
1099 class_handle_unhash(&import->imp_handle);
1101 spin_lock(&import->imp_lock);
1102 import->imp_generation++;
1103 spin_unlock(&import->imp_lock);
1104 class_import_put(import);
1106 EXPORT_SYMBOL(class_destroy_import);
1108 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1110 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1112 spin_lock(&exp->exp_locks_list_guard);
1114 LASSERT(lock->l_exp_refs_nr >= 0);
1116 if (lock->l_exp_refs_target != NULL &&
1117 lock->l_exp_refs_target != exp) {
1118 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1119 exp, lock, lock->l_exp_refs_target);
1121 if ((lock->l_exp_refs_nr ++) == 0) {
1122 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1123 lock->l_exp_refs_target = exp;
1125 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1126 lock, exp, lock->l_exp_refs_nr);
1127 spin_unlock(&exp->exp_locks_list_guard);
1129 EXPORT_SYMBOL(__class_export_add_lock_ref);
1131 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1133 spin_lock(&exp->exp_locks_list_guard);
1134 LASSERT(lock->l_exp_refs_nr > 0);
1135 if (lock->l_exp_refs_target != exp) {
1136 LCONSOLE_WARN("lock %p, "
1137 "mismatching export pointers: %p, %p\n",
1138 lock, lock->l_exp_refs_target, exp);
1140 if (-- lock->l_exp_refs_nr == 0) {
1141 list_del_init(&lock->l_exp_refs_link);
1142 lock->l_exp_refs_target = NULL;
1144 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1145 lock, exp, lock->l_exp_refs_nr);
1146 spin_unlock(&exp->exp_locks_list_guard);
1148 EXPORT_SYMBOL(__class_export_del_lock_ref);
1151 /* A connection defines an export context in which preallocation can
1152 be managed. This releases the export pointer reference, and returns
1153 the export handle, so the export refcount is 1 when this function
1155 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1156 struct obd_uuid *cluuid)
1158 struct obd_export *export;
1159 LASSERT(conn != NULL);
1160 LASSERT(obd != NULL);
1161 LASSERT(cluuid != NULL);
1164 export = class_new_export(obd, cluuid);
1166 RETURN(PTR_ERR(export));
1168 conn->cookie = export->exp_handle.h_cookie;
1169 class_export_put(export);
1171 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1172 cluuid->uuid, conn->cookie);
1175 EXPORT_SYMBOL(class_connect);
1177 /* if export is involved in recovery then clean up related things */
1178 static void class_export_recovery_cleanup(struct obd_export *exp)
1180 struct obd_device *obd = exp->exp_obd;
1182 spin_lock(&obd->obd_recovery_task_lock);
1183 if (obd->obd_recovering) {
1184 if (exp->exp_in_recovery) {
1185 spin_lock(&exp->exp_lock);
1186 exp->exp_in_recovery = 0;
1187 spin_unlock(&exp->exp_lock);
1188 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1189 atomic_dec(&obd->obd_connected_clients);
1192 /* if called during recovery then should update
1193 * obd_stale_clients counter,
1194 * lightweight exports are not counted */
1195 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1196 exp->exp_obd->obd_stale_clients++;
1198 spin_unlock(&obd->obd_recovery_task_lock);
1200 spin_lock(&exp->exp_lock);
1201 /** Cleanup req replay fields */
1202 if (exp->exp_req_replay_needed) {
1203 exp->exp_req_replay_needed = 0;
1205 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1206 atomic_dec(&obd->obd_req_replay_clients);
1209 /** Cleanup lock replay data */
1210 if (exp->exp_lock_replay_needed) {
1211 exp->exp_lock_replay_needed = 0;
1213 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1214 atomic_dec(&obd->obd_lock_replay_clients);
1216 spin_unlock(&exp->exp_lock);
1219 /* This function removes 1-3 references from the export:
1220 * 1 - for export pointer passed
1221 * and if disconnect really need
1222 * 2 - removing from hash
1223 * 3 - in client_unlink_export
1224 * The export pointer passed to this function can destroyed */
1225 int class_disconnect(struct obd_export *export)
1227 int already_disconnected;
1230 if (export == NULL) {
1231 CWARN("attempting to free NULL export %p\n", export);
1235 spin_lock(&export->exp_lock);
1236 already_disconnected = export->exp_disconnected;
1237 export->exp_disconnected = 1;
1238 spin_unlock(&export->exp_lock);
1240 /* class_cleanup(), abort_recovery(), and class_fail_export()
1241 * all end up in here, and if any of them race we shouldn't
1242 * call extra class_export_puts(). */
1243 if (already_disconnected) {
1244 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1245 GOTO(no_disconn, already_disconnected);
1248 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1249 export->exp_handle.h_cookie);
1251 if (!hlist_unhashed(&export->exp_nid_hash))
1252 cfs_hash_del(export->exp_obd->obd_nid_hash,
1253 &export->exp_connection->c_peer.nid,
1254 &export->exp_nid_hash);
1256 class_export_recovery_cleanup(export);
1257 class_unlink_export(export);
1259 class_export_put(export);
1262 EXPORT_SYMBOL(class_disconnect);
1264 /* Return non-zero for a fully connected export */
1265 int class_connected_export(struct obd_export *exp)
1270 spin_lock(&exp->exp_lock);
1271 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1272 spin_unlock(&exp->exp_lock);
1276 EXPORT_SYMBOL(class_connected_export);
1278 static void class_disconnect_export_list(struct list_head *list,
1279 enum obd_option flags)
1282 struct obd_export *exp;
1285 /* It's possible that an export may disconnect itself, but
1286 * nothing else will be added to this list. */
1287 while (!list_empty(list)) {
1288 exp = list_entry(list->next, struct obd_export,
1290 /* need for safe call CDEBUG after obd_disconnect */
1291 class_export_get(exp);
1293 spin_lock(&exp->exp_lock);
1294 exp->exp_flags = flags;
1295 spin_unlock(&exp->exp_lock);
1297 if (obd_uuid_equals(&exp->exp_client_uuid,
1298 &exp->exp_obd->obd_uuid)) {
1300 "exp %p export uuid == obd uuid, don't discon\n",
1302 /* Need to delete this now so we don't end up pointing
1303 * to work_list later when this export is cleaned up. */
1304 list_del_init(&exp->exp_obd_chain);
1305 class_export_put(exp);
1309 class_export_get(exp);
1310 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1311 "last request at "CFS_TIME_T"\n",
1312 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1313 exp, exp->exp_last_request_time);
1314 /* release one export reference anyway */
1315 rc = obd_disconnect(exp);
1317 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1318 obd_export_nid2str(exp), exp, rc);
1319 class_export_put(exp);
1324 void class_disconnect_exports(struct obd_device *obd)
1326 struct list_head work_list;
1329 /* Move all of the exports from obd_exports to a work list, en masse. */
1330 INIT_LIST_HEAD(&work_list);
1331 spin_lock(&obd->obd_dev_lock);
1332 list_splice_init(&obd->obd_exports, &work_list);
1333 list_splice_init(&obd->obd_delayed_exports, &work_list);
1334 spin_unlock(&obd->obd_dev_lock);
1336 if (!list_empty(&work_list)) {
1337 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1338 "disconnecting them\n", obd->obd_minor, obd);
1339 class_disconnect_export_list(&work_list,
1340 exp_flags_from_obd(obd));
1342 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1343 obd->obd_minor, obd);
1346 EXPORT_SYMBOL(class_disconnect_exports);
1348 /* Remove exports that have not completed recovery.
1350 void class_disconnect_stale_exports(struct obd_device *obd,
1351 int (*test_export)(struct obd_export *))
1353 struct list_head work_list;
1354 struct obd_export *exp, *n;
1358 INIT_LIST_HEAD(&work_list);
1359 spin_lock(&obd->obd_dev_lock);
1360 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1362 /* don't count self-export as client */
1363 if (obd_uuid_equals(&exp->exp_client_uuid,
1364 &exp->exp_obd->obd_uuid))
1367 /* don't evict clients which have no slot in last_rcvd
1368 * (e.g. lightweight connection) */
1369 if (exp->exp_target_data.ted_lr_idx == -1)
1372 spin_lock(&exp->exp_lock);
1373 if (exp->exp_failed || test_export(exp)) {
1374 spin_unlock(&exp->exp_lock);
1377 exp->exp_failed = 1;
1378 spin_unlock(&exp->exp_lock);
1380 list_move(&exp->exp_obd_chain, &work_list);
1382 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1383 obd->obd_name, exp->exp_client_uuid.uuid,
1384 exp->exp_connection == NULL ? "<unknown>" :
1385 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1386 print_export_data(exp, "EVICTING", 0, D_HA);
1388 spin_unlock(&obd->obd_dev_lock);
1391 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1392 obd->obd_name, evicted);
1394 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1395 OBD_OPT_ABORT_RECOV);
1398 EXPORT_SYMBOL(class_disconnect_stale_exports);
1400 void class_fail_export(struct obd_export *exp)
1402 int rc, already_failed;
1404 spin_lock(&exp->exp_lock);
1405 already_failed = exp->exp_failed;
1406 exp->exp_failed = 1;
1407 spin_unlock(&exp->exp_lock);
1409 if (already_failed) {
1410 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1411 exp, exp->exp_client_uuid.uuid);
1415 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1416 exp, exp->exp_client_uuid.uuid);
1418 if (obd_dump_on_timeout)
1419 libcfs_debug_dumplog();
1421 /* need for safe call CDEBUG after obd_disconnect */
1422 class_export_get(exp);
1424 /* Most callers into obd_disconnect are removing their own reference
1425 * (request, for example) in addition to the one from the hash table.
1426 * We don't have such a reference here, so make one. */
1427 class_export_get(exp);
1428 rc = obd_disconnect(exp);
1430 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1432 CDEBUG(D_HA, "disconnected export %p/%s\n",
1433 exp, exp->exp_client_uuid.uuid);
1434 class_export_put(exp);
1436 EXPORT_SYMBOL(class_fail_export);
1438 char *obd_export_nid2str(struct obd_export *exp)
1440 if (exp->exp_connection != NULL)
1441 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1445 EXPORT_SYMBOL(obd_export_nid2str);
1447 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1449 struct cfs_hash *nid_hash;
1450 struct obd_export *doomed_exp = NULL;
1451 int exports_evicted = 0;
1453 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1455 spin_lock(&obd->obd_dev_lock);
1456 /* umount has run already, so evict thread should leave
1457 * its task to umount thread now */
1458 if (obd->obd_stopping) {
1459 spin_unlock(&obd->obd_dev_lock);
1460 return exports_evicted;
1462 nid_hash = obd->obd_nid_hash;
1463 cfs_hash_getref(nid_hash);
1464 spin_unlock(&obd->obd_dev_lock);
1467 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1468 if (doomed_exp == NULL)
1471 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1472 "nid %s found, wanted nid %s, requested nid %s\n",
1473 obd_export_nid2str(doomed_exp),
1474 libcfs_nid2str(nid_key), nid);
1475 LASSERTF(doomed_exp != obd->obd_self_export,
1476 "self-export is hashed by NID?\n");
1478 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1479 "request\n", obd->obd_name,
1480 obd_uuid2str(&doomed_exp->exp_client_uuid),
1481 obd_export_nid2str(doomed_exp));
1482 class_fail_export(doomed_exp);
1483 class_export_put(doomed_exp);
1486 cfs_hash_putref(nid_hash);
1488 if (!exports_evicted)
1489 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1490 obd->obd_name, nid);
1491 return exports_evicted;
1493 EXPORT_SYMBOL(obd_export_evict_by_nid);
1495 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1497 struct cfs_hash *uuid_hash;
1498 struct obd_export *doomed_exp = NULL;
1499 struct obd_uuid doomed_uuid;
1500 int exports_evicted = 0;
1502 spin_lock(&obd->obd_dev_lock);
1503 if (obd->obd_stopping) {
1504 spin_unlock(&obd->obd_dev_lock);
1505 return exports_evicted;
1507 uuid_hash = obd->obd_uuid_hash;
1508 cfs_hash_getref(uuid_hash);
1509 spin_unlock(&obd->obd_dev_lock);
1511 obd_str2uuid(&doomed_uuid, uuid);
1512 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1513 CERROR("%s: can't evict myself\n", obd->obd_name);
1514 cfs_hash_putref(uuid_hash);
1515 return exports_evicted;
1518 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1520 if (doomed_exp == NULL) {
1521 CERROR("%s: can't disconnect %s: no exports found\n",
1522 obd->obd_name, uuid);
1524 CWARN("%s: evicting %s at adminstrative request\n",
1525 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1526 class_fail_export(doomed_exp);
1527 class_export_put(doomed_exp);
1530 cfs_hash_putref(uuid_hash);
1532 return exports_evicted;
1535 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1536 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1537 EXPORT_SYMBOL(class_export_dump_hook);
1540 static void print_export_data(struct obd_export *exp, const char *status,
1541 int locks, int debug_level)
1543 struct ptlrpc_reply_state *rs;
1544 struct ptlrpc_reply_state *first_reply = NULL;
1547 spin_lock(&exp->exp_lock);
1548 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1554 spin_unlock(&exp->exp_lock);
1556 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1557 "%p %s %llu stale:%d\n",
1558 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1559 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1560 atomic_read(&exp->exp_rpc_count),
1561 atomic_read(&exp->exp_cb_count),
1562 atomic_read(&exp->exp_locks_count),
1563 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1564 nreplies, first_reply, nreplies > 3 ? "..." : "",
1565 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1566 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1567 if (locks && class_export_dump_hook != NULL)
1568 class_export_dump_hook(exp);
1572 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1574 struct obd_export *exp;
1576 spin_lock(&obd->obd_dev_lock);
1577 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1578 print_export_data(exp, "ACTIVE", locks, debug_level);
1579 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1580 print_export_data(exp, "UNLINKED", locks, debug_level);
1581 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1582 print_export_data(exp, "DELAYED", locks, debug_level);
1583 spin_unlock(&obd->obd_dev_lock);
1584 spin_lock(&obd_zombie_impexp_lock);
1585 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1586 print_export_data(exp, "ZOMBIE", locks, debug_level);
1587 spin_unlock(&obd_zombie_impexp_lock);
1590 void obd_exports_barrier(struct obd_device *obd)
1593 LASSERT(list_empty(&obd->obd_exports));
1594 spin_lock(&obd->obd_dev_lock);
1595 while (!list_empty(&obd->obd_unlinked_exports)) {
1596 spin_unlock(&obd->obd_dev_lock);
1597 set_current_state(TASK_UNINTERRUPTIBLE);
1598 schedule_timeout(cfs_time_seconds(waited));
1599 if (waited > 5 && IS_PO2(waited)) {
1600 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1601 "more than %d seconds. "
1602 "The obd refcount = %d. Is it stuck?\n",
1603 obd->obd_name, waited,
1604 atomic_read(&obd->obd_refcount));
1605 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1608 spin_lock(&obd->obd_dev_lock);
1610 spin_unlock(&obd->obd_dev_lock);
1612 EXPORT_SYMBOL(obd_exports_barrier);
1614 /* Total amount of zombies to be destroyed */
1615 static int zombies_count = 0;
1618 * kill zombie imports and exports
1620 void obd_zombie_impexp_cull(void)
1622 struct obd_import *import;
1623 struct obd_export *export;
1627 spin_lock(&obd_zombie_impexp_lock);
1630 if (!list_empty(&obd_zombie_imports)) {
1631 import = list_entry(obd_zombie_imports.next,
1634 list_del_init(&import->imp_zombie_chain);
1638 if (!list_empty(&obd_zombie_exports)) {
1639 export = list_entry(obd_zombie_exports.next,
1642 list_del_init(&export->exp_obd_chain);
1645 spin_unlock(&obd_zombie_impexp_lock);
1647 if (import != NULL) {
1648 class_import_destroy(import);
1649 spin_lock(&obd_zombie_impexp_lock);
1651 spin_unlock(&obd_zombie_impexp_lock);
1654 if (export != NULL) {
1655 class_export_destroy(export);
1656 spin_lock(&obd_zombie_impexp_lock);
1658 spin_unlock(&obd_zombie_impexp_lock);
1662 } while (import != NULL || export != NULL);
1666 static struct completion obd_zombie_start;
1667 static struct completion obd_zombie_stop;
1668 static unsigned long obd_zombie_flags;
1669 static wait_queue_head_t obd_zombie_waitq;
1670 static pid_t obd_zombie_pid;
1673 OBD_ZOMBIE_STOP = 0x0001,
1677 * check for work for kill zombie import/export thread.
1679 static int obd_zombie_impexp_check(void *arg)
1683 spin_lock(&obd_zombie_impexp_lock);
1684 rc = (zombies_count == 0) &&
1685 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1686 spin_unlock(&obd_zombie_impexp_lock);
1692 * Add export to the obd_zombe thread and notify it.
1694 static void obd_zombie_export_add(struct obd_export *exp) {
1695 atomic_dec(&obd_stale_export_num);
1696 spin_lock(&exp->exp_obd->obd_dev_lock);
1697 LASSERT(!list_empty(&exp->exp_obd_chain));
1698 list_del_init(&exp->exp_obd_chain);
1699 spin_unlock(&exp->exp_obd->obd_dev_lock);
1700 spin_lock(&obd_zombie_impexp_lock);
1702 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1703 spin_unlock(&obd_zombie_impexp_lock);
1705 obd_zombie_impexp_notify();
1709 * Add import to the obd_zombe thread and notify it.
1711 static void obd_zombie_import_add(struct obd_import *imp) {
1712 LASSERT(imp->imp_sec == NULL);
1713 spin_lock(&obd_zombie_impexp_lock);
1714 LASSERT(list_empty(&imp->imp_zombie_chain));
1716 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1717 spin_unlock(&obd_zombie_impexp_lock);
1719 obd_zombie_impexp_notify();
1723 * notify import/export destroy thread about new zombie.
1725 static void obd_zombie_impexp_notify(void)
1728 * Make sure obd_zomebie_impexp_thread get this notification.
1729 * It is possible this signal only get by obd_zombie_barrier, and
1730 * barrier gulps this notification and sleeps away and hangs ensues
1732 wake_up_all(&obd_zombie_waitq);
1736 * check whether obd_zombie is idle
1738 static int obd_zombie_is_idle(void)
1742 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1743 spin_lock(&obd_zombie_impexp_lock);
1744 rc = (zombies_count == 0);
1745 spin_unlock(&obd_zombie_impexp_lock);
1750 * wait when obd_zombie import/export queues become empty
1752 void obd_zombie_barrier(void)
1754 struct l_wait_info lwi = { 0 };
1756 if (obd_zombie_pid == current_pid())
1757 /* don't wait for myself */
1759 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1761 EXPORT_SYMBOL(obd_zombie_barrier);
1764 struct obd_export *obd_stale_export_get(void)
1766 struct obd_export *exp = NULL;
1769 spin_lock(&obd_stale_export_lock);
1770 if (!list_empty(&obd_stale_exports)) {
1771 exp = list_entry(obd_stale_exports.next,
1772 struct obd_export, exp_stale_list);
1773 list_del_init(&exp->exp_stale_list);
1775 spin_unlock(&obd_stale_export_lock);
1778 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1779 atomic_read(&obd_stale_export_num));
1783 EXPORT_SYMBOL(obd_stale_export_get);
1785 void obd_stale_export_put(struct obd_export *exp)
1789 LASSERT(list_empty(&exp->exp_stale_list));
1790 if (exp->exp_lock_hash &&
1791 atomic_read(&exp->exp_lock_hash->hs_count)) {
1792 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1793 atomic_read(&obd_stale_export_num));
1795 spin_lock_bh(&exp->exp_bl_list_lock);
1796 spin_lock(&obd_stale_export_lock);
1797 /* Add to the tail if there is no blocked locks,
1798 * to the head otherwise. */
1799 if (list_empty(&exp->exp_bl_list))
1800 list_add_tail(&exp->exp_stale_list,
1801 &obd_stale_exports);
1803 list_add(&exp->exp_stale_list,
1804 &obd_stale_exports);
1806 spin_unlock(&obd_stale_export_lock);
1807 spin_unlock_bh(&exp->exp_bl_list_lock);
1809 class_export_put(exp);
1813 EXPORT_SYMBOL(obd_stale_export_put);
1816 * Adjust the position of the export in the stale list,
1817 * i.e. move to the head of the list if is needed.
1819 void obd_stale_export_adjust(struct obd_export *exp)
1821 LASSERT(exp != NULL);
1822 spin_lock_bh(&exp->exp_bl_list_lock);
1823 spin_lock(&obd_stale_export_lock);
1825 if (!list_empty(&exp->exp_stale_list) &&
1826 !list_empty(&exp->exp_bl_list))
1827 list_move(&exp->exp_stale_list, &obd_stale_exports);
1829 spin_unlock(&obd_stale_export_lock);
1830 spin_unlock_bh(&exp->exp_bl_list_lock);
1832 EXPORT_SYMBOL(obd_stale_export_adjust);
1835 * destroy zombie export/import thread.
1837 static int obd_zombie_impexp_thread(void *unused)
1839 unshare_fs_struct();
1840 complete(&obd_zombie_start);
1842 obd_zombie_pid = current_pid();
1844 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1845 struct l_wait_info lwi = { 0 };
1847 l_wait_event(obd_zombie_waitq,
1848 !obd_zombie_impexp_check(NULL), &lwi);
1849 obd_zombie_impexp_cull();
1852 * Notify obd_zombie_barrier callers that queues
1855 wake_up(&obd_zombie_waitq);
1858 complete(&obd_zombie_stop);
1865 * start destroy zombie import/export thread
1867 int obd_zombie_impexp_init(void)
1869 struct task_struct *task;
1871 INIT_LIST_HEAD(&obd_zombie_imports);
1873 INIT_LIST_HEAD(&obd_zombie_exports);
1874 spin_lock_init(&obd_zombie_impexp_lock);
1875 init_completion(&obd_zombie_start);
1876 init_completion(&obd_zombie_stop);
1877 init_waitqueue_head(&obd_zombie_waitq);
1880 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1882 RETURN(PTR_ERR(task));
1884 wait_for_completion(&obd_zombie_start);
1888 * stop destroy zombie import/export thread
1890 void obd_zombie_impexp_stop(void)
1892 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1893 obd_zombie_impexp_notify();
1894 wait_for_completion(&obd_zombie_stop);
1897 /***** Kernel-userspace comm helpers *******/
1899 /* Get length of entire message, including header */
1900 int kuc_len(int payload_len)
1902 return sizeof(struct kuc_hdr) + payload_len;
1904 EXPORT_SYMBOL(kuc_len);
1906 /* Get a pointer to kuc header, given a ptr to the payload
1907 * @param p Pointer to payload area
1908 * @returns Pointer to kuc header
1910 struct kuc_hdr * kuc_ptr(void *p)
1912 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1913 LASSERT(lh->kuc_magic == KUC_MAGIC);
1916 EXPORT_SYMBOL(kuc_ptr);
1918 /* Alloc space for a message, and fill in header
1919 * @return Pointer to payload area
1921 void *kuc_alloc(int payload_len, int transport, int type)
1924 int len = kuc_len(payload_len);
1928 return ERR_PTR(-ENOMEM);
1930 lh->kuc_magic = KUC_MAGIC;
1931 lh->kuc_transport = transport;
1932 lh->kuc_msgtype = type;
1933 lh->kuc_msglen = len;
1935 return (void *)(lh + 1);
1937 EXPORT_SYMBOL(kuc_alloc);
1939 /* Takes pointer to payload area */
1940 inline void kuc_free(void *p, int payload_len)
1942 struct kuc_hdr *lh = kuc_ptr(p);
1943 OBD_FREE(lh, kuc_len(payload_len));
1945 EXPORT_SYMBOL(kuc_free);
1947 struct obd_request_slot_waiter {
1948 struct list_head orsw_entry;
1949 wait_queue_head_t orsw_waitq;
1953 static bool obd_request_slot_avail(struct client_obd *cli,
1954 struct obd_request_slot_waiter *orsw)
1958 spin_lock(&cli->cl_loi_list_lock);
1959 avail = !!list_empty(&orsw->orsw_entry);
1960 spin_unlock(&cli->cl_loi_list_lock);
1966 * For network flow control, the RPC sponsor needs to acquire a credit
1967 * before sending the RPC. The credits count for a connection is defined
1968 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1969 * the subsequent RPC sponsors need to wait until others released their
1970 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1972 int obd_get_request_slot(struct client_obd *cli)
1974 struct obd_request_slot_waiter orsw;
1975 struct l_wait_info lwi;
1978 spin_lock(&cli->cl_loi_list_lock);
1979 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1980 cli->cl_r_in_flight++;
1981 spin_unlock(&cli->cl_loi_list_lock);
1985 init_waitqueue_head(&orsw.orsw_waitq);
1986 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1987 orsw.orsw_signaled = false;
1988 spin_unlock(&cli->cl_loi_list_lock);
1990 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1991 rc = l_wait_event(orsw.orsw_waitq,
1992 obd_request_slot_avail(cli, &orsw) ||
1996 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1997 * freed but other (such as obd_put_request_slot) is using it. */
1998 spin_lock(&cli->cl_loi_list_lock);
2000 if (!orsw.orsw_signaled) {
2001 if (list_empty(&orsw.orsw_entry))
2002 cli->cl_r_in_flight--;
2004 list_del(&orsw.orsw_entry);
2008 if (orsw.orsw_signaled) {
2009 LASSERT(list_empty(&orsw.orsw_entry));
2013 spin_unlock(&cli->cl_loi_list_lock);
2017 EXPORT_SYMBOL(obd_get_request_slot);
2019 void obd_put_request_slot(struct client_obd *cli)
2021 struct obd_request_slot_waiter *orsw;
2023 spin_lock(&cli->cl_loi_list_lock);
2024 cli->cl_r_in_flight--;
2026 /* If there is free slot, wakeup the first waiter. */
2027 if (!list_empty(&cli->cl_loi_read_list) &&
2028 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2029 orsw = list_entry(cli->cl_loi_read_list.next,
2030 struct obd_request_slot_waiter, orsw_entry);
2031 list_del_init(&orsw->orsw_entry);
2032 cli->cl_r_in_flight++;
2033 wake_up(&orsw->orsw_waitq);
2035 spin_unlock(&cli->cl_loi_list_lock);
2037 EXPORT_SYMBOL(obd_put_request_slot);
2039 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2041 return cli->cl_max_rpcs_in_flight;
2043 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2045 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2047 struct obd_request_slot_waiter *orsw;
2054 if (max > OBD_MAX_RIF_MAX || max < 1)
2057 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2058 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2059 /* adjust max_mod_rpcs_in_flight to ensure it is always
2060 * strictly lower that max_rpcs_in_flight */
2062 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2063 "because it must be higher than "
2064 "max_mod_rpcs_in_flight value",
2065 cli->cl_import->imp_obd->obd_name);
2068 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2069 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2075 spin_lock(&cli->cl_loi_list_lock);
2076 old = cli->cl_max_rpcs_in_flight;
2077 cli->cl_max_rpcs_in_flight = max;
2080 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2081 for (i = 0; i < diff; i++) {
2082 if (list_empty(&cli->cl_loi_read_list))
2085 orsw = list_entry(cli->cl_loi_read_list.next,
2086 struct obd_request_slot_waiter, orsw_entry);
2087 list_del_init(&orsw->orsw_entry);
2088 cli->cl_r_in_flight++;
2089 wake_up(&orsw->orsw_waitq);
2091 spin_unlock(&cli->cl_loi_list_lock);
2095 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2097 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2099 return cli->cl_max_mod_rpcs_in_flight;
2101 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2103 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2105 struct obd_connect_data *ocd;
2109 if (max > OBD_MAX_RIF_MAX || max < 1)
2112 /* cannot exceed or equal max_rpcs_in_flight */
2113 if (max >= cli->cl_max_rpcs_in_flight) {
2114 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2115 "higher or equal to max_rpcs_in_flight value (%u)\n",
2116 cli->cl_import->imp_obd->obd_name,
2117 max, cli->cl_max_rpcs_in_flight);
2121 /* cannot exceed max modify RPCs in flight supported by the server */
2122 ocd = &cli->cl_import->imp_connect_data;
2123 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2124 maxmodrpcs = ocd->ocd_maxmodrpcs;
2127 if (max > maxmodrpcs) {
2128 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2129 "higher than max_mod_rpcs_per_client value (%hu) "
2130 "returned by the server at connection\n",
2131 cli->cl_import->imp_obd->obd_name,
2136 spin_lock(&cli->cl_mod_rpcs_lock);
2138 prev = cli->cl_max_mod_rpcs_in_flight;
2139 cli->cl_max_mod_rpcs_in_flight = max;
2141 /* wakeup waiters if limit has been increased */
2142 if (cli->cl_max_mod_rpcs_in_flight > prev)
2143 wake_up(&cli->cl_mod_rpcs_waitq);
2145 spin_unlock(&cli->cl_mod_rpcs_lock);
2149 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2152 #define pct(a, b) (b ? a * 100 / b : 0)
2153 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2154 struct seq_file *seq)
2157 unsigned long mod_tot = 0, mod_cum;
2160 do_gettimeofday(&now);
2162 spin_lock(&cli->cl_mod_rpcs_lock);
2164 seq_printf(seq, "snapshot_time: %lu.%lu (secs.usecs)\n",
2165 now.tv_sec, now.tv_usec);
2166 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2167 cli->cl_mod_rpcs_in_flight);
2169 seq_printf(seq, "\n\t\t\tmodify\n");
2170 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2172 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2175 for (i = 0; i < OBD_HIST_MAX; i++) {
2176 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2178 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2179 i, mod, pct(mod, mod_tot),
2180 pct(mod_cum, mod_tot));
2181 if (mod_cum == mod_tot)
2185 spin_unlock(&cli->cl_mod_rpcs_lock);
2189 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2193 /* The number of modify RPCs sent in parallel is limited
2194 * because the server has a finite number of slots per client to
2195 * store request result and ensure reply reconstruction when needed.
2196 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2197 * that takes into account server limit and cl_max_rpcs_in_flight
2199 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2200 * one close request is allowed above the maximum.
2202 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2207 /* A slot is available if
2208 * - number of modify RPCs in flight is less than the max
2209 * - it's a close RPC and no other close request is in flight
2211 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2212 (close_req && cli->cl_close_rpcs_in_flight == 0);
2217 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2222 spin_lock(&cli->cl_mod_rpcs_lock);
2223 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2224 spin_unlock(&cli->cl_mod_rpcs_lock);
2228 /* Get a modify RPC slot from the obd client @cli according
2229 * to the kind of operation @opc that is going to be sent
2230 * and the intent @it of the operation if it applies.
2231 * If the maximum number of modify RPCs in flight is reached
2232 * the thread is put to sleep.
2233 * Returns the tag to be set in the request message. Tag 0
2234 * is reserved for non-modifying requests.
2236 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2237 struct lookup_intent *it)
2239 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2240 bool close_req = false;
2243 /* read-only metadata RPCs don't consume a slot on MDT
2244 * for reply reconstruction
2246 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2247 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2250 if (opc == MDS_CLOSE)
2254 spin_lock(&cli->cl_mod_rpcs_lock);
2255 max = cli->cl_max_mod_rpcs_in_flight;
2256 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2257 /* there is a slot available */
2258 cli->cl_mod_rpcs_in_flight++;
2260 cli->cl_close_rpcs_in_flight++;
2261 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2262 cli->cl_mod_rpcs_in_flight);
2263 /* find a free tag */
2264 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2266 LASSERT(i < OBD_MAX_RIF_MAX);
2267 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2268 spin_unlock(&cli->cl_mod_rpcs_lock);
2269 /* tag 0 is reserved for non-modify RPCs */
2272 spin_unlock(&cli->cl_mod_rpcs_lock);
2274 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2275 "opc %u, max %hu\n",
2276 cli->cl_import->imp_obd->obd_name, opc, max);
2278 l_wait_event(cli->cl_mod_rpcs_waitq,
2279 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2282 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2284 /* Put a modify RPC slot from the obd client @cli according
2285 * to the kind of operation @opc that has been sent and the
2286 * intent @it of the operation if it applies.
2288 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2289 struct lookup_intent *it, __u16 tag)
2291 bool close_req = false;
2293 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2294 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2297 if (opc == MDS_CLOSE)
2300 spin_lock(&cli->cl_mod_rpcs_lock);
2301 cli->cl_mod_rpcs_in_flight--;
2303 cli->cl_close_rpcs_in_flight--;
2304 /* release the tag in the bitmap */
2305 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2306 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2307 spin_unlock(&cli->cl_mod_rpcs_lock);
2308 wake_up(&cli->cl_mod_rpcs_waitq);
2310 EXPORT_SYMBOL(obd_put_mod_rpc_slot);