4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_kernelcomm.h>
/* Taken around every walk/modification of the obd_types list in this file
 * (class_search_type(), class_register_type(), class_unregister_type()). */
49 spinlock_t obd_types_lock;
/* Slab caches: created by obd_init_caches(), destroyed by
 * obd_cleanup_caches().  obd_device_cachep backs obd_device_alloc(). */
51 static struct kmem_cache *obd_device_cachep;
52 struct kmem_cache *obdo_cachep;
53 EXPORT_SYMBOL(obdo_cachep);
54 static struct kmem_cache *import_cachep;
/* Exports/imports whose last reference was dropped are handed to
 * obd_zombie_export_add()/obd_zombie_import_add() (see class_export_put()
 * and class_import_put()).  Presumably these lists and the lock back that
 * deferred destruction -- their users are outside this part of the file. */
56 static struct list_head obd_zombie_imports;
57 static struct list_head obd_zombie_exports;
58 static spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of helpers defined later in the file. */
60 static void obd_zombie_impexp_notify(void);
61 static void obd_zombie_export_add(struct obd_export *exp);
62 static void obd_zombie_import_add(struct obd_import *imp);
63 static void print_export_data(struct obd_export *exp,
64 const char *status, int locks);
/* Stale-export bookkeeping; obd_stale_export_num is bumped in
 * class_unlink_export(). */
66 struct list_head obd_stale_exports;
67 spinlock_t obd_stale_export_lock;
68 atomic_t obd_stale_export_num;
/* Hook used to drop a ptlrpc_connection reference from this file.
 * NOTE(review): it is called without a NULL check in class_export_destroy()
 * and class_import_destroy(); where it gets assigned is not visible here. */
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
71 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
74 * support functions: we could use inter-module communication, but this
75 * is more portable to other OS's
/*
 * Allocate one struct obd_device from obd_device_cachep with GFP_NOFS and
 * stamp it with OBD_DEVICE_MAGIC, the value the LASSERTs elsewhere in this
 * file use to validate device pointers.
 */
77 static struct obd_device *obd_device_alloc(void)
79 struct obd_device *obd;
81 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
83 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts the magic first, complains (CERROR) if obd_namespace is still
 * set, then finalizes the obd_reference lu_ref before freeing.
 */
88 static void obd_device_free(struct obd_device *obd)
91 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
92 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a namespace still attached at this point was never cleaned up */
93 if (obd->obd_namespace != NULL) {
94 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
95 obd, obd->obd_namespace, obd->obd_force);
98 lu_ref_fini(&obd->obd_reference);
99 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look for a registered obd_type whose typ_name equals \a name (strcmp).
 *
 * The obd_types list is walked under obd_types_lock; the lock is released
 * both on a match and after an unsuccessful walk.  Callers in this file
 * treat a non-NULL result as "found".  No reference is taken here, see
 * class_get_type() for that.
 */
