4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lprocfs_status.h>
44 #include <lustre_disk.h>
45 #include <lustre_kernelcomm.h>
47 spinlock_t obd_types_lock;
49 static struct kmem_cache *obd_device_cachep;
50 struct kmem_cache *obdo_cachep;
51 EXPORT_SYMBOL(obdo_cachep);
52 static struct kmem_cache *import_cachep;
54 static struct list_head obd_zombie_imports;
55 static struct list_head obd_zombie_exports;
56 static spinlock_t obd_zombie_impexp_lock;
58 static void obd_zombie_impexp_notify(void);
59 static void obd_zombie_export_add(struct obd_export *exp);
60 static void obd_zombie_import_add(struct obd_import *imp);
61 static void print_export_data(struct obd_export *exp,
62 const char *status, int locks, int debug_level);
64 struct list_head obd_stale_exports;
65 spinlock_t obd_stale_export_lock;
66 atomic_t obd_stale_export_num;
68 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
69 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
72 * support functions: we could use inter-module communication, but this
73 * is more portable to other OS's
75 static struct obd_device *obd_device_alloc(void)
77 struct obd_device *obd;
79 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
81 obd->obd_magic = OBD_DEVICE_MAGIC;
86 static void obd_device_free(struct obd_device *obd)
89 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
90 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
91 if (obd->obd_namespace != NULL) {
92 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
93 obd, obd->obd_namespace, obd->obd_force);
96 lu_ref_fini(&obd->obd_reference);
97 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
100 struct obd_type *class_search_type(const char *name)
102 struct list_head *tmp;
103 struct obd_type *type;
105 spin_lock(&obd_types_lock);
106 list_for_each(tmp, &obd_types) {
107 type = list_entry(tmp, struct obd_type, typ_chain);
108 if (strcmp(type->typ_name, name) == 0) {
109 spin_unlock(&obd_types_lock);
113 spin_unlock(&obd_types_lock);
116 EXPORT_SYMBOL(class_search_type);
118 struct obd_type *class_get_type(const char *name)
120 struct obd_type *type = class_search_type(name);
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
124 const char *modname = name;
126 if (strcmp(modname, "obdfilter") == 0)
129 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
130 modname = LUSTRE_OSP_NAME;
132 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
133 modname = LUSTRE_MDT_NAME;
135 if (!request_module("%s", modname)) {
136 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
137 type = class_search_type(name);
139 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
145 spin_lock(&type->obd_type_lock);
147 try_module_get(type->typ_dt_ops->o_owner);
148 spin_unlock(&type->obd_type_lock);
153 void class_put_type(struct obd_type *type)
156 spin_lock(&type->obd_type_lock);
158 module_put(type->typ_dt_ops->o_owner);
159 spin_unlock(&type->obd_type_lock);
162 #define CLASS_MAX_NAME 1024
164 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
165 bool enable_proc, struct lprocfs_vars *vars,
166 const char *name, struct lu_device_type *ldt)
168 struct obd_type *type;
173 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
175 if (class_search_type(name)) {
176 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
181 OBD_ALLOC(type, sizeof(*type));
185 OBD_ALLOC_PTR(type->typ_dt_ops);
186 OBD_ALLOC_PTR(type->typ_md_ops);
187 OBD_ALLOC(type->typ_name, strlen(name) + 1);
189 if (type->typ_dt_ops == NULL ||
190 type->typ_md_ops == NULL ||
191 type->typ_name == NULL)
194 *(type->typ_dt_ops) = *dt_ops;
195 /* md_ops is optional */
197 *(type->typ_md_ops) = *md_ops;
198 strcpy(type->typ_name, name);
199 spin_lock_init(&type->obd_type_lock);
201 #ifdef CONFIG_PROC_FS
203 type->typ_procroot = lprocfs_register(type->typ_name,
206 if (IS_ERR(type->typ_procroot)) {
207 rc = PTR_ERR(type->typ_procroot);
208 type->typ_procroot = NULL;
215 rc = lu_device_type_init(ldt);
220 spin_lock(&obd_types_lock);
221 list_add(&type->typ_chain, &obd_types);
222 spin_unlock(&obd_types_lock);
227 if (type->typ_name != NULL) {
228 #ifdef CONFIG_PROC_FS
229 if (type->typ_procroot != NULL)
230 remove_proc_subtree(type->typ_name, proc_lustre_root);
232 OBD_FREE(type->typ_name, strlen(name) + 1);
234 if (type->typ_md_ops != NULL)
235 OBD_FREE_PTR(type->typ_md_ops);
236 if (type->typ_dt_ops != NULL)
237 OBD_FREE_PTR(type->typ_dt_ops);
238 OBD_FREE(type, sizeof(*type));
241 EXPORT_SYMBOL(class_register_type);
243 int class_unregister_type(const char *name)
245 struct obd_type *type = class_search_type(name);
249 CERROR("unknown obd type\n");
253 if (type->typ_refcnt) {
254 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
255 /* This is a bad situation, let's make the best of it */
256 /* Remove ops, but leave the name for debugging */
257 OBD_FREE_PTR(type->typ_dt_ops);
258 OBD_FREE_PTR(type->typ_md_ops);
262 /* we do not use type->typ_procroot as for compatibility purposes
263 * other modules can share names (i.e. lod can use lov entry). so
264 * we can't reference pointer as it can get invalided when another
265 * module removes the entry */
266 #ifdef CONFIG_PROC_FS
267 if (type->typ_procroot != NULL)
268 remove_proc_subtree(type->typ_name, proc_lustre_root);
269 if (type->typ_procsym != NULL)
270 lprocfs_remove(&type->typ_procsym);
273 lu_device_type_fini(type->typ_lu);
275 spin_lock(&obd_types_lock);
276 list_del(&type->typ_chain);
277 spin_unlock(&obd_types_lock);
278 OBD_FREE(type->typ_name, strlen(name) + 1);
279 if (type->typ_dt_ops != NULL)
280 OBD_FREE_PTR(type->typ_dt_ops);
281 if (type->typ_md_ops != NULL)
282 OBD_FREE_PTR(type->typ_md_ops);
283 OBD_FREE(type, sizeof(*type));
285 } /* class_unregister_type */
286 EXPORT_SYMBOL(class_unregister_type);
289 * Create a new obd device.
291 * Find an empty slot in ::obd_devs[], create a new obd device in it.
293 * \param[in] type_name obd device type string.
294 * \param[in] name obd device name.
296 * \retval NULL if create fails, otherwise return the obd device
299 struct obd_device *class_newdev(const char *type_name, const char *name)
301 struct obd_device *result = NULL;
302 struct obd_device *newdev;
303 struct obd_type *type = NULL;
305 int new_obd_minor = 0;
308 if (strlen(name) >= MAX_OBD_NAME) {
309 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
310 RETURN(ERR_PTR(-EINVAL));
313 type = class_get_type(type_name);
315 CERROR("OBD: unknown type: %s\n", type_name);
316 RETURN(ERR_PTR(-ENODEV));
319 newdev = obd_device_alloc();
321 GOTO(out_type, result = ERR_PTR(-ENOMEM));
323 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
325 write_lock(&obd_dev_lock);
326 for (i = 0; i < class_devno_max(); i++) {
327 struct obd_device *obd = class_num2obd(i);
329 if (obd && (strcmp(name, obd->obd_name) == 0)) {
330 CERROR("Device %s already exists at %d, won't add\n",
333 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
334 "%p obd_magic %08x != %08x\n", result,
335 result->obd_magic, OBD_DEVICE_MAGIC);
336 LASSERTF(result->obd_minor == new_obd_minor,
337 "%p obd_minor %d != %d\n", result,
338 result->obd_minor, new_obd_minor);
340 obd_devs[result->obd_minor] = NULL;
341 result->obd_name[0]='\0';
343 result = ERR_PTR(-EEXIST);
346 if (!result && !obd) {
348 result->obd_minor = i;
350 result->obd_type = type;
351 strncpy(result->obd_name, name,
352 sizeof(result->obd_name) - 1);
353 obd_devs[i] = result;
356 write_unlock(&obd_dev_lock);
358 if (result == NULL && i >= class_devno_max()) {
359 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
361 GOTO(out, result = ERR_PTR(-EOVERFLOW));
367 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
368 result->obd_name, result);
372 obd_device_free(newdev);
374 class_put_type(type);
378 void class_release_dev(struct obd_device *obd)
380 struct obd_type *obd_type = obd->obd_type;
382 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
383 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
384 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
385 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
386 LASSERT(obd_type != NULL);
388 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
389 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
391 write_lock(&obd_dev_lock);
392 obd_devs[obd->obd_minor] = NULL;
393 write_unlock(&obd_dev_lock);
394 obd_device_free(obd);
396 class_put_type(obd_type);
399 int class_name2dev(const char *name)
406 read_lock(&obd_dev_lock);
407 for (i = 0; i < class_devno_max(); i++) {
408 struct obd_device *obd = class_num2obd(i);
410 if (obd && strcmp(name, obd->obd_name) == 0) {
411 /* Make sure we finished attaching before we give
412 out any references */
413 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
414 if (obd->obd_attached) {
415 read_unlock(&obd_dev_lock);
421 read_unlock(&obd_dev_lock);
426 struct obd_device *class_name2obd(const char *name)
428 int dev = class_name2dev(name);
430 if (dev < 0 || dev > class_devno_max())
432 return class_num2obd(dev);
434 EXPORT_SYMBOL(class_name2obd);
436 int class_uuid2dev(struct obd_uuid *uuid)
440 read_lock(&obd_dev_lock);
441 for (i = 0; i < class_devno_max(); i++) {
442 struct obd_device *obd = class_num2obd(i);
444 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
445 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
446 read_unlock(&obd_dev_lock);
450 read_unlock(&obd_dev_lock);
455 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
457 int dev = class_uuid2dev(uuid);
460 return class_num2obd(dev);
462 EXPORT_SYMBOL(class_uuid2obd);
465 * Get obd device from ::obd_devs[]
467 * \param num [in] array index
469 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
470 * otherwise return the obd device there.
472 struct obd_device *class_num2obd(int num)
474 struct obd_device *obd = NULL;
476 if (num < class_devno_max()) {
481 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
482 "%p obd_magic %08x != %08x\n",
483 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
484 LASSERTF(obd->obd_minor == num,
485 "%p obd_minor %0d != %0d\n",
486 obd, obd->obd_minor, num);
493 * Get obd devices count. Device in any
495 * \retval obd device count
497 int get_devices_count(void)
499 int index, max_index = class_devno_max(), dev_count = 0;
501 read_lock(&obd_dev_lock);
502 for (index = 0; index <= max_index; index++) {
503 struct obd_device *obd = class_num2obd(index);
507 read_unlock(&obd_dev_lock);
511 EXPORT_SYMBOL(get_devices_count);
513 void class_obd_list(void)
518 read_lock(&obd_dev_lock);
519 for (i = 0; i < class_devno_max(); i++) {
520 struct obd_device *obd = class_num2obd(i);
524 if (obd->obd_stopping)
526 else if (obd->obd_set_up)
528 else if (obd->obd_attached)
532 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
533 i, status, obd->obd_type->typ_name,
534 obd->obd_name, obd->obd_uuid.uuid,
535 atomic_read(&obd->obd_refcount));
537 read_unlock(&obd_dev_lock);
541 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
542 specified, then only the client with that uuid is returned,
543 otherwise any client connected to the tgt is returned. */
544 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
545 const char * typ_name,
546 struct obd_uuid *grp_uuid)
550 read_lock(&obd_dev_lock);
551 for (i = 0; i < class_devno_max(); i++) {
552 struct obd_device *obd = class_num2obd(i);
556 if ((strncmp(obd->obd_type->typ_name, typ_name,
557 strlen(typ_name)) == 0)) {
558 if (obd_uuid_equals(tgt_uuid,
559 &obd->u.cli.cl_target_uuid) &&
560 ((grp_uuid)? obd_uuid_equals(grp_uuid,
561 &obd->obd_uuid) : 1)) {
562 read_unlock(&obd_dev_lock);
567 read_unlock(&obd_dev_lock);
571 EXPORT_SYMBOL(class_find_client_obd);
573 /* Iterate the obd_device list looking devices have grp_uuid. Start
574 searching at *next, and if a device is found, the next index to look
575 at is saved in *next. If next is NULL, then the first matching device
576 will always be returned. */
577 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
583 else if (*next >= 0 && *next < class_devno_max())
588 read_lock(&obd_dev_lock);
589 for (; i < class_devno_max(); i++) {
590 struct obd_device *obd = class_num2obd(i);
594 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
597 read_unlock(&obd_dev_lock);
601 read_unlock(&obd_dev_lock);
605 EXPORT_SYMBOL(class_devices_in_group);
608 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
609 * adjust sptlrpc settings accordingly.
611 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
613 struct obd_device *obd;
617 LASSERT(namelen > 0);
619 read_lock(&obd_dev_lock);
620 for (i = 0; i < class_devno_max(); i++) {
621 obd = class_num2obd(i);
623 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
626 /* only notify mdc, osc, osp, lwp, mdt, ost
627 * because only these have a -sptlrpc llog */
628 type = obd->obd_type->typ_name;
629 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
630 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
631 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
632 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
633 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
634 strcmp(type, LUSTRE_OST_NAME) != 0)
637 if (strncmp(obd->obd_name, fsname, namelen))
640 class_incref(obd, __FUNCTION__, obd);
641 read_unlock(&obd_dev_lock);
642 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
643 sizeof(KEY_SPTLRPC_CONF),
644 KEY_SPTLRPC_CONF, 0, NULL, NULL);
646 class_decref(obd, __FUNCTION__, obd);
647 read_lock(&obd_dev_lock);
649 read_unlock(&obd_dev_lock);
652 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
654 void obd_cleanup_caches(void)
657 if (obd_device_cachep) {
658 kmem_cache_destroy(obd_device_cachep);
659 obd_device_cachep = NULL;
662 kmem_cache_destroy(obdo_cachep);
666 kmem_cache_destroy(import_cachep);
667 import_cachep = NULL;
673 int obd_init_caches(void)
678 LASSERT(obd_device_cachep == NULL);
679 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
680 sizeof(struct obd_device),
682 if (!obd_device_cachep)
683 GOTO(out, rc = -ENOMEM);
685 LASSERT(obdo_cachep == NULL);
686 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
689 GOTO(out, rc = -ENOMEM);
691 LASSERT(import_cachep == NULL);
692 import_cachep = kmem_cache_create("ll_import_cache",
693 sizeof(struct obd_import),
696 GOTO(out, rc = -ENOMEM);
700 obd_cleanup_caches();
704 /* map connection to client */
705 struct obd_export *class_conn2export(struct lustre_handle *conn)
707 struct obd_export *export;
711 CDEBUG(D_CACHE, "looking for null handle\n");
715 if (conn->cookie == -1) { /* this means assign a new connection */
716 CDEBUG(D_CACHE, "want a new connection\n");
720 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
721 export = class_handle2object(conn->cookie, NULL);
724 EXPORT_SYMBOL(class_conn2export);
726 struct obd_device *class_exp2obd(struct obd_export *exp)
732 EXPORT_SYMBOL(class_exp2obd);
734 struct obd_device *class_conn2obd(struct lustre_handle *conn)
736 struct obd_export *export;
737 export = class_conn2export(conn);
739 struct obd_device *obd = export->exp_obd;
740 class_export_put(export);
746 struct obd_import *class_exp2cliimp(struct obd_export *exp)
748 struct obd_device *obd = exp->exp_obd;
751 return obd->u.cli.cl_import;
753 EXPORT_SYMBOL(class_exp2cliimp);
755 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
757 struct obd_device *obd = class_conn2obd(conn);
760 return obd->u.cli.cl_import;
763 /* Export management functions */
764 static void class_export_destroy(struct obd_export *exp)
766 struct obd_device *obd = exp->exp_obd;
769 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
770 LASSERT(obd != NULL);
772 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
773 exp->exp_client_uuid.uuid, obd->obd_name);
775 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
776 if (exp->exp_connection)
777 ptlrpc_put_connection_superhack(exp->exp_connection);
779 LASSERT(list_empty(&exp->exp_outstanding_replies));
780 LASSERT(list_empty(&exp->exp_uncommitted_replies));
781 LASSERT(list_empty(&exp->exp_req_replay_queue));
782 LASSERT(list_empty(&exp->exp_hp_rpcs));
783 obd_destroy_export(exp);
784 class_decref(obd, "export", exp);
786 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
790 static void export_handle_addref(void *export)
792 class_export_get(export);
795 static struct portals_handle_ops export_handle_ops = {
796 .hop_addref = export_handle_addref,
800 struct obd_export *class_export_get(struct obd_export *exp)
802 atomic_inc(&exp->exp_refcount);
803 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
804 atomic_read(&exp->exp_refcount));
807 EXPORT_SYMBOL(class_export_get);
809 void class_export_put(struct obd_export *exp)
811 LASSERT(exp != NULL);
812 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
813 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
814 atomic_read(&exp->exp_refcount) - 1);
816 if (atomic_dec_and_test(&exp->exp_refcount)) {
817 LASSERT(!list_empty(&exp->exp_obd_chain));
818 LASSERT(list_empty(&exp->exp_stale_list));
819 CDEBUG(D_IOCTL, "final put %p/%s\n",
820 exp, exp->exp_client_uuid.uuid);
822 /* release nid stat refererence */
823 lprocfs_exp_cleanup(exp);
825 obd_zombie_export_add(exp);
828 EXPORT_SYMBOL(class_export_put);
830 /* Creates a new export, adds it to the hash table, and returns a
831 * pointer to it. The refcount is 2: one for the hash reference, and
832 * one for the pointer returned by this function. */
833 struct obd_export *class_new_export(struct obd_device *obd,
834 struct obd_uuid *cluuid)
836 struct obd_export *export;
837 struct cfs_hash *hash = NULL;
841 OBD_ALLOC_PTR(export);
843 return ERR_PTR(-ENOMEM);
845 export->exp_conn_cnt = 0;
846 export->exp_lock_hash = NULL;
847 export->exp_flock_hash = NULL;
848 atomic_set(&export->exp_refcount, 2);
849 atomic_set(&export->exp_rpc_count, 0);
850 atomic_set(&export->exp_cb_count, 0);
851 atomic_set(&export->exp_locks_count, 0);
852 #if LUSTRE_TRACKS_LOCK_EXP_REFS
853 INIT_LIST_HEAD(&export->exp_locks_list);
854 spin_lock_init(&export->exp_locks_list_guard);
856 atomic_set(&export->exp_replay_count, 0);
857 export->exp_obd = obd;
858 INIT_LIST_HEAD(&export->exp_outstanding_replies);
859 spin_lock_init(&export->exp_uncommitted_replies_lock);
860 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
861 INIT_LIST_HEAD(&export->exp_req_replay_queue);
862 INIT_LIST_HEAD(&export->exp_handle.h_link);
863 INIT_LIST_HEAD(&export->exp_hp_rpcs);
864 INIT_LIST_HEAD(&export->exp_reg_rpcs);
865 class_handle_hash(&export->exp_handle, &export_handle_ops);
866 export->exp_last_request_time = cfs_time_current_sec();
867 spin_lock_init(&export->exp_lock);
868 spin_lock_init(&export->exp_rpc_lock);
869 INIT_HLIST_NODE(&export->exp_uuid_hash);
870 INIT_HLIST_NODE(&export->exp_nid_hash);
871 INIT_HLIST_NODE(&export->exp_gen_hash);
872 spin_lock_init(&export->exp_bl_list_lock);
873 INIT_LIST_HEAD(&export->exp_bl_list);
874 INIT_LIST_HEAD(&export->exp_stale_list);
876 export->exp_sp_peer = LUSTRE_SP_ANY;
877 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
878 export->exp_client_uuid = *cluuid;
879 obd_init_export(export);
881 spin_lock(&obd->obd_dev_lock);
882 /* shouldn't happen, but might race */
883 if (obd->obd_stopping)
884 GOTO(exit_unlock, rc = -ENODEV);
886 hash = cfs_hash_getref(obd->obd_uuid_hash);
888 GOTO(exit_unlock, rc = -ENODEV);
889 spin_unlock(&obd->obd_dev_lock);
891 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
892 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
894 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
895 obd->obd_name, cluuid->uuid, rc);
896 GOTO(exit_err, rc = -EALREADY);
900 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
901 spin_lock(&obd->obd_dev_lock);
902 if (obd->obd_stopping) {
903 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
904 GOTO(exit_unlock, rc = -ENODEV);
907 class_incref(obd, "export", export);
908 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
909 list_add_tail(&export->exp_obd_chain_timed,
910 &export->exp_obd->obd_exports_timed);
911 export->exp_obd->obd_num_exports++;
912 spin_unlock(&obd->obd_dev_lock);
913 cfs_hash_putref(hash);
917 spin_unlock(&obd->obd_dev_lock);
920 cfs_hash_putref(hash);
921 class_handle_unhash(&export->exp_handle);
922 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
923 obd_destroy_export(export);
924 OBD_FREE_PTR(export);
927 EXPORT_SYMBOL(class_new_export);
929 void class_unlink_export(struct obd_export *exp)
931 class_handle_unhash(&exp->exp_handle);
933 spin_lock(&exp->exp_obd->obd_dev_lock);
934 /* delete an uuid-export hashitem from hashtables */
935 if (!hlist_unhashed(&exp->exp_uuid_hash))
936 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
937 &exp->exp_client_uuid,
938 &exp->exp_uuid_hash);
940 if (!hlist_unhashed(&exp->exp_gen_hash)) {
941 struct tg_export_data *ted = &exp->exp_target_data;
942 struct cfs_hash *hash;
944 /* Because obd_gen_hash will not be released until
945 * class_cleanup(), so hash should never be NULL here */
946 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
947 LASSERT(hash != NULL);
948 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
950 cfs_hash_putref(hash);
953 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
954 list_del_init(&exp->exp_obd_chain_timed);
955 exp->exp_obd->obd_num_exports--;
956 spin_unlock(&exp->exp_obd->obd_dev_lock);
957 atomic_inc(&obd_stale_export_num);
959 /* A reference is kept by obd_stale_exports list */
960 obd_stale_export_put(exp);
962 EXPORT_SYMBOL(class_unlink_export);
964 /* Import management functions */
965 static void class_import_destroy(struct obd_import *imp)
969 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
970 imp->imp_obd->obd_name);
972 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
974 ptlrpc_put_connection_superhack(imp->imp_connection);
976 while (!list_empty(&imp->imp_conn_list)) {
977 struct obd_import_conn *imp_conn;
979 imp_conn = list_entry(imp->imp_conn_list.next,
980 struct obd_import_conn, oic_item);
981 list_del_init(&imp_conn->oic_item);
982 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
983 OBD_FREE(imp_conn, sizeof(*imp_conn));
986 LASSERT(imp->imp_sec == NULL);
987 class_decref(imp->imp_obd, "import", imp);
988 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
992 static void import_handle_addref(void *import)
994 class_import_get(import);
997 static struct portals_handle_ops import_handle_ops = {
998 .hop_addref = import_handle_addref,
1002 struct obd_import *class_import_get(struct obd_import *import)
1004 atomic_inc(&import->imp_refcount);
1005 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1006 atomic_read(&import->imp_refcount),
1007 import->imp_obd->obd_name);
1010 EXPORT_SYMBOL(class_import_get);
1012 void class_import_put(struct obd_import *imp)
1016 LASSERT(list_empty(&imp->imp_zombie_chain));
1017 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1019 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1020 atomic_read(&imp->imp_refcount) - 1,
1021 imp->imp_obd->obd_name);
1023 if (atomic_dec_and_test(&imp->imp_refcount)) {
1024 CDEBUG(D_INFO, "final put import %p\n", imp);
1025 obd_zombie_import_add(imp);
1028 /* catch possible import put race */
1029 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1032 EXPORT_SYMBOL(class_import_put);
1034 static void init_imp_at(struct imp_at *at) {
1036 at_init(&at->iat_net_latency, 0, 0);
1037 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1038 /* max service estimates are tracked on the server side, so
1039 don't use the AT history here, just use the last reported
1040 val. (But keep hist for proc histogram, worst_ever) */
1041 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1046 struct obd_import *class_new_import(struct obd_device *obd)
1048 struct obd_import *imp;
1049 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1051 OBD_ALLOC(imp, sizeof(*imp));
1055 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1056 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1057 INIT_LIST_HEAD(&imp->imp_replay_list);
1058 INIT_LIST_HEAD(&imp->imp_sending_list);
1059 INIT_LIST_HEAD(&imp->imp_delayed_list);
1060 INIT_LIST_HEAD(&imp->imp_committed_list);
1061 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1062 imp->imp_known_replied_xid = 0;
1063 imp->imp_replay_cursor = &imp->imp_committed_list;
1064 spin_lock_init(&imp->imp_lock);
1065 imp->imp_last_success_conn = 0;
1066 imp->imp_state = LUSTRE_IMP_NEW;
1067 imp->imp_obd = class_incref(obd, "import", imp);
1068 mutex_init(&imp->imp_sec_mutex);
1069 init_waitqueue_head(&imp->imp_recovery_waitq);
1071 if (curr_pid_ns->child_reaper)
1072 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1074 imp->imp_sec_refpid = 1;
1076 atomic_set(&imp->imp_refcount, 2);
1077 atomic_set(&imp->imp_unregistering, 0);
1078 atomic_set(&imp->imp_inflight, 0);
1079 atomic_set(&imp->imp_replay_inflight, 0);
1080 atomic_set(&imp->imp_inval_count, 0);
1081 INIT_LIST_HEAD(&imp->imp_conn_list);
1082 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1083 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1084 init_imp_at(&imp->imp_at);
1086 /* the default magic is V2, will be used in connect RPC, and
1087 * then adjusted according to the flags in request/reply. */
1088 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1092 EXPORT_SYMBOL(class_new_import);
1094 void class_destroy_import(struct obd_import *import)
1096 LASSERT(import != NULL);
1097 LASSERT(import != LP_POISON);
1099 class_handle_unhash(&import->imp_handle);
1101 spin_lock(&import->imp_lock);
1102 import->imp_generation++;
1103 spin_unlock(&import->imp_lock);
1104 class_import_put(import);
1106 EXPORT_SYMBOL(class_destroy_import);
1108 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1110 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1112 spin_lock(&exp->exp_locks_list_guard);
1114 LASSERT(lock->l_exp_refs_nr >= 0);
1116 if (lock->l_exp_refs_target != NULL &&
1117 lock->l_exp_refs_target != exp) {
1118 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1119 exp, lock, lock->l_exp_refs_target);
1121 if ((lock->l_exp_refs_nr ++) == 0) {
1122 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1123 lock->l_exp_refs_target = exp;
1125 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1126 lock, exp, lock->l_exp_refs_nr);
1127 spin_unlock(&exp->exp_locks_list_guard);
1129 EXPORT_SYMBOL(__class_export_add_lock_ref);
1131 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1133 spin_lock(&exp->exp_locks_list_guard);
1134 LASSERT(lock->l_exp_refs_nr > 0);
1135 if (lock->l_exp_refs_target != exp) {
1136 LCONSOLE_WARN("lock %p, "
1137 "mismatching export pointers: %p, %p\n",
1138 lock, lock->l_exp_refs_target, exp);
1140 if (-- lock->l_exp_refs_nr == 0) {
1141 list_del_init(&lock->l_exp_refs_link);
1142 lock->l_exp_refs_target = NULL;
1144 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1145 lock, exp, lock->l_exp_refs_nr);
1146 spin_unlock(&exp->exp_locks_list_guard);
1148 EXPORT_SYMBOL(__class_export_del_lock_ref);
1151 /* A connection defines an export context in which preallocation can
1152 be managed. This releases the export pointer reference, and returns
1153 the export handle, so the export refcount is 1 when this function
1155 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1156 struct obd_uuid *cluuid)
1158 struct obd_export *export;
1159 LASSERT(conn != NULL);
1160 LASSERT(obd != NULL);
1161 LASSERT(cluuid != NULL);
1164 export = class_new_export(obd, cluuid);
1166 RETURN(PTR_ERR(export));
1168 conn->cookie = export->exp_handle.h_cookie;
1169 class_export_put(export);
1171 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1172 cluuid->uuid, conn->cookie);
1175 EXPORT_SYMBOL(class_connect);
1177 /* if export is involved in recovery then clean up related things */
1178 static void class_export_recovery_cleanup(struct obd_export *exp)
1180 struct obd_device *obd = exp->exp_obd;
1182 spin_lock(&obd->obd_recovery_task_lock);
1183 if (obd->obd_recovering) {
1184 if (exp->exp_in_recovery) {
1185 spin_lock(&exp->exp_lock);
1186 exp->exp_in_recovery = 0;
1187 spin_unlock(&exp->exp_lock);
1188 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1189 atomic_dec(&obd->obd_connected_clients);
1192 /* if called during recovery then should update
1193 * obd_stale_clients counter,
1194 * lightweight exports are not counted */
1195 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1196 exp->exp_obd->obd_stale_clients++;
1198 spin_unlock(&obd->obd_recovery_task_lock);
1200 spin_lock(&exp->exp_lock);
1201 /** Cleanup req replay fields */
1202 if (exp->exp_req_replay_needed) {
1203 exp->exp_req_replay_needed = 0;
1205 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1206 atomic_dec(&obd->obd_req_replay_clients);
1209 /** Cleanup lock replay data */
1210 if (exp->exp_lock_replay_needed) {
1211 exp->exp_lock_replay_needed = 0;
1213 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1214 atomic_dec(&obd->obd_lock_replay_clients);
1216 spin_unlock(&exp->exp_lock);
1219 /* This function removes 1-3 references from the export:
1220 * 1 - for export pointer passed
1221 * and if disconnect really need
1222 * 2 - removing from hash
1223 * 3 - in client_unlink_export
1224 * The export pointer passed to this function can destroyed */
1225 int class_disconnect(struct obd_export *export)
1227 int already_disconnected;
1230 if (export == NULL) {
1231 CWARN("attempting to free NULL export %p\n", export);
1235 spin_lock(&export->exp_lock);
1236 already_disconnected = export->exp_disconnected;
1237 export->exp_disconnected = 1;
1238 /* We hold references of export for uuid hash
1239 * and nid_hash and export link at least. So
1240 * it is safe to call cfs_hash_del in there. */
1241 if (!hlist_unhashed(&export->exp_nid_hash))
1242 cfs_hash_del(export->exp_obd->obd_nid_hash,
1243 &export->exp_connection->c_peer.nid,
1244 &export->exp_nid_hash);
1245 spin_unlock(&export->exp_lock);
1247 /* class_cleanup(), abort_recovery(), and class_fail_export()
1248 * all end up in here, and if any of them race we shouldn't
1249 * call extra class_export_puts(). */
1250 if (already_disconnected) {
1251 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1252 GOTO(no_disconn, already_disconnected);
1255 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1256 export->exp_handle.h_cookie);
1258 class_export_recovery_cleanup(export);
1259 class_unlink_export(export);
1261 class_export_put(export);
1264 EXPORT_SYMBOL(class_disconnect);
1266 /* Return non-zero for a fully connected export */
1267 int class_connected_export(struct obd_export *exp)
1272 spin_lock(&exp->exp_lock);
1273 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1274 spin_unlock(&exp->exp_lock);
1278 EXPORT_SYMBOL(class_connected_export);
1280 static void class_disconnect_export_list(struct list_head *list,
1281 enum obd_option flags)
1284 struct obd_export *exp;
1287 /* It's possible that an export may disconnect itself, but
1288 * nothing else will be added to this list. */
1289 while (!list_empty(list)) {
1290 exp = list_entry(list->next, struct obd_export,
1292 /* need for safe call CDEBUG after obd_disconnect */
1293 class_export_get(exp);
1295 spin_lock(&exp->exp_lock);
1296 exp->exp_flags = flags;
1297 spin_unlock(&exp->exp_lock);
1299 if (obd_uuid_equals(&exp->exp_client_uuid,
1300 &exp->exp_obd->obd_uuid)) {
1302 "exp %p export uuid == obd uuid, don't discon\n",
1304 /* Need to delete this now so we don't end up pointing
1305 * to work_list later when this export is cleaned up. */
1306 list_del_init(&exp->exp_obd_chain);
1307 class_export_put(exp);
1311 class_export_get(exp);
1312 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1313 "last request at "CFS_TIME_T"\n",
1314 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1315 exp, exp->exp_last_request_time);
1316 /* release one export reference anyway */
1317 rc = obd_disconnect(exp);
1319 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1320 obd_export_nid2str(exp), exp, rc);
1321 class_export_put(exp);
1326 void class_disconnect_exports(struct obd_device *obd)
1328 struct list_head work_list;
1331 /* Move all of the exports from obd_exports to a work list, en masse. */
1332 INIT_LIST_HEAD(&work_list);
1333 spin_lock(&obd->obd_dev_lock);
1334 list_splice_init(&obd->obd_exports, &work_list);
1335 list_splice_init(&obd->obd_delayed_exports, &work_list);
1336 spin_unlock(&obd->obd_dev_lock);
1338 if (!list_empty(&work_list)) {
1339 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1340 "disconnecting them\n", obd->obd_minor, obd);
1341 class_disconnect_export_list(&work_list,
1342 exp_flags_from_obd(obd));
1344 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1345 obd->obd_minor, obd);
1348 EXPORT_SYMBOL(class_disconnect_exports);
1350 /* Remove exports that have not completed recovery.
1352 void class_disconnect_stale_exports(struct obd_device *obd,
1353 int (*test_export)(struct obd_export *))
1355 struct list_head work_list;
1356 struct obd_export *exp, *n;
1360 INIT_LIST_HEAD(&work_list);
1361 spin_lock(&obd->obd_dev_lock);
1362 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1364 /* don't count self-export as client */
1365 if (obd_uuid_equals(&exp->exp_client_uuid,
1366 &exp->exp_obd->obd_uuid))
1369 /* don't evict clients which have no slot in last_rcvd
1370 * (e.g. lightweight connection) */
1371 if (exp->exp_target_data.ted_lr_idx == -1)
1374 spin_lock(&exp->exp_lock);
1375 if (exp->exp_failed || test_export(exp)) {
1376 spin_unlock(&exp->exp_lock);
1379 exp->exp_failed = 1;
1380 spin_unlock(&exp->exp_lock);
1382 list_move(&exp->exp_obd_chain, &work_list);
1384 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1385 obd->obd_name, exp->exp_client_uuid.uuid,
1386 exp->exp_connection == NULL ? "<unknown>" :
1387 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1388 print_export_data(exp, "EVICTING", 0, D_HA);
1390 spin_unlock(&obd->obd_dev_lock);
1393 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1394 obd->obd_name, evicted);
1396 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1397 OBD_OPT_ABORT_RECOV);
1400 EXPORT_SYMBOL(class_disconnect_stale_exports);
1402 void class_fail_export(struct obd_export *exp)
1404 int rc, already_failed;
1406 spin_lock(&exp->exp_lock);
1407 already_failed = exp->exp_failed;
1408 exp->exp_failed = 1;
1409 spin_unlock(&exp->exp_lock);
1411 if (already_failed) {
1412 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1413 exp, exp->exp_client_uuid.uuid);
1417 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1418 exp, exp->exp_client_uuid.uuid);
1420 if (obd_dump_on_timeout)
1421 libcfs_debug_dumplog();
1423 /* need for safe call CDEBUG after obd_disconnect */
1424 class_export_get(exp);
1426 /* Most callers into obd_disconnect are removing their own reference
1427 * (request, for example) in addition to the one from the hash table.
1428 * We don't have such a reference here, so make one. */
1429 class_export_get(exp);
1430 rc = obd_disconnect(exp);
1432 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1434 CDEBUG(D_HA, "disconnected export %p/%s\n",
1435 exp, exp->exp_client_uuid.uuid);
1436 class_export_put(exp);
1438 EXPORT_SYMBOL(class_fail_export);
1440 char *obd_export_nid2str(struct obd_export *exp)
1442 if (exp->exp_connection != NULL)
1443 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1447 EXPORT_SYMBOL(obd_export_nid2str);
1449 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1451 struct cfs_hash *nid_hash;
1452 struct obd_export *doomed_exp = NULL;
1453 int exports_evicted = 0;
1455 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1457 spin_lock(&obd->obd_dev_lock);
1458 /* umount has run already, so evict thread should leave
1459 * its task to umount thread now */
1460 if (obd->obd_stopping) {
1461 spin_unlock(&obd->obd_dev_lock);
1462 return exports_evicted;
1464 nid_hash = obd->obd_nid_hash;
1465 cfs_hash_getref(nid_hash);
1466 spin_unlock(&obd->obd_dev_lock);
1469 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1470 if (doomed_exp == NULL)
1473 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1474 "nid %s found, wanted nid %s, requested nid %s\n",
1475 obd_export_nid2str(doomed_exp),
1476 libcfs_nid2str(nid_key), nid);
1477 LASSERTF(doomed_exp != obd->obd_self_export,
1478 "self-export is hashed by NID?\n");
1480 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1481 "request\n", obd->obd_name,
1482 obd_uuid2str(&doomed_exp->exp_client_uuid),
1483 obd_export_nid2str(doomed_exp));
1484 class_fail_export(doomed_exp);
1485 class_export_put(doomed_exp);
1488 cfs_hash_putref(nid_hash);
1490 if (!exports_evicted)
1491 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1492 obd->obd_name, nid);
1493 return exports_evicted;
1495 EXPORT_SYMBOL(obd_export_evict_by_nid);
1497 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1499 struct cfs_hash *uuid_hash;
1500 struct obd_export *doomed_exp = NULL;
1501 struct obd_uuid doomed_uuid;
1502 int exports_evicted = 0;
1504 spin_lock(&obd->obd_dev_lock);
1505 if (obd->obd_stopping) {
1506 spin_unlock(&obd->obd_dev_lock);
1507 return exports_evicted;
1509 uuid_hash = obd->obd_uuid_hash;
1510 cfs_hash_getref(uuid_hash);
1511 spin_unlock(&obd->obd_dev_lock);
1513 obd_str2uuid(&doomed_uuid, uuid);
1514 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1515 CERROR("%s: can't evict myself\n", obd->obd_name);
1516 cfs_hash_putref(uuid_hash);
1517 return exports_evicted;
1520 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1522 if (doomed_exp == NULL) {
1523 CERROR("%s: can't disconnect %s: no exports found\n",
1524 obd->obd_name, uuid);
1526 CWARN("%s: evicting %s at adminstrative request\n",
1527 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1528 class_fail_export(doomed_exp);
1529 class_export_put(doomed_exp);
1532 cfs_hash_putref(uuid_hash);
1534 return exports_evicted;
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional hook, NULL by default.  print_export_data() calls it for an
 * export when asked to dump locks and the hook has been set.
 */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1542 static void print_export_data(struct obd_export *exp, const char *status,
1543 int locks, int debug_level)
1545 struct ptlrpc_reply_state *rs;
1546 struct ptlrpc_reply_state *first_reply = NULL;
1549 spin_lock(&exp->exp_lock);
1550 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1556 spin_unlock(&exp->exp_lock);
1558 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1559 "%p %s %llu stale:%d\n",
1560 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1561 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1562 atomic_read(&exp->exp_rpc_count),
1563 atomic_read(&exp->exp_cb_count),
1564 atomic_read(&exp->exp_locks_count),
1565 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1566 nreplies, first_reply, nreplies > 3 ? "..." : "",
1567 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1568 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1569 if (locks && class_export_dump_hook != NULL)
1570 class_export_dump_hook(exp);
1574 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1576 struct obd_export *exp;
1578 spin_lock(&obd->obd_dev_lock);
1579 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1580 print_export_data(exp, "ACTIVE", locks, debug_level);
1581 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1582 print_export_data(exp, "UNLINKED", locks, debug_level);
1583 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1584 print_export_data(exp, "DELAYED", locks, debug_level);
1585 spin_unlock(&obd->obd_dev_lock);
1586 spin_lock(&obd_zombie_impexp_lock);
1587 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1588 print_export_data(exp, "ZOMBIE", locks, debug_level);
1589 spin_unlock(&obd_zombie_impexp_lock);
1592 void obd_exports_barrier(struct obd_device *obd)
1595 LASSERT(list_empty(&obd->obd_exports));
1596 spin_lock(&obd->obd_dev_lock);
1597 while (!list_empty(&obd->obd_unlinked_exports)) {
1598 spin_unlock(&obd->obd_dev_lock);
1599 set_current_state(TASK_UNINTERRUPTIBLE);
1600 schedule_timeout(cfs_time_seconds(waited));
1601 if (waited > 5 && IS_PO2(waited)) {
1602 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1603 "more than %d seconds. "
1604 "The obd refcount = %d. Is it stuck?\n",
1605 obd->obd_name, waited,
1606 atomic_read(&obd->obd_refcount));
1607 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1610 spin_lock(&obd->obd_dev_lock);
1612 spin_unlock(&obd->obd_dev_lock);
1614 EXPORT_SYMBOL(obd_exports_barrier);
/*
 * Number of zombie imports and exports still waiting to be destroyed.
 * Read and modified under obd_zombie_impexp_lock.
 */
static int zombies_count;
1620 * kill zombie imports and exports
1622 void obd_zombie_impexp_cull(void)
1624 struct obd_import *import;
1625 struct obd_export *export;
1629 spin_lock(&obd_zombie_impexp_lock);
1632 if (!list_empty(&obd_zombie_imports)) {
1633 import = list_entry(obd_zombie_imports.next,
1636 list_del_init(&import->imp_zombie_chain);
1640 if (!list_empty(&obd_zombie_exports)) {
1641 export = list_entry(obd_zombie_exports.next,
1644 list_del_init(&export->exp_obd_chain);
1647 spin_unlock(&obd_zombie_impexp_lock);
1649 if (import != NULL) {
1650 class_import_destroy(import);
1651 spin_lock(&obd_zombie_impexp_lock);
1653 spin_unlock(&obd_zombie_impexp_lock);
1656 if (export != NULL) {
1657 class_export_destroy(export);
1658 spin_lock(&obd_zombie_impexp_lock);
1660 spin_unlock(&obd_zombie_impexp_lock);
1664 } while (import != NULL || export != NULL);
1668 static struct completion obd_zombie_start;
1669 static struct completion obd_zombie_stop;
1670 static unsigned long obd_zombie_flags;
1671 static wait_queue_head_t obd_zombie_waitq;
1672 static pid_t obd_zombie_pid;
1675 OBD_ZOMBIE_STOP = 0x0001,
1679 * check for work for kill zombie import/export thread.
1681 static int obd_zombie_impexp_check(void *arg)
1685 spin_lock(&obd_zombie_impexp_lock);
1686 rc = (zombies_count == 0) &&
1687 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1688 spin_unlock(&obd_zombie_impexp_lock);
1694 * Add export to the obd_zombe thread and notify it.
1696 static void obd_zombie_export_add(struct obd_export *exp) {
1697 atomic_dec(&obd_stale_export_num);
1698 spin_lock(&exp->exp_obd->obd_dev_lock);
1699 LASSERT(!list_empty(&exp->exp_obd_chain));
1700 list_del_init(&exp->exp_obd_chain);
1701 spin_unlock(&exp->exp_obd->obd_dev_lock);
1702 spin_lock(&obd_zombie_impexp_lock);
1704 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1705 spin_unlock(&obd_zombie_impexp_lock);
1707 obd_zombie_impexp_notify();
1711 * Add import to the obd_zombe thread and notify it.
1713 static void obd_zombie_import_add(struct obd_import *imp) {
1714 LASSERT(imp->imp_sec == NULL);
1715 spin_lock(&obd_zombie_impexp_lock);
1716 LASSERT(list_empty(&imp->imp_zombie_chain));
1718 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1719 spin_unlock(&obd_zombie_impexp_lock);
1721 obd_zombie_impexp_notify();
1725 * notify import/export destroy thread about new zombie.
1727 static void obd_zombie_impexp_notify(void)
1730 * Make sure obd_zomebie_impexp_thread get this notification.
1731 * It is possible this signal only get by obd_zombie_barrier, and
1732 * barrier gulps this notification and sleeps away and hangs ensues
1734 wake_up_all(&obd_zombie_waitq);
1738 * check whether obd_zombie is idle
1740 static int obd_zombie_is_idle(void)
1744 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1745 spin_lock(&obd_zombie_impexp_lock);
1746 rc = (zombies_count == 0);
1747 spin_unlock(&obd_zombie_impexp_lock);
1752 * wait when obd_zombie import/export queues become empty
1754 void obd_zombie_barrier(void)
1756 struct l_wait_info lwi = { 0 };
1758 if (obd_zombie_pid == current_pid())
1759 /* don't wait for myself */
1761 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1763 EXPORT_SYMBOL(obd_zombie_barrier);
1766 struct obd_export *obd_stale_export_get(void)
1768 struct obd_export *exp = NULL;
1771 spin_lock(&obd_stale_export_lock);
1772 if (!list_empty(&obd_stale_exports)) {
1773 exp = list_entry(obd_stale_exports.next,
1774 struct obd_export, exp_stale_list);
1775 list_del_init(&exp->exp_stale_list);
1777 spin_unlock(&obd_stale_export_lock);
1780 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1781 atomic_read(&obd_stale_export_num));
1785 EXPORT_SYMBOL(obd_stale_export_get);
1787 void obd_stale_export_put(struct obd_export *exp)
1791 LASSERT(list_empty(&exp->exp_stale_list));
1792 if (exp->exp_lock_hash &&
1793 atomic_read(&exp->exp_lock_hash->hs_count)) {
1794 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1795 atomic_read(&obd_stale_export_num));
1797 spin_lock_bh(&exp->exp_bl_list_lock);
1798 spin_lock(&obd_stale_export_lock);
1799 /* Add to the tail if there is no blocked locks,
1800 * to the head otherwise. */
1801 if (list_empty(&exp->exp_bl_list))
1802 list_add_tail(&exp->exp_stale_list,
1803 &obd_stale_exports);
1805 list_add(&exp->exp_stale_list,
1806 &obd_stale_exports);
1808 spin_unlock(&obd_stale_export_lock);
1809 spin_unlock_bh(&exp->exp_bl_list_lock);
1811 class_export_put(exp);
1815 EXPORT_SYMBOL(obd_stale_export_put);
1818 * Adjust the position of the export in the stale list,
1819 * i.e. move to the head of the list if is needed.
1821 void obd_stale_export_adjust(struct obd_export *exp)
1823 LASSERT(exp != NULL);
1824 spin_lock_bh(&exp->exp_bl_list_lock);
1825 spin_lock(&obd_stale_export_lock);
1827 if (!list_empty(&exp->exp_stale_list) &&
1828 !list_empty(&exp->exp_bl_list))
1829 list_move(&exp->exp_stale_list, &obd_stale_exports);
1831 spin_unlock(&obd_stale_export_lock);
1832 spin_unlock_bh(&exp->exp_bl_list_lock);
1834 EXPORT_SYMBOL(obd_stale_export_adjust);
1837 * destroy zombie export/import thread.
1839 static int obd_zombie_impexp_thread(void *unused)
1841 unshare_fs_struct();
1842 complete(&obd_zombie_start);
1844 obd_zombie_pid = current_pid();
1846 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1847 struct l_wait_info lwi = { 0 };
1849 l_wait_event(obd_zombie_waitq,
1850 !obd_zombie_impexp_check(NULL), &lwi);
1851 obd_zombie_impexp_cull();
1854 * Notify obd_zombie_barrier callers that queues
1857 wake_up(&obd_zombie_waitq);
1860 complete(&obd_zombie_stop);
1867 * start destroy zombie import/export thread
1869 int obd_zombie_impexp_init(void)
1871 struct task_struct *task;
1873 INIT_LIST_HEAD(&obd_zombie_imports);
1875 INIT_LIST_HEAD(&obd_zombie_exports);
1876 spin_lock_init(&obd_zombie_impexp_lock);
1877 init_completion(&obd_zombie_start);
1878 init_completion(&obd_zombie_stop);
1879 init_waitqueue_head(&obd_zombie_waitq);
1882 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1884 RETURN(PTR_ERR(task));
1886 wait_for_completion(&obd_zombie_start);
1890 * stop destroy zombie import/export thread
1892 void obd_zombie_impexp_stop(void)
1894 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1895 obd_zombie_impexp_notify();
1896 wait_for_completion(&obd_zombie_stop);
1899 /***** Kernel-userspace comm helpers *******/
1901 /* Get length of entire message, including header */
1902 int kuc_len(int payload_len)
1904 return sizeof(struct kuc_hdr) + payload_len;
1906 EXPORT_SYMBOL(kuc_len);
1908 /* Get a pointer to kuc header, given a ptr to the payload
1909 * @param p Pointer to payload area
1910 * @returns Pointer to kuc header
1912 struct kuc_hdr * kuc_ptr(void *p)
1914 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1915 LASSERT(lh->kuc_magic == KUC_MAGIC);
1918 EXPORT_SYMBOL(kuc_ptr);
1920 /* Alloc space for a message, and fill in header
1921 * @return Pointer to payload area
1923 void *kuc_alloc(int payload_len, int transport, int type)
1926 int len = kuc_len(payload_len);
1930 return ERR_PTR(-ENOMEM);
1932 lh->kuc_magic = KUC_MAGIC;
1933 lh->kuc_transport = transport;
1934 lh->kuc_msgtype = type;
1935 lh->kuc_msglen = len;
1937 return (void *)(lh + 1);
1939 EXPORT_SYMBOL(kuc_alloc);
/*
 * Free a message obtained from kuc_alloc().  @p is the payload
 * pointer, @payload_len the payload length the message was allocated
 * with.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);

	OBD_FREE(hdr, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1949 struct obd_request_slot_waiter {
1950 struct list_head orsw_entry;
1951 wait_queue_head_t orsw_waitq;
1955 static bool obd_request_slot_avail(struct client_obd *cli,
1956 struct obd_request_slot_waiter *orsw)
1960 spin_lock(&cli->cl_loi_list_lock);
1961 avail = !!list_empty(&orsw->orsw_entry);
1962 spin_unlock(&cli->cl_loi_list_lock);
1968 * For network flow control, the RPC sponsor needs to acquire a credit
1969 * before sending the RPC. The credits count for a connection is defined
1970 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1971 * the subsequent RPC sponsors need to wait until others released their
1972 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1974 int obd_get_request_slot(struct client_obd *cli)
1976 struct obd_request_slot_waiter orsw;
1977 struct l_wait_info lwi;
1980 spin_lock(&cli->cl_loi_list_lock);
1981 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1982 cli->cl_r_in_flight++;
1983 spin_unlock(&cli->cl_loi_list_lock);
1987 init_waitqueue_head(&orsw.orsw_waitq);
1988 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1989 orsw.orsw_signaled = false;
1990 spin_unlock(&cli->cl_loi_list_lock);
1992 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1993 rc = l_wait_event(orsw.orsw_waitq,
1994 obd_request_slot_avail(cli, &orsw) ||
1998 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1999 * freed but other (such as obd_put_request_slot) is using it. */
2000 spin_lock(&cli->cl_loi_list_lock);
2002 if (!orsw.orsw_signaled) {
2003 if (list_empty(&orsw.orsw_entry))
2004 cli->cl_r_in_flight--;
2006 list_del(&orsw.orsw_entry);
2010 if (orsw.orsw_signaled) {
2011 LASSERT(list_empty(&orsw.orsw_entry));
2015 spin_unlock(&cli->cl_loi_list_lock);
2019 EXPORT_SYMBOL(obd_get_request_slot);
2021 void obd_put_request_slot(struct client_obd *cli)
2023 struct obd_request_slot_waiter *orsw;
2025 spin_lock(&cli->cl_loi_list_lock);
2026 cli->cl_r_in_flight--;
2028 /* If there is free slot, wakeup the first waiter. */
2029 if (!list_empty(&cli->cl_loi_read_list) &&
2030 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2031 orsw = list_entry(cli->cl_loi_read_list.next,
2032 struct obd_request_slot_waiter, orsw_entry);
2033 list_del_init(&orsw->orsw_entry);
2034 cli->cl_r_in_flight++;
2035 wake_up(&orsw->orsw_waitq);
2037 spin_unlock(&cli->cl_loi_list_lock);
2039 EXPORT_SYMBOL(obd_put_request_slot);
2041 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2043 return cli->cl_max_rpcs_in_flight;
2045 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2047 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2049 struct obd_request_slot_waiter *orsw;
2056 if (max > OBD_MAX_RIF_MAX || max < 1)
2059 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2060 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2061 /* adjust max_mod_rpcs_in_flight to ensure it is always
2062 * strictly lower that max_rpcs_in_flight */
2064 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2065 "because it must be higher than "
2066 "max_mod_rpcs_in_flight value",
2067 cli->cl_import->imp_obd->obd_name);
2070 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2071 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2077 spin_lock(&cli->cl_loi_list_lock);
2078 old = cli->cl_max_rpcs_in_flight;
2079 cli->cl_max_rpcs_in_flight = max;
2082 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2083 for (i = 0; i < diff; i++) {
2084 if (list_empty(&cli->cl_loi_read_list))
2087 orsw = list_entry(cli->cl_loi_read_list.next,
2088 struct obd_request_slot_waiter, orsw_entry);
2089 list_del_init(&orsw->orsw_entry);
2090 cli->cl_r_in_flight++;
2091 wake_up(&orsw->orsw_waitq);
2093 spin_unlock(&cli->cl_loi_list_lock);
2097 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2099 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2101 return cli->cl_max_mod_rpcs_in_flight;
2103 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2105 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2107 struct obd_connect_data *ocd;
2111 if (max > OBD_MAX_RIF_MAX || max < 1)
2114 /* cannot exceed or equal max_rpcs_in_flight */
2115 if (max >= cli->cl_max_rpcs_in_flight) {
2116 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2117 "higher or equal to max_rpcs_in_flight value (%u)\n",
2118 cli->cl_import->imp_obd->obd_name,
2119 max, cli->cl_max_rpcs_in_flight);
2123 /* cannot exceed max modify RPCs in flight supported by the server */
2124 ocd = &cli->cl_import->imp_connect_data;
2125 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2126 maxmodrpcs = ocd->ocd_maxmodrpcs;
2129 if (max > maxmodrpcs) {
2130 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2131 "higher than max_mod_rpcs_per_client value (%hu) "
2132 "returned by the server at connection\n",
2133 cli->cl_import->imp_obd->obd_name,
2138 spin_lock(&cli->cl_mod_rpcs_lock);
2140 prev = cli->cl_max_mod_rpcs_in_flight;
2141 cli->cl_max_mod_rpcs_in_flight = max;
2143 /* wakeup waiters if limit has been increased */
2144 if (cli->cl_max_mod_rpcs_in_flight > prev)
2145 wake_up(&cli->cl_mod_rpcs_waitq);
2147 spin_unlock(&cli->cl_mod_rpcs_lock);
2151 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/*
 * Integer percentage of @a relative to @b, 0 when @b is 0.
 * Every argument is parenthesised so that expressions such as
 * pct(x + y, total) expand correctly.  @b is evaluated twice, so do not
 * pass an expression with side effects for it.
 */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
2155 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2156 struct seq_file *seq)
2159 unsigned long mod_tot = 0, mod_cum;
2162 do_gettimeofday(&now);
2164 spin_lock(&cli->cl_mod_rpcs_lock);
2166 seq_printf(seq, "snapshot_time: %lu.%lu (secs.usecs)\n",
2167 now.tv_sec, now.tv_usec);
2168 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2169 cli->cl_mod_rpcs_in_flight);
2171 seq_printf(seq, "\n\t\t\tmodify\n");
2172 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2174 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2177 for (i = 0; i < OBD_HIST_MAX; i++) {
2178 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2180 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2181 i, mod, pct(mod, mod_tot),
2182 pct(mod_cum, mod_tot));
2183 if (mod_cum == mod_tot)
2187 spin_unlock(&cli->cl_mod_rpcs_lock);
2191 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2195 /* The number of modify RPCs sent in parallel is limited
2196 * because the server has a finite number of slots per client to
2197 * store request result and ensure reply reconstruction when needed.
2198 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2199 * that takes into account server limit and cl_max_rpcs_in_flight
2201 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2202 * one close request is allowed above the maximum.
2204 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2209 /* A slot is available if
2210 * - number of modify RPCs in flight is less than the max
2211 * - it's a close RPC and no other close request is in flight
2213 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2214 (close_req && cli->cl_close_rpcs_in_flight == 0);
2219 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2224 spin_lock(&cli->cl_mod_rpcs_lock);
2225 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2226 spin_unlock(&cli->cl_mod_rpcs_lock);
2230 /* Get a modify RPC slot from the obd client @cli according
2231 * to the kind of operation @opc that is going to be sent
2232 * and the intent @it of the operation if it applies.
2233 * If the maximum number of modify RPCs in flight is reached
2234 * the thread is put to sleep.
2235 * Returns the tag to be set in the request message. Tag 0
2236 * is reserved for non-modifying requests.
2238 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2239 struct lookup_intent *it)
2241 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2242 bool close_req = false;
2245 /* read-only metadata RPCs don't consume a slot on MDT
2246 * for reply reconstruction
2248 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2249 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2252 if (opc == MDS_CLOSE)
2256 spin_lock(&cli->cl_mod_rpcs_lock);
2257 max = cli->cl_max_mod_rpcs_in_flight;
2258 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2259 /* there is a slot available */
2260 cli->cl_mod_rpcs_in_flight++;
2262 cli->cl_close_rpcs_in_flight++;
2263 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2264 cli->cl_mod_rpcs_in_flight);
2265 /* find a free tag */
2266 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2268 LASSERT(i < OBD_MAX_RIF_MAX);
2269 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2270 spin_unlock(&cli->cl_mod_rpcs_lock);
2271 /* tag 0 is reserved for non-modify RPCs */
2274 spin_unlock(&cli->cl_mod_rpcs_lock);
2276 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2277 "opc %u, max %hu\n",
2278 cli->cl_import->imp_obd->obd_name, opc, max);
2280 l_wait_event(cli->cl_mod_rpcs_waitq,
2281 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2284 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2286 /* Put a modify RPC slot from the obd client @cli according
2287 * to the kind of operation @opc that has been sent and the
2288 * intent @it of the operation if it applies.
2290 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2291 struct lookup_intent *it, __u16 tag)
2293 bool close_req = false;
2295 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2296 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2299 if (opc == MDS_CLOSE)
2302 spin_lock(&cli->cl_mod_rpcs_lock);
2303 cli->cl_mod_rpcs_in_flight--;
2305 cli->cl_close_rpcs_in_flight--;
2306 /* release the tag in the bitmap */
2307 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2308 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2309 spin_unlock(&cli->cl_mod_rpcs_lock);
2310 wake_up(&cli->cl_mod_rpcs_waitq);
2312 EXPORT_SYMBOL(obd_put_mod_rpc_slot);