102 struct obd_type *class_search_type(const char *name)
104 struct list_head *tmp;
105 struct obd_type *type;
107 spin_lock(&obd_types_lock);
108 list_for_each(tmp, &obd_types) {
109 type = list_entry(tmp, struct obd_type, typ_chain);
110 if (strcmp(type->typ_name, name) == 0) {
/* match: drop the lock before leaving the loop */
111 spin_unlock(&obd_types_lock);
115 spin_unlock(&obd_types_lock);
118 EXPORT_SYMBOL(class_search_type);
/*
 * Find the obd_type called \a name, trying to load its kernel module when
 * module loading support is compiled in, and pin the owning module.
 *
 * Some type names are served by a differently named module, so the name is
 * remapped before request_module(); after a successful load the type list
 * is searched again with the original \a name.
 */
120 struct obd_type *class_get_type(const char *name)
122 struct obd_type *type = class_search_type(name);
124 #ifdef HAVE_MODULE_LOADING_SUPPORT
126 const char *modname = name;
/* module-name aliases: "obdfilter", LWP -> OSP, anything starting with
 * the MDS name -> MDT (prefix match via strncmp) */
128 if (strcmp(modname, "obdfilter") == 0)
131 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
132 modname = LUSTRE_OSP_NAME;
134 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
135 modname = LUSTRE_MDT_NAME;
/* request_module() returns 0 on success */
137 if (!request_module("%s", modname)) {
138 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
139 type = class_search_type(name);
141 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* take the per-type lock while pinning the module that provides dt_ops */
147 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not checked here, so a
 * module that is already unloading would not be detected -- confirm. */
149 try_module_get(type->typ_dt_ops->o_owner);
150 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): under the per-type lock, drop the module
 * reference on the owner of the type's dt_ops.
 */
155 void class_put_type(struct obd_type *type)
158 spin_lock(&type->obd_type_lock);
160 module_put(type->typ_dt_ops->o_owner);
161 spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) on the length of a type name, enforced by the
 * LASSERT in class_register_type(). */
164 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 *
 * Private copies of \a dt_ops and \a md_ops and of the name are stored in a
 * freshly allocated obd_type, the proc entry and lu_device_type are set up,
 * and finally the type is linked into obd_types.  On failure everything
 * allocated so far is released again.
 *
 * NOTE(review): the duplicate check via class_search_type() and the later
 * list_add() are two separate critical sections on obd_types_lock, so two
 * concurrent registrations of the same name could both pass the check.
 */
166 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
167 bool enable_proc, struct lprocfs_vars *vars,
168 const char *name, struct lu_device_type *ldt)
170 struct obd_type *type;
175 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
177 if (class_search_type(name)) {
178 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
183 OBD_ALLOC(type, sizeof(*type));
/* allocate all three members first, then check them together below */
187 OBD_ALLOC_PTR(type->typ_dt_ops);
188 OBD_ALLOC_PTR(type->typ_md_ops);
189 OBD_ALLOC(type->typ_name, strlen(name) + 1);
191 if (type->typ_dt_ops == NULL ||
192 type->typ_md_ops == NULL ||
193 type->typ_name == NULL)
/* struct copies: the type keeps its own ops tables */
196 *(type->typ_dt_ops) = *dt_ops;
197 /* md_ops is optional */
199 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized strlen(name) + 1 above, so strcpy() fits */
200 strcpy(type->typ_name, name);
201 spin_lock_init(&type->obd_type_lock);
203 #ifdef CONFIG_PROC_FS
205 type->typ_procroot = lprocfs_register(type->typ_name,
/* normalise an ERR_PTR to NULL so the cleanup path can test for NULL */
208 if (IS_ERR(type->typ_procroot)) {
209 rc = PTR_ERR(type->typ_procroot);
210 type->typ_procroot = NULL;
217 rc = lu_device_type_init(ldt);
/* publish the fully initialised type */
222 spin_lock(&obd_types_lock);
223 list_add(&type->typ_chain, &obd_types);
224 spin_unlock(&obd_types_lock);
/* failure cleanup: undo in reverse order, skipping what was never set */
229 if (type->typ_name != NULL) {
230 #ifdef CONFIG_PROC_FS
231 if (type->typ_procroot != NULL)
232 remove_proc_subtree(type->typ_name, proc_lustre_root);
234 OBD_FREE(type->typ_name, strlen(name) + 1);
236 if (type->typ_md_ops != NULL)
237 OBD_FREE_PTR(type->typ_md_ops);
238 if (type->typ_dt_ops != NULL)
239 OBD_FREE_PTR(type->typ_dt_ops);
240 OBD_FREE(type, sizeof(*type));
243 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name: remove its proc entries, finalize
 * its lu_device_type, unlink it from obd_types and free it.
 *
 * If the type still has references (typ_refcnt != 0) only the ops tables
 * are freed and the type is otherwise left in place.
 */
245 int class_unregister_type(const char *name)
247 struct obd_type *type = class_search_type(name);
251 CERROR("unknown obd type\n");
255 if (type->typ_refcnt) {
256 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
257 /* This is a bad situation, let's make the best of it */
258 /* Remove ops, but leave the name for debugging */
/* NOTE(review): the type stays reachable after its ops are freed, and
 * class_put_type() dereferences typ_dt_ops->o_owner -- a remaining holder
 * that later puts the type would touch freed memory.  Confirm. */
259 OBD_FREE_PTR(type->typ_dt_ops);
260 OBD_FREE_PTR(type->typ_md_ops);
264 /* we do not use type->typ_procroot as for compatibility purposes
265 * other modules can share names (i.e. lod can use lov entry). so
266 * we can't reference pointer as it can get invalidated when another
267 * module removes the entry */
268 #ifdef CONFIG_PROC_FS
269 if (type->typ_procroot != NULL)
270 remove_proc_subtree(type->typ_name, proc_lustre_root);
271 if (type->typ_procsym != NULL)
272 lprocfs_remove(&type->typ_procsym);
275 lu_device_type_fini(type->typ_lu);
/* unlink under the list lock, free outside of it */
277 spin_lock(&obd_types_lock);
278 list_del(&type->typ_chain);
279 spin_unlock(&obd_types_lock);
/* \a name compared equal to typ_name, so its length is the allocated one */
280 OBD_FREE(type->typ_name, strlen(name) + 1);
281 if (type->typ_dt_ops != NULL)
282 OBD_FREE_PTR(type->typ_dt_ops);
283 if (type->typ_md_ops != NULL)
284 OBD_FREE_PTR(type->typ_md_ops);
285 OBD_FREE(type, sizeof(*type));
287 } /* class_unregister_type */
288 EXPORT_SYMBOL(class_unregister_type);
291 * Create a new obd device.
293 * Find an empty slot in ::obd_devs[], create a new obd device in it.
295 * \param[in] type_name obd device type string.
296 * \param[in] name obd device name.
298 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device
/*
 * Allocate an obd_device of type \a type_name called \a name and install it
 * in a free obd_devs[] slot.
 *
 * Failures are reported as ERR_PTR(): -EINVAL (name too long), -ENODEV
 * (unknown type), -ENOMEM, -EEXIST (name already in use) and -EOVERFLOW
 * (no free slot).  On failure the new device is freed and the type
 * reference taken by class_get_type() is dropped.
 */
301 struct obd_device *class_newdev(const char *type_name, const char *name)
303 struct obd_device *result = NULL;
304 struct obd_device *newdev;
305 struct obd_type *type = NULL;
307 int new_obd_minor = 0;
310 if (strlen(name) >= MAX_OBD_NAME) {
311 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
312 RETURN(ERR_PTR(-EINVAL));
315 type = class_get_type(type_name);
317 CERROR("OBD: unknown type: %s\n", type_name);
318 RETURN(ERR_PTR(-ENODEV));
321 newdev = obd_device_alloc();
323 GOTO(out_type, result = ERR_PTR(-ENOMEM));
325 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* One pass over the whole table under the write lock: it both claims the
 * first free slot and keeps looking for a device with the same name. */
327 write_lock(&obd_dev_lock);
328 for (i = 0; i < class_devno_max(); i++) {
329 struct obd_device *obd = class_num2obd(i);
331 if (obd && (strcmp(name, obd->obd_name) == 0)) {
332 CERROR("Device %s already exists at %d, won't add\n",
/* A duplicate was found; the lines below undo a slot that this same
 * pass had already claimed (the guarding condition is not visible in
 * this listing). */
335 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
336 "%p obd_magic %08x != %08x\n", result,
337 result->obd_magic, OBD_DEVICE_MAGIC);
338 LASSERTF(result->obd_minor == new_obd_minor,
339 "%p obd_minor %d != %d\n", result,
340 result->obd_minor, new_obd_minor);
342 obd_devs[result->obd_minor] = NULL;
343 result->obd_name[0]='\0';
345 result = ERR_PTR(-EEXIST);
/* first empty slot while nothing has been claimed yet */
348 if (!result && !obd) {
350 result->obd_minor = i;
352 result->obd_type = type;
/* name was checked above to be shorter than MAX_OBD_NAME; assumes
 * obd_name holds MAX_OBD_NAME bytes and starts zero-filled so the
 * copy stays NUL-terminated -- TODO confirm */
353 strncpy(result->obd_name, name,
354 sizeof(result->obd_name) - 1);
355 obd_devs[i] = result;
358 write_unlock(&obd_dev_lock);
/* the loop ran to the end without claiming a slot: table is full */
360 if (result == NULL && i >= class_devno_max()) {
361 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
363 GOTO(out, result = ERR_PTR(-EOVERFLOW));
369 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
370 result->obd_name, result);
/* error exits: release the device, then the type reference */
374 obd_device_free(newdev);
376 class_put_type(type);
/*
 * Remove \a obd from obd_devs[], free it and drop the reference on its
 * obd_type.
 *
 * The type pointer is saved up front because \a obd is freed before the
 * type reference is released.
 */
380 void class_release_dev(struct obd_device *obd)
382 struct obd_type *obd_type = obd->obd_type;
/* sanity: valid magic, and the device really occupies its own slot */
384 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
385 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
386 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
387 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
388 LASSERT(obd_type != NULL);
390 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
391 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
/* clear the slot under the write lock, free outside of it */
393 write_lock(&obd_dev_lock);
394 obd_devs[obd->obd_minor] = NULL;
395 write_unlock(&obd_dev_lock);
396 obd_device_free(obd);
398 class_put_type(obd_type);
/*
 * Scan obd_devs[] under the read lock for a device named \a name (strcmp)
 * that has finished attaching (obd_attached set).
 *
 * class_name2obd() treats a negative result as "not found"; the return
 * statements themselves are not visible in this listing.
 */
401 int class_name2dev(const char *name)
408 read_lock(&obd_dev_lock);
409 for (i = 0; i < class_devno_max(); i++) {
410 struct obd_device *obd = class_num2obd(i);
412 if (obd && strcmp(name, obd->obd_name) == 0) {
413 /* Make sure we finished attaching before we give
414 out any references */
415 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
416 if (obd->obd_attached) {
417 read_unlock(&obd_dev_lock);
423 read_unlock(&obd_dev_lock);
/*
 * Look up an obd device by name.
 *
 * Resolves \a name to a slot index with class_name2dev() and converts the
 * index to a device with class_num2obd().
 */
428 struct obd_device *class_name2obd(const char *name)
430 int dev = class_name2dev(name);
/* Valid slots are 0 .. class_devno_max() - 1, the same range that
 * class_num2obd() and every scan loop in this file use, so an index equal
 * to class_devno_max() must be rejected as well. */
432 if (dev < 0 || dev >= class_devno_max())
434 return class_num2obd(dev);
436 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] under the read lock for a device whose obd_uuid equals
 * \a uuid (obd_uuid_equals).  Unlike class_name2dev() there is no
 * obd_attached check on the match.
 */
438 int class_uuid2dev(struct obd_uuid *uuid)
442 read_lock(&obd_dev_lock);
443 for (i = 0; i < class_devno_max(); i++) {
444 struct obd_device *obd = class_num2obd(i);
446 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
447 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/* match: release the lock before leaving */
448 read_unlock(&obd_dev_lock);
452 read_unlock(&obd_dev_lock);
/*
 * Look up an obd device by uuid: resolve the slot index with
 * class_uuid2dev() and convert it with class_num2obd().
 */
457 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
459 int dev = class_uuid2dev(uuid);
462 return class_num2obd(dev);
464 EXPORT_SYMBOL(class_uuid2obd);
467 * Get obd device from ::obd_devs[]
469 * \param num [in] array index
471 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
472 * otherwise return the obd device there.
/*
 * Translate a slot index into the obd device stored there, asserting that
 * the device has a valid magic and that its obd_minor matches \a num.
 *
 * NOTE(review): only the upper bound of \a num is checked in the visible
 * code; confirm that no caller can pass a negative index.
 */
474 struct obd_device *class_num2obd(int num)
476 struct obd_device *obd = NULL;
478 if (num < class_devno_max()) {
483 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
484 "%p obd_magic %08x != %08x\n",
485 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
486 LASSERTF(obd->obd_minor == num,
487 "%p obd_minor %0d != %0d\n",
488 obd, obd->obd_minor, num);
495 * Get obd devices count. Device in any
497 * \retval obd device count
499 int get_devices_count(void)
501 int index, max_index = class_devno_max(), dev_count = 0;
503 read_lock(&obd_dev_lock);
504 for (index = 0; index <= max_index; index++) {
505 struct obd_device *obd = class_num2obd(index);
509 read_unlock(&obd_dev_lock);
513 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per obd device: slot index, a status string, type
 * name, device name, uuid and current refcount.  The table is walked under
 * the read lock.
 */
515 void class_obd_list(void)
520 read_lock(&obd_dev_lock);
521 for (i = 0; i < class_devno_max(); i++) {
522 struct obd_device *obd = class_num2obd(i);
/* status is picked by precedence: stopping, then set up, then attached
 * (the strings assigned in each branch are not visible in this listing) */
526 if (obd->obd_stopping)
528 else if (obd->obd_set_up)
530 else if (obd->obd_attached)
534 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
535 i, status, obd->obd_type->typ_name,
536 obd->obd_name, obd->obd_uuid.uuid,
537 atomic_read(&obd->obd_refcount));
539 read_unlock(&obd_dev_lock);
543 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
544 specified, then only the client with that uuid is returned,
545 otherwise any client connected to the tgt is returned. */
/* The table is scanned under the read lock, which is dropped before a match
 * is handed back. */
546 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
547 const char * typ_name,
548 struct obd_uuid *grp_uuid)
552 read_lock(&obd_dev_lock);
553 for (i = 0; i < class_devno_max(); i++) {
554 struct obd_device *obd = class_num2obd(i);
/* prefix match: only the first strlen(typ_name) characters of the
 * device's type name are compared */
558 if ((strncmp(obd->obd_type->typ_name, typ_name,
559 strlen(typ_name)) == 0)) {
/* target uuid must match; the device uuid is only compared when a
 * grp_uuid was supplied */
560 if (obd_uuid_equals(tgt_uuid,
561 &obd->u.cli.cl_target_uuid) &&
562 ((grp_uuid)? obd_uuid_equals(grp_uuid,
563 &obd->obd_uuid) : 1)) {
564 read_unlock(&obd_dev_lock);
569 read_unlock(&obd_dev_lock);
573 EXPORT_SYMBOL(class_find_client_obd);
575 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
576 searching at *next, and if a device is found, the next index to look
577 at is saved in *next. If next is NULL, then the first matching device
578 will always be returned. */
579 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* a caller-supplied start index is only honoured when it lies inside
 * [0, class_devno_max()) */
585 else if (*next >= 0 && *next < class_devno_max())
590 read_lock(&obd_dev_lock);
591 for (; i < class_devno_max(); i++) {
592 struct obd_device *obd = class_num2obd(i);
596 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
/* match: release the lock before leaving */
599 read_unlock(&obd_dev_lock);
603 read_unlock(&obd_dev_lock);
607 EXPORT_SYMBOL(class_devices_in_group);
610 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
611 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF via obd_set_info_async() to every set-up,
 * non-stopping mdc/osc/mdt/ost device whose name starts with the first
 * \a namelen characters of \a fsname.
 */
613 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
615 struct obd_device *obd;
619 LASSERT(namelen > 0);
621 read_lock(&obd_dev_lock);
622 for (i = 0; i < class_devno_max(); i++) {
623 obd = class_num2obd(i);
/* skip empty slots and devices that are not (or no longer) operational */
625 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
628 /* only notify mdc, osc, mdt, ost */
629 type = obd->obd_type->typ_name;
630 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
631 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
632 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
633 strcmp(type, LUSTRE_OST_NAME) != 0)
/* prefix comparison limited to namelen characters */
636 if (strncmp(obd->obd_name, fsname, namelen))
/* Pin the device, then drop obd_dev_lock around the set_info call; the
 * lock is re-taken afterwards to continue the scan. */
639 class_incref(obd, __FUNCTION__, obd);
640 read_unlock(&obd_dev_lock);
641 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
642 sizeof(KEY_SPTLRPC_CONF),
643 KEY_SPTLRPC_CONF, 0, NULL, NULL);
645 class_decref(obd, __FUNCTION__, obd);
646 read_lock(&obd_dev_lock);
648 read_unlock(&obd_dev_lock);
651 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches().  It is also called
 * from obd_init_caches() itself to unwind a partial initialisation.
 */
653 void obd_cleanup_caches(void)
/* pointers are reset to NULL so the LASSERTs in obd_init_caches() hold on
 * a later re-initialisation */
656 if (obd_device_cachep) {
657 kmem_cache_destroy(obd_device_cachep);
658 obd_device_cachep = NULL;
661 kmem_cache_destroy(obdo_cachep);
665 kmem_cache_destroy(import_cachep);
666 import_cachep = NULL;
/*
 * Create the three slab caches used by this file: "ll_obd_dev_cache"
 * (struct obd_device), "ll_obdo_cache" (struct obdo) and "ll_import_cache"
 * (struct obd_import).
 *
 * Each cache must not exist yet (LASSERT).  Any creation failure jumps to
 * the common exit with rc = -ENOMEM, where obd_cleanup_caches() is called.
 */
672 int obd_init_caches(void)
677 LASSERT(obd_device_cachep == NULL);
678 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
679 sizeof(struct obd_device),
681 if (!obd_device_cachep)
682 GOTO(out, rc = -ENOMEM);
684 LASSERT(obdo_cachep == NULL);
685 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
688 GOTO(out, rc = -ENOMEM);
690 LASSERT(import_cachep == NULL);
691 import_cachep = kmem_cache_create("ll_import_cache",
692 sizeof(struct obd_import),
695 GOTO(out, rc = -ENOMEM);
699 obd_cleanup_caches();
703 /* map connection to client */
/*
 * Resolve the cookie in \a conn to its export through the handle table
 * (class_handle2object).  Callers in this file release the result with
 * class_export_put(), see class_conn2obd().
 */
704 struct obd_export *class_conn2export(struct lustre_handle *conn)
706 struct obd_export *export;
710 CDEBUG(D_CACHE, "looking for null handle\n");
714 if (conn->cookie == -1) { /* this means assign a new connection */
715 CDEBUG(D_CACHE, "want a new connection\n");
719 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
720 export = class_handle2object(conn->cookie, NULL);
723 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): the body is not visible in this listing; presumably it
 * yields exp->exp_obd -- confirm against the full file.
 */
725 struct obd_device *class_exp2obd(struct obd_export *exp)
731 EXPORT_SYMBOL(class_exp2obd);
/*
 * Map a connection handle to the obd device of its export.
 *
 * The export obtained from class_conn2export() is only held long enough to
 * read exp_obd.
 * NOTE(review): nothing visible here keeps the obd pinned once the export
 * reference has been dropped -- confirm callers hold their own reference.
 */
733 struct obd_device *class_conn2obd(struct lustre_handle *conn)
735 struct obd_export *export;
736 export = class_conn2export(conn);
738 struct obd_device *obd = export->exp_obd;
739 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the obd device that
 * \a exp belongs to.
 */
745 struct obd_import *class_exp2cliimp(struct obd_export *exp)
747 struct obd_device *obd = exp->exp_obd;
750 return obd->u.cli.cl_import;
752 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the obd device reached
 * through connection handle \a conn (via class_conn2obd()).
 */
754 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
756 struct obd_device *obd = class_conn2obd(conn);
759 return obd->u.cli.cl_import;
762 /* Export management functions */
/*
 * Final teardown of an export whose refcount has reached zero: release its
 * connection (if any), check that its request/reply lists are empty, let
 * the obd layer destroy it, drop the "export" reference on the obd device
 * and free the memory through OBD_FREE_RCU.
 */
763 static void class_export_destroy(struct obd_export *exp)
765 struct obd_device *obd = exp->exp_obd;
768 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
769 LASSERT(obd != NULL);
771 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
772 exp->exp_client_uuid.uuid, obd->obd_name);
774 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
775 if (exp->exp_connection)
776 ptlrpc_put_connection_superhack(exp->exp_connection);
/* nothing may still be queued on the export at this point */
778 LASSERT(list_empty(&exp->exp_outstanding_replies));
779 LASSERT(list_empty(&exp->exp_uncommitted_replies));
780 LASSERT(list_empty(&exp->exp_req_replay_queue));
781 LASSERT(list_empty(&exp->exp_hp_rpcs));
782 obd_destroy_export(exp);
/* pairs with class_incref(obd, "export", export) in class_new_export() */
783 class_decref(obd, "export", exp);
785 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback for export handles: takes one export reference. */
789 static void export_handle_addref(void *export)
791 class_export_get(export);
/* Handle operations passed to class_handle_hash() for every new export. */
794 static struct portals_handle_ops export_handle_ops = {
795 .hop_addref = export_handle_addref,
/* Take one reference on \a exp and log the new refcount. */
799 struct obd_export *class_export_get(struct obd_export *exp)
801 atomic_inc(&exp->exp_refcount);
802 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
803 atomic_read(&exp->exp_refcount));
806 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.  When the last reference goes away the
 * export is not destroyed inline; it is handed to obd_zombie_export_add().
 */
808 void class_export_put(struct obd_export *exp)
810 LASSERT(exp != NULL);
811 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* the value logged is the count after this put, hence the "- 1" */
812 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
813 atomic_read(&exp->exp_refcount) - 1);
815 if (atomic_dec_and_test(&exp->exp_refcount)) {
/* on the final put the export must still be on an obd chain and must
 * not be on the stale list */
816 LASSERT(!list_empty(&exp->exp_obd_chain));
817 LASSERT(list_empty(&exp->exp_stale_list));
818 CDEBUG(D_IOCTL, "final put %p/%s\n",
819 exp, exp->exp_client_uuid.uuid);
821 /* release nid stat reference */
822 lprocfs_exp_cleanup(exp);
824 obd_zombie_export_add(exp);
827 EXPORT_SYMBOL(class_export_put);
829 /* Creates a new export, adds it to the hash table, and returns a
830 * pointer to it. The refcount is 2: one for the hash reference, and
831 * one for the pointer returned by this function. */
/*
 * Failures are reported as ERR_PTR(): -ENOMEM, -ENODEV (device stopping or
 * no uuid hash) and -EALREADY (an export for \a cluuid already exists).
 */
832 struct obd_export *class_new_export(struct obd_device *obd,
833 struct obd_uuid *cluuid)
835 struct obd_export *export;
836 struct cfs_hash *hash = NULL;
840 OBD_ALLOC_PTR(export);
842 return ERR_PTR(-ENOMEM);
/* --- field initialisation; nothing is visible to others yet --- */
844 export->exp_conn_cnt = 0;
845 export->exp_lock_hash = NULL;
846 export->exp_flock_hash = NULL;
847 atomic_set(&export->exp_refcount, 2);
848 atomic_set(&export->exp_rpc_count, 0);
849 atomic_set(&export->exp_cb_count, 0);
850 atomic_set(&export->exp_locks_count, 0);
851 #if LUSTRE_TRACKS_LOCK_EXP_REFS
852 INIT_LIST_HEAD(&export->exp_locks_list);
853 spin_lock_init(&export->exp_locks_list_guard);
855 atomic_set(&export->exp_replay_count, 0);
856 export->exp_obd = obd;
857 INIT_LIST_HEAD(&export->exp_outstanding_replies);
858 spin_lock_init(&export->exp_uncommitted_replies_lock);
859 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
860 INIT_LIST_HEAD(&export->exp_req_replay_queue);
861 INIT_LIST_HEAD(&export->exp_handle.h_link);
862 INIT_LIST_HEAD(&export->exp_hp_rpcs);
863 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* from here on the export can be found through its handle cookie */
864 class_handle_hash(&export->exp_handle, &export_handle_ops);
865 export->exp_last_request_time = cfs_time_current_sec();
866 spin_lock_init(&export->exp_lock);
867 spin_lock_init(&export->exp_rpc_lock);
868 INIT_HLIST_NODE(&export->exp_uuid_hash);
869 INIT_HLIST_NODE(&export->exp_nid_hash);
870 INIT_HLIST_NODE(&export->exp_gen_hash);
871 spin_lock_init(&export->exp_bl_list_lock);
872 INIT_LIST_HEAD(&export->exp_bl_list);
873 INIT_LIST_HEAD(&export->exp_stale_list);
875 export->exp_sp_peer = LUSTRE_SP_ANY;
876 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
877 export->exp_client_uuid = *cluuid;
878 obd_init_export(export);
/* --- first locked section: refuse a stopping device and pin its uuid
 * hash so it can be used after the lock is dropped --- */
880 spin_lock(&obd->obd_dev_lock);
881 /* shouldn't happen, but might race */
882 if (obd->obd_stopping)
883 GOTO(exit_unlock, rc = -ENODEV);
885 hash = cfs_hash_getref(obd->obd_uuid_hash);
887 GOTO(exit_unlock, rc = -ENODEV);
888 spin_unlock(&obd->obd_dev_lock);
/* an export whose client uuid equals the device's own uuid is not
 * entered into the uuid hash */
890 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
891 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
893 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
894 obd->obd_name, cluuid->uuid, rc);
895 GOTO(exit_err, rc = -EALREADY);
899 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* --- second locked section: the device may have started stopping while
 * the lock was dropped, so check again before linking the export --- */
900 spin_lock(&obd->obd_dev_lock);
901 if (obd->obd_stopping) {
/* NOTE(review): this removal is not guarded by hlist_unhashed() the way
 * class_unlink_export() does it, although a self-uuid export was never
 * added above -- confirm cfs_hash_del() tolerates that. */
902 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
903 GOTO(exit_unlock, rc = -ENODEV);
/* pairs with class_decref(obd, "export", exp) in class_export_destroy() */
906 class_incref(obd, "export", export);
907 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
908 list_add_tail(&export->exp_obd_chain_timed,
909 &export->exp_obd->obd_exports_timed);
910 export->exp_obd->obd_num_exports++;
911 spin_unlock(&obd->obd_dev_lock);
912 cfs_hash_putref(hash);
/* --- error exits: unlock if needed, drop the hash reference, remove the
 * handle and free the export directly (it was never linked) --- */
916 spin_unlock(&obd->obd_dev_lock);
919 cfs_hash_putref(hash);
920 class_handle_unhash(&export->exp_handle);
921 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
922 obd_destroy_export(export);
923 OBD_FREE_PTR(export);
926 EXPORT_SYMBOL(class_new_export);
/*
 * Detach \a exp from everything it can be looked up through: its handle,
 * the device's uuid hash and the device's export lists.  The export is
 * moved to obd_unlinked_exports and the device's export count is lowered.
 */
928 void class_unlink_export(struct obd_export *exp)
930 class_handle_unhash(&exp->exp_handle);
932 spin_lock(&exp->exp_obd->obd_dev_lock);
933 /* delete an uuid-export hashitem from hashtables */
934 if (!hlist_unhashed(&exp->exp_uuid_hash))
935 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
936 &exp->exp_client_uuid,
937 &exp->exp_uuid_hash);
939 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
940 list_del_init(&exp->exp_obd_chain_timed);
941 exp->exp_obd->obd_num_exports--;
942 spin_unlock(&exp->exp_obd->obd_dev_lock);
943 atomic_inc(&obd_stale_export_num);
945 /* A reference is kept by obd_stale_exports list */
946 obd_stale_export_put(exp);
949 /* Import management functions */
/*
 * Final teardown of an import whose refcount has reached zero: release the
 * active connection and every entry on imp_conn_list, drop the "import"
 * reference on the obd device and free the memory through OBD_FREE_RCU.
 */
950 static void class_import_destroy(struct obd_import *imp)
954 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
955 imp->imp_obd->obd_name);
957 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
959 ptlrpc_put_connection_superhack(imp->imp_connection);
/* pop entries off the head until the list is empty */
961 while (!list_empty(&imp->imp_conn_list)) {
962 struct obd_import_conn *imp_conn;
964 imp_conn = list_entry(imp->imp_conn_list.next,
965 struct obd_import_conn, oic_item);
966 list_del_init(&imp_conn->oic_item);
967 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
968 OBD_FREE(imp_conn, sizeof(*imp_conn));
/* the security context must already have been released */
971 LASSERT(imp->imp_sec == NULL);
/* pairs with class_incref(obd, "import", imp) in class_new_import() */
972 class_decref(imp->imp_obd, "import", imp);
973 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* hop_addref callback for import handles: takes one import reference. */
977 static void import_handle_addref(void *import)
979 class_import_get(import);
/* Handle operations passed to class_handle_hash() for every new import. */
982 static struct portals_handle_ops import_handle_ops = {
983 .hop_addref = import_handle_addref,
/* Take one reference on \a import and log the new refcount. */
987 struct obd_import *class_import_get(struct obd_import *import)
989 atomic_inc(&import->imp_refcount);
990 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
991 atomic_read(&import->imp_refcount),
992 import->imp_obd->obd_name);
995 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp.  When the last reference goes away the
 * import is handed to obd_zombie_import_add() instead of being destroyed
 * inline.
 */
997 void class_import_put(struct obd_import *imp)
/* an import that is already queued as a zombie must not be put again */
1001 LASSERT(list_empty(&imp->imp_zombie_chain));
1002 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* the value logged is the count after this put, hence the "- 1" */
1004 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1005 atomic_read(&imp->imp_refcount) - 1,
1006 imp->imp_obd->obd_name);
1008 if (atomic_dec_and_test(&imp->imp_refcount)) {
1009 CDEBUG(D_INFO, "final put import %p\n", imp);
1010 obd_zombie_import_add(imp);
/* NOTE(review): this reads imp->imp_refcount after the import may have
 * been queued for destruction above -- confirm the zombie path cannot
 * free it before this check runs. */
1013 /* catch possible import put race */
1014 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1017 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and every per-portal service estimate starts at
 * INITIAL_CONNECT_TIMEOUT.
 */
1019 static void init_imp_at(struct imp_at *at) {
1021 at_init(&at->iat_net_latency, 0, 0);
1022 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1023 /* max service estimates are tracked on the server side, so
1024 don't use the AT history here, just use the last reported
1025 val. (But keep hist for proc histogram, worst_ever) */
1026 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise an import for \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, holds an
 * "import" reference on \a obd and is registered in the handle table.
 */
1031 struct obd_import *class_new_import(struct obd_device *obd)
1033 struct obd_import *imp;
1035 OBD_ALLOC(imp, sizeof(*imp));
1039 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1040 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1041 INIT_LIST_HEAD(&imp->imp_replay_list);
1042 INIT_LIST_HEAD(&imp->imp_sending_list);
1043 INIT_LIST_HEAD(&imp->imp_delayed_list);
1044 INIT_LIST_HEAD(&imp->imp_committed_list);
/* the replay cursor starts at the (empty) committed list head */
1045 imp->imp_replay_cursor = &imp->imp_committed_list;
1046 spin_lock_init(&imp->imp_lock);
1047 imp->imp_last_success_conn = 0;
1048 imp->imp_state = LUSTRE_IMP_NEW;
/* pairs with class_decref(imp->imp_obd, "import", imp) in
 * class_import_destroy() */
1049 imp->imp_obd = class_incref(obd, "import", imp);
1050 mutex_init(&imp->imp_sec_mutex);
1051 init_waitqueue_head(&imp->imp_recovery_waitq);
1053 atomic_set(&imp->imp_refcount, 2);
1054 atomic_set(&imp->imp_unregistering, 0);
1055 atomic_set(&imp->imp_inflight, 0);
1056 atomic_set(&imp->imp_replay_inflight, 0);
1057 atomic_set(&imp->imp_inval_count, 0);
1058 INIT_LIST_HEAD(&imp->imp_conn_list);
1059 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1060 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1061 init_imp_at(&imp->imp_at);
1063 /* the default magic is V2, will be used in connect RPC, and
1064 * then adjusted according to the flags in request/reply. */
1065 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1069 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down \a import: remove it from the handle table, bump its
 * generation under imp_lock and drop one reference.
 */
1071 void class_destroy_import(struct obd_import *import)
1073 LASSERT(import != NULL);
1074 LASSERT(import != LP_POISON);
1076 class_handle_unhash(&import->imp_handle);
1078 spin_lock(&import->imp_lock);
1079 import->imp_generation++;
1080 spin_unlock(&import->imp_lock);
1081 class_import_put(import);
1083 EXPORT_SYMBOL(class_destroy_import);
1085 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking (only built with LUSTRE_TRACKS_LOCK_EXP_REFS): record that
 * \a lock holds one more reference on \a exp.  The first reference links
 * the lock onto exp_locks_list and remembers \a exp as its target.
 */
1087 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1089 spin_lock(&exp->exp_locks_list_guard);
1091 LASSERT(lock->l_exp_refs_nr >= 0);
/* a lock is expected to reference a single export; warn otherwise */
1093 if (lock->l_exp_refs_target != NULL &&
1094 lock->l_exp_refs_target != exp) {
1095 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1096 exp, lock, lock->l_exp_refs_target);
/* post-increment: the branch is taken only for the first reference */
1098 if ((lock->l_exp_refs_nr ++) == 0) {
1099 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1100 lock->l_exp_refs_target = exp;
1102 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1103 lock, exp, lock->l_exp_refs_nr);
1104 spin_unlock(&exp->exp_locks_list_guard);
/*
 * Debug tracking counterpart of __class_export_add_lock_ref(): drop one
 * reference of \a lock on \a exp.  When the count reaches zero the lock is
 * unlinked from exp_locks_list and its target is cleared.
 */
1107 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1109 spin_lock(&exp->exp_locks_list_guard);
1110 LASSERT(lock->l_exp_refs_nr > 0);
1111 if (lock->l_exp_refs_target != exp) {
1112 LCONSOLE_WARN("lock %p, "
1113 "mismatching export pointers: %p, %p\n",
1114 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the branch is taken when the last reference goes away */
1116 if (-- lock->l_exp_refs_nr == 0) {
1117 list_del_init(&lock->l_exp_refs_link);
1118 lock->l_exp_refs_target = NULL;
1120 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1121 lock, exp, lock->l_exp_refs_nr);
1122 spin_unlock(&exp->exp_locks_list_guard);
1126 /* A connection defines an export context in which preallocation can
1127 be managed. This releases the export pointer reference, and returns
1128 the export handle, so the export refcount is 1 when this function returns. */
/*
 * Create a new export on \a obd for client \a cluuid and return its handle
 * cookie in \a conn.
 *
 * class_new_export() hands back the export with two references; the one
 * belonging to the returned pointer is dropped here once the cookie has
 * been copied out.  A failure of class_new_export() is returned as its
 * errno (PTR_ERR).
 */
1130 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1131 struct obd_uuid *cluuid)
1133 struct obd_export *export;
1134 LASSERT(conn != NULL);
1135 LASSERT(obd != NULL);
1136 LASSERT(cluuid != NULL);
1139 export = class_new_export(obd, cluuid);
1141 RETURN(PTR_ERR(export));
1143 conn->cookie = export->exp_handle.h_cookie;
1144 class_export_put(export);
1146 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1147 cluuid->uuid, conn->cookie);
1150 EXPORT_SYMBOL(class_connect);
1152 /* if export is involved in recovery then clean up related things */
/*
 * Two independent steps: recovery accounting under obd_recovery_task_lock
 * (only while the device is recovering), then the request/lock replay
 * flags and their per-device counters under exp_lock.
 */
1153 static void class_export_recovery_cleanup(struct obd_export *exp)
1155 struct obd_device *obd = exp->exp_obd;
1157 spin_lock(&obd->obd_recovery_task_lock);
1158 if (obd->obd_recovering) {
1159 if (exp->exp_in_recovery) {
/* exp_lock nests inside obd_recovery_task_lock here */
1160 spin_lock(&exp->exp_lock);
1161 exp->exp_in_recovery = 0;
1162 spin_unlock(&exp->exp_lock);
1163 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1164 atomic_dec(&obd->obd_connected_clients);
1167 /* if called during recovery then should update
1168 * obd_stale_clients counter,
1169 * lightweight exports are not counted */
1170 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1171 exp->exp_obd->obd_stale_clients++;
1173 spin_unlock(&obd->obd_recovery_task_lock);
1175 spin_lock(&exp->exp_lock);
1176 /** Cleanup req replay fields */
1177 if (exp->exp_req_replay_needed) {
1178 exp->exp_req_replay_needed = 0;
/* the counter must still be non-zero before it is decremented */
1180 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1181 atomic_dec(&obd->obd_req_replay_clients);
1184 /** Cleanup lock replay data */
1185 if (exp->exp_lock_replay_needed) {
1186 exp->exp_lock_replay_needed = 0;
1188 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1189 atomic_dec(&obd->obd_lock_replay_clients);
1191 spin_unlock(&exp->exp_lock);
1194 /* This function removes 1-3 references from the export:
1195 * 1 - for the export pointer passed
1196 * and, if a disconnect is really needed,
1197 * 2 - removing from hash
1198 * 3 - in class_unlink_export
1199 * The export pointer passed to this function may be destroyed */
1200 int class_disconnect(struct obd_export *export)
1202 int already_disconnected;
1205 if (export == NULL) {
1206 CWARN("attempting to free NULL export %p\n", export);
/* test-and-set of exp_disconnected under exp_lock, so that only one of
 * several racing callers performs the actual disconnect */
1210 spin_lock(&export->exp_lock);
1211 already_disconnected = export->exp_disconnected;
1212 export->exp_disconnected = 1;
1213 spin_unlock(&export->exp_lock);
1215 /* class_cleanup(), abort_recovery(), and class_fail_export()
1216 * all end up in here, and if any of them race we shouldn't
1217 * call extra class_export_puts(). */
1218 if (already_disconnected) {
1219 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1220 GOTO(no_disconn, already_disconnected);
1223 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1224 export->exp_handle.h_cookie);
/* remove the export from the per-device nid hash if it was ever added */
1226 if (!hlist_unhashed(&export->exp_nid_hash))
1227 cfs_hash_del(export->exp_obd->obd_nid_hash,
1228 &export->exp_connection->c_peer.nid,
1229 &export->exp_nid_hash);
1231 class_export_recovery_cleanup(export);
1232 class_unlink_export(export);
/* reference for the pointer passed in; dropped on both paths */
1234 class_export_put(export);
1237 EXPORT_SYMBOL(class_disconnect);
1239 /* Return non-zero for a fully connected export */
/* "Fully connected" means exp_conn_cnt > 0 and exp_failed not set, sampled
 * under exp_lock. */
1240 int class_connected_export(struct obd_export *exp)
1245 spin_lock(&exp->exp_lock);
1246 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1247 spin_unlock(&exp->exp_lock);
1251 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, stamping each with \a flags first.
 *
 * The list is consumed from its head until it is empty.  An export whose
 * client uuid equals the device uuid is only unlinked from the list, not
 * disconnected.
 */
1253 static void class_disconnect_export_list(struct list_head *list,
1254 enum obd_option flags)
1257 struct obd_export *exp;
1260 /* It's possible that an export may disconnect itself, but
1261 * nothing else will be added to this list. */
1262 while (!list_empty(list)) {
1263 exp = list_entry(list->next, struct obd_export,
1265 /* need for safe call CDEBUG after obd_disconnect */
1266 class_export_get(exp);
1268 spin_lock(&exp->exp_lock);
1269 exp->exp_flags = flags;
1270 spin_unlock(&exp->exp_lock);
1272 if (obd_uuid_equals(&exp->exp_client_uuid,
1273 &exp->exp_obd->obd_uuid)) {
1275 "exp %p export uuid == obd uuid, don't discon\n",
1277 /* Need to delete this now so we don't end up pointing
1278 * to work_list later when this export is cleaned up. */
1279 list_del_init(&exp->exp_obd_chain);
/* balances the class_export_get() at the top of the loop */
1280 class_export_put(exp);
/* second reference: this is the one obd_disconnect() consumes */
1284 class_export_get(exp);
1285 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1286 "last request at "CFS_TIME_T"\n",
1287 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1288 exp, exp->exp_last_request_time);
1289 /* release one export reference anyway */
1290 rc = obd_disconnect(exp);
1292 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1293 obd_export_nid2str(exp), exp, rc);
/* balances the class_export_get() at the top of the loop */
1294 class_export_put(exp);
1299 void class_disconnect_exports(struct obd_device *obd)
1301 struct list_head work_list;
1304 /* Move all of the exports from obd_exports to a work list, en masse. */
1305 INIT_LIST_HEAD(&work_list);
1306 spin_lock(&obd->obd_dev_lock);
1307 list_splice_init(&obd->obd_exports, &work_list);
1308 list_splice_init(&obd->obd_delayed_exports, &work_list);
1309 spin_unlock(&obd->obd_dev_lock);
1311 if (!list_empty(&work_list)) {
1312 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1313 "disconnecting them\n", obd->obd_minor, obd);
1314 class_disconnect_export_list(&work_list,
1315 exp_flags_from_obd(obd));
1317 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1318 obd->obd_minor, obd);
1321 EXPORT_SYMBOL(class_disconnect_exports);
1323 /* Remove exports that have not completed recovery.
1325 void class_disconnect_stale_exports(struct obd_device *obd,
1326 int (*test_export)(struct obd_export *))
1328 struct list_head work_list;
1329 struct obd_export *exp, *n;
1333 INIT_LIST_HEAD(&work_list);
1334 spin_lock(&obd->obd_dev_lock);
1335 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1337 /* don't count self-export as client */
1338 if (obd_uuid_equals(&exp->exp_client_uuid,
1339 &exp->exp_obd->obd_uuid))
1342 /* don't evict clients which have no slot in last_rcvd
1343 * (e.g. lightweight connection) */
1344 if (exp->exp_target_data.ted_lr_idx == -1)
1347 spin_lock(&exp->exp_lock);
1348 if (exp->exp_failed || test_export(exp)) {
1349 spin_unlock(&exp->exp_lock);
1352 exp->exp_failed = 1;
1353 spin_unlock(&exp->exp_lock);
1355 list_move(&exp->exp_obd_chain, &work_list);
1357 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1358 obd->obd_name, exp->exp_client_uuid.uuid,
1359 exp->exp_connection == NULL ? "<unknown>" :
1360 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1361 print_export_data(exp, "EVICTING", 0);
1363 spin_unlock(&obd->obd_dev_lock);
1366 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1367 obd->obd_name, evicted);
1369 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1370 OBD_OPT_ABORT_RECOV);
1373 EXPORT_SYMBOL(class_disconnect_stale_exports);
1375 void class_fail_export(struct obd_export *exp)
1377 int rc, already_failed;
1379 spin_lock(&exp->exp_lock);
1380 already_failed = exp->exp_failed;
1381 exp->exp_failed = 1;
1382 spin_unlock(&exp->exp_lock);
1384 if (already_failed) {
1385 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1386 exp, exp->exp_client_uuid.uuid);
1390 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1391 exp, exp->exp_client_uuid.uuid);
1393 if (obd_dump_on_timeout)
1394 libcfs_debug_dumplog();
1396 /* need for safe call CDEBUG after obd_disconnect */
1397 class_export_get(exp);
1399 /* Most callers into obd_disconnect are removing their own reference
1400 * (request, for example) in addition to the one from the hash table.
1401 * We don't have such a reference here, so make one. */
1402 class_export_get(exp);
1403 rc = obd_disconnect(exp);
1405 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1407 CDEBUG(D_HA, "disconnected export %p/%s\n",
1408 exp, exp->exp_client_uuid.uuid);
1409 class_export_put(exp);
1411 EXPORT_SYMBOL(class_fail_export);
1413 char *obd_export_nid2str(struct obd_export *exp)
1415 if (exp->exp_connection != NULL)
1416 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1420 EXPORT_SYMBOL(obd_export_nid2str);
1422 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1424 struct cfs_hash *nid_hash;
1425 struct obd_export *doomed_exp = NULL;
1426 int exports_evicted = 0;
1428 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1430 spin_lock(&obd->obd_dev_lock);
1431 /* umount has run already, so evict thread should leave
1432 * its task to umount thread now */
1433 if (obd->obd_stopping) {
1434 spin_unlock(&obd->obd_dev_lock);
1435 return exports_evicted;
1437 nid_hash = obd->obd_nid_hash;
1438 cfs_hash_getref(nid_hash);
1439 spin_unlock(&obd->obd_dev_lock);
1442 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1443 if (doomed_exp == NULL)
1446 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1447 "nid %s found, wanted nid %s, requested nid %s\n",
1448 obd_export_nid2str(doomed_exp),
1449 libcfs_nid2str(nid_key), nid);
1450 LASSERTF(doomed_exp != obd->obd_self_export,
1451 "self-export is hashed by NID?\n");
1453 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1454 "request\n", obd->obd_name,
1455 obd_uuid2str(&doomed_exp->exp_client_uuid),
1456 obd_export_nid2str(doomed_exp));
1457 class_fail_export(doomed_exp);
1458 class_export_put(doomed_exp);
1461 cfs_hash_putref(nid_hash);
1463 if (!exports_evicted)
1464 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1465 obd->obd_name, nid);
1466 return exports_evicted;
1468 EXPORT_SYMBOL(obd_export_evict_by_nid);
1470 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1472 struct cfs_hash *uuid_hash;
1473 struct obd_export *doomed_exp = NULL;
1474 struct obd_uuid doomed_uuid;
1475 int exports_evicted = 0;
1477 spin_lock(&obd->obd_dev_lock);
1478 if (obd->obd_stopping) {
1479 spin_unlock(&obd->obd_dev_lock);
1480 return exports_evicted;
1482 uuid_hash = obd->obd_uuid_hash;
1483 cfs_hash_getref(uuid_hash);
1484 spin_unlock(&obd->obd_dev_lock);
1486 obd_str2uuid(&doomed_uuid, uuid);
1487 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1488 CERROR("%s: can't evict myself\n", obd->obd_name);
1489 cfs_hash_putref(uuid_hash);
1490 return exports_evicted;
1493 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1495 if (doomed_exp == NULL) {
1496 CERROR("%s: can't disconnect %s: no exports found\n",
1497 obd->obd_name, uuid);
1499 CWARN("%s: evicting %s at adminstrative request\n",
1500 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1501 class_fail_export(doomed_exp);
1502 class_export_put(doomed_exp);
1505 cfs_hash_putref(uuid_hash);
1507 return exports_evicted;
/* Optional hook, NULL by default.  print_export_data() calls it for an
 * export when asked to dump locks and the hook is set.  Only built when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is enabled. */
1510 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1511 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1514 static void print_export_data(struct obd_export *exp, const char *status,
1517 struct ptlrpc_reply_state *rs;
1518 struct ptlrpc_reply_state *first_reply = NULL;
1521 spin_lock(&exp->exp_lock);
1522 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1528 spin_unlock(&exp->exp_lock);
1530 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1531 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1532 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1533 atomic_read(&exp->exp_rpc_count),
1534 atomic_read(&exp->exp_cb_count),
1535 atomic_read(&exp->exp_locks_count),
1536 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1537 nreplies, first_reply, nreplies > 3 ? "..." : "",
1538 exp->exp_last_committed);
1539 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1540 if (locks && class_export_dump_hook != NULL)
1541 class_export_dump_hook(exp);
1545 void dump_exports(struct obd_device *obd, int locks)
1547 struct obd_export *exp;
1549 spin_lock(&obd->obd_dev_lock);
1550 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1551 print_export_data(exp, "ACTIVE", locks);
1552 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1553 print_export_data(exp, "UNLINKED", locks);
1554 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1555 print_export_data(exp, "DELAYED", locks);
1556 spin_unlock(&obd->obd_dev_lock);
1557 spin_lock(&obd_zombie_impexp_lock);
1558 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1559 print_export_data(exp, "ZOMBIE", locks);
1560 spin_unlock(&obd_zombie_impexp_lock);
1563 void obd_exports_barrier(struct obd_device *obd)
1566 LASSERT(list_empty(&obd->obd_exports));
1567 spin_lock(&obd->obd_dev_lock);
1568 while (!list_empty(&obd->obd_unlinked_exports)) {
1569 spin_unlock(&obd->obd_dev_lock);
1570 set_current_state(TASK_UNINTERRUPTIBLE);
1571 schedule_timeout(cfs_time_seconds(waited));
1572 if (waited > 5 && IS_PO2(waited)) {
1573 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1574 "more than %d seconds. "
1575 "The obd refcount = %d. Is it stuck?\n",
1576 obd->obd_name, waited,
1577 atomic_read(&obd->obd_refcount));
1578 dump_exports(obd, 1);
1581 spin_lock(&obd->obd_dev_lock);
1583 spin_unlock(&obd->obd_dev_lock);
1585 EXPORT_SYMBOL(obd_exports_barrier);
1587 /* Total amount of zombies to be destroyed */
/* Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(). */
1588 static int zombies_count = 0;
1591 * kill zombie imports and exports
1593 void obd_zombie_impexp_cull(void)
1595 struct obd_import *import;
1596 struct obd_export *export;
1600 spin_lock(&obd_zombie_impexp_lock);
1603 if (!list_empty(&obd_zombie_imports)) {
1604 import = list_entry(obd_zombie_imports.next,
1607 list_del_init(&import->imp_zombie_chain);
1611 if (!list_empty(&obd_zombie_exports)) {
1612 export = list_entry(obd_zombie_exports.next,
1615 list_del_init(&export->exp_obd_chain);
1618 spin_unlock(&obd_zombie_impexp_lock);
1620 if (import != NULL) {
1621 class_import_destroy(import);
1622 spin_lock(&obd_zombie_impexp_lock);
1624 spin_unlock(&obd_zombie_impexp_lock);
1627 if (export != NULL) {
1628 class_export_destroy(export);
1629 spin_lock(&obd_zombie_impexp_lock);
1631 spin_unlock(&obd_zombie_impexp_lock);
1635 } while (import != NULL || export != NULL);
/* State of the zombie import/export destroy thread in this file. */
/* completed by the thread as soon as it starts running */
1639 static struct completion obd_zombie_start;
/* completed by the thread when it leaves its main loop */
1640 static struct completion obd_zombie_stop;
/* bit field operated on with set_bit()/test_bit() */
1641 static unsigned long obd_zombie_flags;
/* slept on by the thread and by obd_zombie_barrier() callers */
1642 static wait_queue_head_t obd_zombie_waitq;
/* pid of the thread; lets obd_zombie_barrier() avoid waiting for itself */
1643 static pid_t obd_zombie_pid;
/* NOTE: this value is passed as the bit NUMBER to set_bit()/test_bit() on
 * obd_zombie_flags; it is not used as a mask. */
1646 OBD_ZOMBIE_STOP = 0x0001,
1650 * check for work for kill zombie import/export thread.
1652 static int obd_zombie_impexp_check(void *arg)
1656 spin_lock(&obd_zombie_impexp_lock);
1657 rc = (zombies_count == 0) &&
1658 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1659 spin_unlock(&obd_zombie_impexp_lock);
1665 * Add export to the obd_zombe thread and notify it.
1667 static void obd_zombie_export_add(struct obd_export *exp) {
1668 atomic_dec(&obd_stale_export_num);
1669 spin_lock(&exp->exp_obd->obd_dev_lock);
1670 LASSERT(!list_empty(&exp->exp_obd_chain));
1671 list_del_init(&exp->exp_obd_chain);
1672 spin_unlock(&exp->exp_obd->obd_dev_lock);
1673 spin_lock(&obd_zombie_impexp_lock);
1675 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1676 spin_unlock(&obd_zombie_impexp_lock);
1678 obd_zombie_impexp_notify();
1682 * Add import to the obd_zombe thread and notify it.
1684 static void obd_zombie_import_add(struct obd_import *imp) {
1685 LASSERT(imp->imp_sec == NULL);
1686 spin_lock(&obd_zombie_impexp_lock);
1687 LASSERT(list_empty(&imp->imp_zombie_chain));
1689 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1690 spin_unlock(&obd_zombie_impexp_lock);
1692 obd_zombie_impexp_notify();
1696 * notify import/export destroy thread about new zombie.
1698 static void obd_zombie_impexp_notify(void)
1701 * Make sure obd_zomebie_impexp_thread get this notification.
1702 * It is possible this signal only get by obd_zombie_barrier, and
1703 * barrier gulps this notification and sleeps away and hangs ensues
1705 wake_up_all(&obd_zombie_waitq);
1709 * check whether obd_zombie is idle
1711 static int obd_zombie_is_idle(void)
1715 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1716 spin_lock(&obd_zombie_impexp_lock);
1717 rc = (zombies_count == 0);
1718 spin_unlock(&obd_zombie_impexp_lock);
1723 * wait when obd_zombie import/export queues become empty
1725 void obd_zombie_barrier(void)
1727 struct l_wait_info lwi = { 0 };
1729 if (obd_zombie_pid == current_pid())
1730 /* don't wait for myself */
1732 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1734 EXPORT_SYMBOL(obd_zombie_barrier);
1737 struct obd_export *obd_stale_export_get(void)
1739 struct obd_export *exp = NULL;
1742 spin_lock(&obd_stale_export_lock);
1743 if (!list_empty(&obd_stale_exports)) {
1744 exp = list_entry(obd_stale_exports.next,
1745 struct obd_export, exp_stale_list);
1746 list_del_init(&exp->exp_stale_list);
1748 spin_unlock(&obd_stale_export_lock);
1751 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1752 atomic_read(&obd_stale_export_num));
1756 EXPORT_SYMBOL(obd_stale_export_get);
1758 void obd_stale_export_put(struct obd_export *exp)
1762 LASSERT(list_empty(&exp->exp_stale_list));
1763 if (exp->exp_lock_hash &&
1764 atomic_read(&exp->exp_lock_hash->hs_count)) {
1765 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1766 atomic_read(&obd_stale_export_num));
1768 spin_lock_bh(&exp->exp_bl_list_lock);
1769 spin_lock(&obd_stale_export_lock);
1770 /* Add to the tail if there is no blocked locks,
1771 * to the head otherwise. */
1772 if (list_empty(&exp->exp_bl_list))
1773 list_add_tail(&exp->exp_stale_list,
1774 &obd_stale_exports);
1776 list_add(&exp->exp_stale_list,
1777 &obd_stale_exports);
1779 spin_unlock(&obd_stale_export_lock);
1780 spin_unlock_bh(&exp->exp_bl_list_lock);
1782 class_export_put(exp);
1786 EXPORT_SYMBOL(obd_stale_export_put);
1789 * Adjust the position of the export in the stale list,
1790 * i.e. move to the head of the list if is needed.
1792 void obd_stale_export_adjust(struct obd_export *exp)
1794 LASSERT(exp != NULL);
1795 spin_lock_bh(&exp->exp_bl_list_lock);
1796 spin_lock(&obd_stale_export_lock);
1798 if (!list_empty(&exp->exp_stale_list) &&
1799 !list_empty(&exp->exp_bl_list))
1800 list_move(&exp->exp_stale_list, &obd_stale_exports);
1802 spin_unlock(&obd_stale_export_lock);
1803 spin_unlock_bh(&exp->exp_bl_list_lock);
1805 EXPORT_SYMBOL(obd_stale_export_adjust);
1808 * destroy zombie export/import thread.
1810 static int obd_zombie_impexp_thread(void *unused)
1812 unshare_fs_struct();
1813 complete(&obd_zombie_start);
1815 obd_zombie_pid = current_pid();
1817 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1818 struct l_wait_info lwi = { 0 };
1820 l_wait_event(obd_zombie_waitq,
1821 !obd_zombie_impexp_check(NULL), &lwi);
1822 obd_zombie_impexp_cull();
1825 * Notify obd_zombie_barrier callers that queues
1828 wake_up(&obd_zombie_waitq);
1831 complete(&obd_zombie_stop);
1838 * start destroy zombie import/export thread
1840 int obd_zombie_impexp_init(void)
1842 struct task_struct *task;
1844 INIT_LIST_HEAD(&obd_zombie_imports);
1846 INIT_LIST_HEAD(&obd_zombie_exports);
1847 spin_lock_init(&obd_zombie_impexp_lock);
1848 init_completion(&obd_zombie_start);
1849 init_completion(&obd_zombie_stop);
1850 init_waitqueue_head(&obd_zombie_waitq);
1853 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1855 RETURN(PTR_ERR(task));
1857 wait_for_completion(&obd_zombie_start);
1861 * stop destroy zombie import/export thread
1863 void obd_zombie_impexp_stop(void)
1865 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1866 obd_zombie_impexp_notify();
1867 wait_for_completion(&obd_zombie_stop);
1870 /***** Kernel-userspace comm helpers *******/
1872 /* Get length of entire message, including header */
1873 int kuc_len(int payload_len)
1875 return sizeof(struct kuc_hdr) + payload_len;
1877 EXPORT_SYMBOL(kuc_len);
1879 /* Get a pointer to kuc header, given a ptr to the payload
1880 * @param p Pointer to payload area
1881 * @returns Pointer to kuc header
1883 struct kuc_hdr * kuc_ptr(void *p)
1885 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1886 LASSERT(lh->kuc_magic == KUC_MAGIC);
1889 EXPORT_SYMBOL(kuc_ptr);
1891 /* Test if payload is part of kuc message
1892 * @param p Pointer to payload area
1895 int kuc_ispayload(void *p)
1897 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1899 if (kh->kuc_magic == KUC_MAGIC)
1904 EXPORT_SYMBOL(kuc_ispayload);
1906 /* Alloc space for a message, and fill in header
1907 * @return Pointer to payload area
1909 void *kuc_alloc(int payload_len, int transport, int type)
1912 int len = kuc_len(payload_len);
1916 return ERR_PTR(-ENOMEM);
1918 lh->kuc_magic = KUC_MAGIC;
1919 lh->kuc_transport = transport;
1920 lh->kuc_msgtype = type;
1921 lh->kuc_msglen = len;
1923 return (void *)(lh + 1);
1925 EXPORT_SYMBOL(kuc_alloc);
/**
 * Free a message obtained from kuc_alloc().
 *
 * \param p            pointer to the payload area
 * \param payload_len  payload size the message was allocated with
 */
inline void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *hdr = kuc_ptr(p);
	int total = kuc_len(payload_len);

	/* the allocation starts at the header, not at the payload */
	OBD_FREE(hdr, total);
}
EXPORT_SYMBOL(kuc_free);
/* One thread waiting for an RPC request slot.  obd_get_request_slot()
 * keeps an instance on its stack and links it on the client's wait list. */
1935 struct obd_request_slot_waiter {
/* linkage on the wait list; emptied (list_del_init) by whoever grants
 * this waiter a slot */
1936 struct list_head orsw_entry;
/* private wait queue that the granting thread wakes with wake_up() */
1937 wait_queue_head_t orsw_waitq;
1941 static bool obd_request_slot_avail(struct client_obd *cli,
1942 struct obd_request_slot_waiter *orsw)
1946 spin_lock(&cli->cl_loi_list_lock);
1947 avail = !!list_empty(&orsw->orsw_entry);
1948 spin_unlock(&cli->cl_loi_list_lock);
1954 * For network flow control, the RPC sponsor needs to acquire a credit
1955 * before sending the RPC. The credits count for a connection is defined
1956 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1957 * the subsequent RPC sponsors need to wait until others released their
1958 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1960 int obd_get_request_slot(struct client_obd *cli)
1962 struct obd_request_slot_waiter orsw;
1963 struct l_wait_info lwi;
1966 spin_lock(&cli->cl_loi_list_lock);
1967 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1968 cli->cl_r_in_flight++;
1969 spin_unlock(&cli->cl_loi_list_lock);
1973 init_waitqueue_head(&orsw.orsw_waitq);
1974 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1975 orsw.orsw_signaled = false;
1976 spin_unlock(&cli->cl_loi_list_lock);
1978 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1979 rc = l_wait_event(orsw.orsw_waitq,
1980 obd_request_slot_avail(cli, &orsw) ||
1984 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1985 * freed but other (such as obd_put_request_slot) is using it. */
1986 spin_lock(&cli->cl_loi_list_lock);
1988 if (!orsw.orsw_signaled) {
1989 if (list_empty(&orsw.orsw_entry))
1990 cli->cl_r_in_flight--;
1992 list_del(&orsw.orsw_entry);
1996 if (orsw.orsw_signaled) {
1997 LASSERT(list_empty(&orsw.orsw_entry));
2001 spin_unlock(&cli->cl_loi_list_lock);
2005 EXPORT_SYMBOL(obd_get_request_slot);
2007 void obd_put_request_slot(struct client_obd *cli)
2009 struct obd_request_slot_waiter *orsw;
2011 spin_lock(&cli->cl_loi_list_lock);
2012 cli->cl_r_in_flight--;
2014 /* If there is free slot, wakeup the first waiter. */
2015 if (!list_empty(&cli->cl_loi_read_list) &&
2016 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2017 orsw = list_entry(cli->cl_loi_read_list.next,
2018 struct obd_request_slot_waiter, orsw_entry);
2019 list_del_init(&orsw->orsw_entry);
2020 cli->cl_r_in_flight++;
2021 wake_up(&orsw->orsw_waitq);
2023 spin_unlock(&cli->cl_loi_list_lock);
2025 EXPORT_SYMBOL(obd_put_request_slot);
2027 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2029 return cli->cl_max_rpcs_in_flight;
2031 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2033 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2035 struct obd_request_slot_waiter *orsw;
2042 if (max > OBD_MAX_RIF_MAX || max < 1)
2045 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2046 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2047 /* adjust max_mod_rpcs_in_flight to ensure it is always
2048 * strictly lower that max_rpcs_in_flight */
2050 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2051 "because it must be higher than "
2052 "max_mod_rpcs_in_flight value",
2053 cli->cl_import->imp_obd->obd_name);
2056 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2057 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2063 spin_lock(&cli->cl_loi_list_lock);
2064 old = cli->cl_max_rpcs_in_flight;
2065 cli->cl_max_rpcs_in_flight = max;
2068 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2069 for (i = 0; i < diff; i++) {
2070 if (list_empty(&cli->cl_loi_read_list))
2073 orsw = list_entry(cli->cl_loi_read_list.next,
2074 struct obd_request_slot_waiter, orsw_entry);
2075 list_del_init(&orsw->orsw_entry);
2076 cli->cl_r_in_flight++;
2077 wake_up(&orsw->orsw_waitq);
2079 spin_unlock(&cli->cl_loi_list_lock);
2083 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2085 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2087 return cli->cl_max_mod_rpcs_in_flight;
2089 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2091 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2093 struct obd_connect_data *ocd;
2097 if (max > OBD_MAX_RIF_MAX || max < 1)
2100 /* cannot exceed or equal max_rpcs_in_flight */
2101 if (max >= cli->cl_max_rpcs_in_flight) {
2102 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2103 "higher or equal to max_rpcs_in_flight value (%u)\n",
2104 cli->cl_import->imp_obd->obd_name,
2105 max, cli->cl_max_rpcs_in_flight);
2109 /* cannot exceed max modify RPCs in flight supported by the server */
2110 ocd = &cli->cl_import->imp_connect_data;
2111 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2112 maxmodrpcs = ocd->ocd_maxmodrpcs;
2115 if (max > maxmodrpcs) {
2116 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2117 "higher than max_mod_rpcs_per_client value (%hu) "
2118 "returned by the server at connection\n",
2119 cli->cl_import->imp_obd->obd_name,
2124 spin_lock(&cli->cl_mod_rpcs_lock);
2126 prev = cli->cl_max_mod_rpcs_in_flight;
2127 cli->cl_max_mod_rpcs_in_flight = max;
2129 /* wakeup waiters if limit has been increased */
2130 if (cli->cl_max_mod_rpcs_in_flight > prev)
2131 wake_up(&cli->cl_mod_rpcs_waitq);
2133 spin_unlock(&cli->cl_mod_rpcs_lock);
2137 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2140 #define pct(a, b) (b ? a * 100 / b : 0)
2141 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2142 struct seq_file *seq)
2145 unsigned long mod_tot = 0, mod_cum;
2148 do_gettimeofday(&now);
2150 spin_lock(&cli->cl_mod_rpcs_lock);
2152 seq_printf(seq, "snapshot_time: %lu.%lu (secs.usecs)\n",
2153 now.tv_sec, now.tv_usec);
2154 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2155 cli->cl_mod_rpcs_in_flight);
2157 seq_printf(seq, "\n\t\t\tmodify\n");
2158 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2160 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2163 for (i = 0; i < OBD_HIST_MAX; i++) {
2164 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2166 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2167 i, mod, pct(mod, mod_tot),
2168 pct(mod_cum, mod_tot));
2169 if (mod_cum == mod_tot)
2173 spin_unlock(&cli->cl_mod_rpcs_lock);
2177 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2181 /* The number of modify RPCs sent in parallel is limited
2182 * because the server has a finite number of slots per client to
2183 * store request result and ensure reply reconstruction when needed.
2184 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2185 * that takes into account server limit and cl_max_rpcs_in_flight
2187 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2188 * one close request is allowed above the maximum.
2190 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2195 /* A slot is available if
2196 * - number of modify RPCs in flight is less than the max
2197 * - it's a close RPC and no other close request is in flight
2199 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2200 (close_req && cli->cl_close_rpcs_in_flight == 0);
2205 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2210 spin_lock(&cli->cl_mod_rpcs_lock);
2211 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2212 spin_unlock(&cli->cl_mod_rpcs_lock);
2216 /* Get a modify RPC slot from the obd client @cli according
2217 * to the kind of operation @opc that is going to be sent
2218 * and the intent @it of the operation if it applies.
2219 * If the maximum number of modify RPCs in flight is reached
2220 * the thread is put to sleep.
2221 * Returns the tag to be set in the request message. Tag 0
2222 * is reserved for non-modifying requests.
2224 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2225 struct lookup_intent *it)
2227 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2228 bool close_req = false;
2231 /* read-only metadata RPCs don't consume a slot on MDT
2232 * for reply reconstruction
2234 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2235 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2238 if (opc == MDS_CLOSE)
2242 spin_lock(&cli->cl_mod_rpcs_lock);
2243 max = cli->cl_max_mod_rpcs_in_flight;
2244 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2245 /* there is a slot available */
2246 cli->cl_mod_rpcs_in_flight++;
2248 cli->cl_close_rpcs_in_flight++;
2249 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2250 cli->cl_mod_rpcs_in_flight);
2251 /* find a free tag */
2252 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2254 LASSERT(i < OBD_MAX_RIF_MAX);
2255 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2256 spin_unlock(&cli->cl_mod_rpcs_lock);
2257 /* tag 0 is reserved for non-modify RPCs */
2260 spin_unlock(&cli->cl_mod_rpcs_lock);
2262 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2263 "opc %u, max %hu\n",
2264 cli->cl_import->imp_obd->obd_name, opc, max);
2266 l_wait_event(cli->cl_mod_rpcs_waitq,
2267 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2270 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2272 /* Put a modify RPC slot from the obd client @cli according
2273 * to the kind of operation @opc that has been sent and the
2274 * intent @it of the operation if it applies.
2276 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2277 struct lookup_intent *it, __u16 tag)
2279 bool close_req = false;
2281 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2282 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2285 if (opc == MDS_CLOSE)
2288 spin_lock(&cli->cl_mod_rpcs_lock);
2289 cli->cl_mod_rpcs_in_flight--;
2291 cli->cl_close_rpcs_in_flight--;
2292 /* release the tag in the bitmap */
2293 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2294 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2295 spin_unlock(&cli->cl_mod_rpcs_lock);
2296 wake_up(&cli->cl_mod_rpcs_waitq);
2298 EXPORT_SYMBOL(obd_put_mod_rpc_slot);