4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2015, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_disk.h>
48 #include <lustre_kernelcomm.h>
/* Serializes access to the global obd_types list; taken in
 * class_search_type(), class_register_type() and class_unregister_type(). */
50 spinlock_t obd_types_lock;
/* Slab caches; created in obd_init_caches() and destroyed in
 * obd_cleanup_caches(). */
52 static struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 static struct kmem_cache *import_cachep;
/* Zombie import/export bookkeeping.  Presumably filled by
 * obd_zombie_{import,export}_add(), whose bodies are not visible in this
 * listing - confirm. */
57 static struct list_head obd_zombie_imports;
58 static struct list_head obd_zombie_exports;
59 static spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of static helpers used before their definitions. */
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
/* Stale-export tracking; obd_stale_export_num is incremented in
 * class_unlink_export(). */
67 struct list_head obd_stale_exports;
68 spinlock_t obd_stale_export_lock;
69 atomic_t obd_stale_export_num;
/* Function-pointer hooks: the first is called from class_export_destroy()
 * and class_import_destroy() to drop a ptlrpc connection, the second from
 * class_import_put() on the final put.  Where they get assigned is not
 * visible in this listing. */
71 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
73 void (*sptlrpc_sec_put_superhack)(struct obd_import *imp);
74 EXPORT_SYMBOL(sptlrpc_sec_put_superhack);
77 * support functions: we could use inter-module communication, but this
78 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep with GFP_NOFS and stamp it
 * with OBD_DEVICE_MAGIC.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing; presumably the new device (or NULL) is
 * returned - confirm.
 */
80 static struct obd_device *obd_device_alloc(void)
82 struct obd_device *obd;
84 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
86 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts the magic first, and complains if the device still owns a
 * namespace.  What else happens in that branch is not visible in this
 * listing.
 */
91 static void obd_device_free(struct obd_device *obd)
94 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
95 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a namespace left behind means cleanup was incomplete */
96 if (obd->obd_namespace != NULL) {
97 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
98 obd, obd->obd_namespace, obd->obd_force);
101 lu_ref_fini(&obd->obd_reference);
102 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Find a registered obd type by name.
 *
 * Walks obd_types under obd_types_lock, comparing typ_name with strcmp().
 * The lock is released both on a match and after an unsuccessful walk.
 * Presumably the matching type is returned, NULL otherwise (the return
 * statements are not visible in this listing - confirm).  The visible code
 * takes no reference on the type.
 */
105 struct obd_type *class_search_type(const char *name)
107 struct list_head *tmp;
108 struct obd_type *type;
110 spin_lock(&obd_types_lock);
111 list_for_each(tmp, &obd_types) {
112 type = list_entry(tmp, struct obd_type, typ_chain);
113 if (strcmp(type->typ_name, name) == 0) {
114 spin_unlock(&obd_types_lock);
118 spin_unlock(&obd_types_lock);
121 EXPORT_SYMBOL(class_search_type);
/*
 * Look up an obd type by name and pin the module that owns it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the code can request_module() the
 * backing module and search again; the condition guarding that attempt is
 * not visible in this listing.  Some type names map to a different module
 * name before loading.
 */
123 struct obd_type *class_get_type(const char *name)
125 struct obd_type *type = class_search_type(name);
127 #ifdef HAVE_MODULE_LOADING_SUPPORT
129 const char *modname = name;
/* "obdfilter" is special-cased; the replacement name is not visible here */
131 if (strcmp(modname, "obdfilter") == 0)
/* the LWP type lives in the OSP module */
134 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
135 modname = LUSTRE_OSP_NAME;
/* prefix match: any name starting with LUSTRE_MDS_NAME maps to the MDT module */
137 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
138 modname = LUSTRE_MDT_NAME;
/* request_module() returns 0 on success */
140 if (!request_module("%s", modname)) {
141 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
/* search again under the original name, not the module name */
142 type = class_search_type(name);
144 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
150 spin_lock(&type->obd_type_lock);
/* NOTE(review): the return value of try_module_get() is ignored here */
152 try_module_get(type->typ_dt_ops->o_owner);
153 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): drops the module reference under
 * type->obd_type_lock.  Any refcount update done alongside is not visible
 * in this listing.
 */
158 void class_put_type(struct obd_type *type)
161 spin_lock(&type->obd_type_lock);
163 module_put(type->typ_dt_ops->o_owner);
164 spin_unlock(&type->obd_type_lock);
/* Upper bound on the length of a type name accepted by class_register_type(). */
167 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 *
 * Rejects a name that is already registered, then allocates the obd_type
 * plus private copies of \a dt_ops, \a md_ops (optional) and the name.
 * Under CONFIG_PROC_FS a proc entry may be registered; lu_device_type_init()
 * is called for \a ldt; finally the type is linked into obd_types.  The
 * tail of the function releases whatever had been allocated.
 * NOTE(review): return statements, labels and several conditions are
 * missing from this listing, so the exact error codes and the guards around
 * the proc/ldt steps cannot be stated from here.
 */
169 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
170 bool enable_proc, struct lprocfs_vars *vars,
171 const char *name, struct lu_device_type *ldt)
173 struct obd_type *type;
/* the name must be NUL-terminated within CLASS_MAX_NAME bytes */
178 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
180 if (class_search_type(name)) {
181 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
186 OBD_ALLOC(type, sizeof(*type));
/* allocate all three members first, then check them together */
190 OBD_ALLOC_PTR(type->typ_dt_ops);
191 OBD_ALLOC_PTR(type->typ_md_ops);
192 OBD_ALLOC(type->typ_name, strlen(name) + 1);
194 if (type->typ_dt_ops == NULL ||
195 type->typ_md_ops == NULL ||
196 type->typ_name == NULL)
/* the type keeps its own copies of the operation tables */
199 *(type->typ_dt_ops) = *dt_ops;
200 /* md_ops is optional */
202 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized as strlen(name) + 1 above, so this copy fits */
203 strcpy(type->typ_name, name);
204 spin_lock_init(&type->obd_type_lock);
206 #ifdef CONFIG_PROC_FS
208 type->typ_procroot = lprocfs_register(type->typ_name,
211 if (IS_ERR(type->typ_procroot)) {
212 rc = PTR_ERR(type->typ_procroot);
/* reset so the cleanup code below does not try to remove the entry */
213 type->typ_procroot = NULL;
220 rc = lu_device_type_init(ldt);
225 spin_lock(&obd_types_lock);
226 list_add(&type->typ_chain, &obd_types);
227 spin_unlock(&obd_types_lock);
/* cleanup: undo the allocations made above */
232 if (type->typ_name != NULL) {
233 #ifdef CONFIG_PROC_FS
234 if (type->typ_procroot != NULL)
235 remove_proc_subtree(type->typ_name, proc_lustre_root);
237 OBD_FREE(type->typ_name, strlen(name) + 1);
239 if (type->typ_md_ops != NULL)
240 OBD_FREE_PTR(type->typ_md_ops);
241 if (type->typ_dt_ops != NULL)
242 OBD_FREE_PTR(type->typ_dt_ops);
243 OBD_FREE(type, sizeof(*type));
246 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name and free it.
 *
 * If the type is still referenced (typ_refcnt != 0) only the operation
 * tables are freed and the type stays registered.  Otherwise proc entries
 * are removed, lu_device_type_fini() is called, the type is unlinked from
 * obd_types and all of its memory is released.
 * NOTE(review): return statements are not visible in this listing.
 */
248 int class_unregister_type(const char *name)
250 struct obd_type *type = class_search_type(name);
254 CERROR("unknown obd type\n");
258 if (type->typ_refcnt) {
259 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
260 /* This is a bad situation, let's make the best of it */
261 /* Remove ops, but leave the name for debugging */
262 OBD_FREE_PTR(type->typ_dt_ops);
263 OBD_FREE_PTR(type->typ_md_ops);
267 /* we do not use type->typ_procroot as for compatibility purposes
268 * other modules can share names (i.e. lod can use lov entry). so
269 * we can't reference pointer as it can get invalidated when another
270 * module removes the entry */
271 #ifdef CONFIG_PROC_FS
272 if (type->typ_procroot != NULL)
273 remove_proc_subtree(type->typ_name, proc_lustre_root);
274 if (type->typ_procsym != NULL)
275 lprocfs_remove(&type->typ_procsym);
278 lu_device_type_fini(type->typ_lu);
280 spin_lock(&obd_types_lock);
281 list_del(&type->typ_chain);
282 spin_unlock(&obd_types_lock);
/* name equals type->typ_name here: the type was found via strcmp() on it */
283 OBD_FREE(type->typ_name, strlen(name) + 1);
284 if (type->typ_dt_ops != NULL)
285 OBD_FREE_PTR(type->typ_dt_ops);
286 if (type->typ_md_ops != NULL)
287 OBD_FREE_PTR(type->typ_md_ops);
288 OBD_FREE(type, sizeof(*type));
290 } /* class_unregister_type */
291 EXPORT_SYMBOL(class_unregister_type);
294 * Create a new obd device.
296 * Find an empty slot in ::obd_devs[], create a new obd device in it.
298 * \param[in] type_name obd device type string.
299 * \param[in] name obd device name.
301 * \retval NULL if create fails, otherwise return the obd device
/*
 * Allocate a device of type \a type_name, name it \a name and place it in
 * the first free slot of obd_devs[].
 *
 * The visible error returns are ERR_PTR(-EINVAL) for an over-long name,
 * ERR_PTR(-ENODEV) for an unknown type, ERR_PTR(-ENOMEM) when allocation
 * fails, ERR_PTR(-EEXIST) for a duplicate name and ERR_PTR(-EOVERFLOW)
 * when every slot is taken.  The tail frees the new device and drops the
 * type reference.
 * NOTE(review): labels, the success return and a few assignments are
 * missing from this listing.
 */
304 struct obd_device *class_newdev(const char *type_name, const char *name)
306 struct obd_device *result = NULL;
307 struct obd_device *newdev;
308 struct obd_type *type = NULL;
310 int new_obd_minor = 0;
/* the name has to fit obd_name including the terminating NUL */
313 if (strlen(name) >= MAX_OBD_NAME) {
314 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
315 RETURN(ERR_PTR(-EINVAL));
318 type = class_get_type(type_name);
320 CERROR("OBD: unknown type: %s\n", type_name);
321 RETURN(ERR_PTR(-ENODEV));
324 newdev = obd_device_alloc();
326 GOTO(out_type, result = ERR_PTR(-ENOMEM));
328 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* a single pass both checks for a duplicate name and claims a free slot */
330 write_lock(&obd_dev_lock);
331 for (i = 0; i < class_devno_max(); i++) {
332 struct obd_device *obd = class_num2obd(i);
334 if (obd && (strcmp(name, obd->obd_name) == 0)) {
335 CERROR("Device %s already exists at %d, won't add\n",
/* undo a slot claimed earlier in this same pass */
338 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
339 "%p obd_magic %08x != %08x\n", result,
340 result->obd_magic, OBD_DEVICE_MAGIC);
341 LASSERTF(result->obd_minor == new_obd_minor,
342 "%p obd_minor %d != %d\n", result,
343 result->obd_minor, new_obd_minor);
345 obd_devs[result->obd_minor] = NULL;
346 result->obd_name[0]='\0';
348 result = ERR_PTR(-EEXIST);
/* first empty slot seen while nothing has been claimed yet */
351 if (!result && !obd) {
353 result->obd_minor = i;
355 result->obd_type = type;
/* cannot truncate: strlen(name) < MAX_OBD_NAME was checked above */
356 strncpy(result->obd_name, name,
357 sizeof(result->obd_name) - 1);
358 obd_devs[i] = result;
361 write_unlock(&obd_dev_lock);
363 if (result == NULL && i >= class_devno_max()) {
364 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
366 GOTO(out, result = ERR_PTR(-EOVERFLOW));
372 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
373 result->obd_name, result);
/* error path: release the unused device, then the type reference */
377 obd_device_free(newdev);
379 class_put_type(type);
/*
 * Remove \a obd from obd_devs[], free it and drop the reference on its
 * type.  Asserts that the device is valid and really occupies the slot
 * named by its obd_minor.
 */
383 void class_release_dev(struct obd_device *obd)
/* saved up front: obd is freed before the type reference is dropped */
385 struct obd_type *obd_type = obd->obd_type;
387 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
388 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
389 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
390 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
391 LASSERT(obd_type != NULL);
393 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
394 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
396 write_lock(&obd_dev_lock);
397 obd_devs[obd->obd_minor] = NULL;
398 write_unlock(&obd_dev_lock);
399 obd_device_free(obd);
401 class_put_type(obd_type);
/*
 * Translate a device name into its index in obd_devs[].
 *
 * Scans all slots under the obd_dev_lock read lock and compares obd_name
 * with strcmp().  A match is only acted upon once the device is marked
 * obd_attached.  Presumably returns the index, or a negative value when
 * nothing matches (the return statements are not visible in this listing -
 * confirm; class_name2obd() tests the result for "< 0").
 */
404 int class_name2dev(const char *name)
411 read_lock(&obd_dev_lock);
412 for (i = 0; i < class_devno_max(); i++) {
413 struct obd_device *obd = class_num2obd(i);
415 if (obd && strcmp(name, obd->obd_name) == 0) {
416 /* Make sure we finished attaching before we give
417 out any references */
418 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
419 if (obd->obd_attached) {
420 read_unlock(&obd_dev_lock);
426 read_unlock(&obd_dev_lock);
431 struct obd_device *class_name2obd(const char *name)
433 int dev = class_name2dev(name);
435 if (dev < 0 || dev > class_devno_max())
437 return class_num2obd(dev);
439 EXPORT_SYMBOL(class_name2obd);
/*
 * Translate a device uuid into its index in obd_devs[].
 *
 * Scans all slots under the obd_dev_lock read lock and compares with
 * obd_uuid_equals().  Unlike class_name2dev(), the visible code does not
 * test obd_attached.  The return statements are not visible in this
 * listing.
 */
441 int class_uuid2dev(struct obd_uuid *uuid)
445 read_lock(&obd_dev_lock);
446 for (i = 0; i < class_devno_max(); i++) {
447 struct obd_device *obd = class_num2obd(i);
449 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
450 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
451 read_unlock(&obd_dev_lock);
455 read_unlock(&obd_dev_lock);
/*
 * Look up an obd device by uuid: resolves the index with class_uuid2dev()
 * and fetches the device with class_num2obd().  Any check on the index
 * between the two calls is not visible in this listing.
 */
460 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
462 int dev = class_uuid2dev(uuid);
465 return class_num2obd(dev);
467 EXPORT_SYMBOL(class_uuid2obd);
470 * Get obd device from ::obd_devs[]
472 * \param num [in] array index
474 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
475 * otherwise return the obd device there.
/*
 * Fetch the device stored at index \a num, asserting that its magic is
 * intact and that its obd_minor matches the slot it was found in.
 * NOTE(review): the visible bound test only rejects num >=
 * class_devno_max(); no lower-bound check on a negative \a num is visible
 * in this listing - confirm the callers guarantee num >= 0.
 */
477 struct obd_device *class_num2obd(int num)
479 struct obd_device *obd = NULL;
481 if (num < class_devno_max()) {
486 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
487 "%p obd_magic %08x != %08x\n",
488 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
489 LASSERTF(obd->obd_minor == num,
490 "%p obd_minor %0d != %0d\n",
491 obd, obd->obd_minor, num);
498 * Get obd devices count. Device in any
500 * \retval obd device count
502 int get_devices_count(void)
504 int index, max_index = class_devno_max(), dev_count = 0;
506 read_lock(&obd_dev_lock);
507 for (index = 0; index <= max_index; index++) {
508 struct obd_device *obd = class_num2obd(index);
512 read_unlock(&obd_dev_lock);
516 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per registered device: index, status, type name,
 * device name, uuid and reference count.  The status is chosen from the
 * obd_stopping / obd_set_up / obd_attached flags, tested in that order;
 * the status strings themselves are not visible in this listing.
 */
518 void class_obd_list(void)
523 read_lock(&obd_dev_lock);
524 for (i = 0; i < class_devno_max(); i++) {
525 struct obd_device *obd = class_num2obd(i);
529 if (obd->obd_stopping)
531 else if (obd->obd_set_up)
533 else if (obd->obd_attached)
537 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
538 i, status, obd->obd_type->typ_name,
539 obd->obd_name, obd->obd_uuid.uuid,
540 atomic_read(&obd->obd_refcount));
542 read_unlock(&obd_dev_lock);
546 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
547 specified, then only the client with that uuid is returned,
548 otherwise any client connected to the tgt is returned. */
/* Note: typ_name is compared as a prefix (strncmp over strlen(typ_name)),
 * so a device type whose name merely starts with typ_name also matches.
 * The return statements are not visible in this listing. */
549 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
550 const char * typ_name,
551 struct obd_uuid *grp_uuid)
555 read_lock(&obd_dev_lock);
556 for (i = 0; i < class_devno_max(); i++) {
557 struct obd_device *obd = class_num2obd(i);
561 if ((strncmp(obd->obd_type->typ_name, typ_name,
562 strlen(typ_name)) == 0)) {
/* the target uuid must match; the group uuid only when one was given */
563 if (obd_uuid_equals(tgt_uuid,
564 &obd->u.cli.cl_target_uuid) &&
565 ((grp_uuid)? obd_uuid_equals(grp_uuid,
566 &obd->obd_uuid) : 1)) {
567 read_unlock(&obd_dev_lock);
572 read_unlock(&obd_dev_lock);
576 EXPORT_SYMBOL(class_find_client_obd);
578 /* Iterate the obd_device list looking for devices that have grp_uuid.
579 Start searching at *next, and if a device is found, the next index to
580 look at is saved in *next. If next is NULL, then the first matching
581 device will always be returned. */
/* The handling of an out-of-range *next and the return statements are not
 * visible in this listing. */
582 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
588 else if (*next >= 0 && *next < class_devno_max())
593 read_lock(&obd_dev_lock);
594 for (; i < class_devno_max(); i++) {
595 struct obd_device *obd = class_num2obd(i);
599 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
602 read_unlock(&obd_dev_lock);
606 read_unlock(&obd_dev_lock);
610 EXPORT_SYMBOL(class_devices_in_group);
613 * Notify that the sptlrpc log for \a fsname has changed, so that every
614 * relevant OBD can adjust its sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to every set-up,
 * non-stopping device of type mdc, osc, osp, lwp, mdt or ost whose name
 * starts with the first \a namelen bytes of \a fsname.
 *
 * \a namelen must be positive (asserted).  How the per-device results are
 * combined into the return value is not visible in this listing.
 */
616 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
618 struct obd_device *obd;
622 LASSERT(namelen > 0);
624 read_lock(&obd_dev_lock);
625 for (i = 0; i < class_devno_max(); i++) {
626 obd = class_num2obd(i);
628 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
631 /* only notify mdc, osc, osp, lwp, mdt, ost
632 * because only these have a -sptlrpc llog */
633 type = obd->obd_type->typ_name;
634 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
635 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
636 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
637 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
638 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
639 strcmp(type, LUSTRE_OST_NAME) != 0)
/* prefix comparison: only the first namelen bytes are checked */
642 if (strncmp(obd->obd_name, fsname, namelen))
/* hold a device reference while the read lock is dropped around the call */
645 class_incref(obd, __FUNCTION__, obd);
646 read_unlock(&obd_dev_lock);
647 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
648 sizeof(KEY_SPTLRPC_CONF),
649 KEY_SPTLRPC_CONF, 0, NULL, NULL);
651 class_decref(obd, __FUNCTION__, obd);
652 read_lock(&obd_dev_lock);
654 read_unlock(&obd_dev_lock);
657 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches() and reset their
 * pointers.  The guards and resets for obdo_cachep and import_cachep are
 * only partly visible in this listing.
 */
659 void obd_cleanup_caches(void)
662 if (obd_device_cachep) {
663 kmem_cache_destroy(obd_device_cachep);
664 obd_device_cachep = NULL;
667 kmem_cache_destroy(obdo_cachep);
671 kmem_cache_destroy(import_cachep);
672 import_cachep = NULL;
/*
 * Create the obd_device, obdo and import slab caches.
 *
 * Each cache pointer is asserted to be NULL before creation.  A failed
 * creation jumps to "out" with rc = -ENOMEM, and obd_cleanup_caches() is
 * called at the end of the function.  The success return is not visible in
 * this listing.
 */
678 int obd_init_caches(void)
683 LASSERT(obd_device_cachep == NULL);
684 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
685 sizeof(struct obd_device),
687 if (!obd_device_cachep)
688 GOTO(out, rc = -ENOMEM);
690 LASSERT(obdo_cachep == NULL);
691 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
694 GOTO(out, rc = -ENOMEM);
696 LASSERT(import_cachep == NULL);
697 import_cachep = kmem_cache_create("ll_import_cache",
698 sizeof(struct obd_import),
701 GOTO(out, rc = -ENOMEM);
/* error path: tear down whichever caches were already created */
705 obd_cleanup_caches();
709 /* map connection to client */
/*
 * Resolves conn->cookie to an export through class_handle2object().  Two
 * cases are logged before the lookup: a null handle, and the cookie value
 * -1, which means "assign a new connection".  What is returned in those
 * two cases is not visible in this listing.
 */
710 struct obd_export *class_conn2export(struct lustre_handle *conn)
712 struct obd_export *export;
716 CDEBUG(D_CACHE, "looking for null handle\n");
720 if (conn->cookie == -1) { /* this means assign a new connection */
721 CDEBUG(D_CACHE, "want a new connection\n");
725 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
726 export = class_handle2object(conn->cookie, NULL);
729 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): the function body is not visible in this listing;
 * presumably it returns exp->exp_obd when exp is non-NULL - confirm.
 */
731 struct obd_device *class_exp2obd(struct obd_export *exp)
737 EXPORT_SYMBOL(class_exp2obd);
/*
 * Map a connection handle to the obd device of its export.
 *
 * The export obtained from class_conn2export() is released again with
 * class_export_put() after exp_obd has been read.  The return statements
 * are not visible in this listing.
 */
739 struct obd_device *class_conn2obd(struct lustre_handle *conn)
741 struct obd_export *export;
742 export = class_conn2export(conn);
744 struct obd_device *obd = export->exp_obd;
745 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the device behind \a exp.
 * Any NULL check on the device is not visible in this listing.
 */
751 struct obd_import *class_exp2cliimp(struct obd_export *exp)
753 struct obd_device *obd = exp->exp_obd;
756 return obd->u.cli.cl_import;
758 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device that \a conn
 * resolves to through class_conn2obd().  Any NULL check on the device is
 * not visible in this listing.
 */
760 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
762 struct obd_device *obd = class_conn2obd(conn);
765 return obd->u.cli.cl_import;
768 /* Export management functions */
/*
 * Final destruction of an export whose refcount has reached zero.
 *
 * Drops the ptlrpc connection (if any), asserts that all request/reply
 * lists are empty, calls obd_destroy_export(), releases the "export"
 * reference on the device and frees the export through OBD_FREE_RCU().
 */
769 static void class_export_destroy(struct obd_export *exp)
771 struct obd_device *obd = exp->exp_obd;
774 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
775 LASSERT(obd != NULL);
777 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
778 exp->exp_client_uuid.uuid, obd->obd_name);
780 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
781 if (exp->exp_connection)
782 ptlrpc_put_connection_superhack(exp->exp_connection);
784 LASSERT(list_empty(&exp->exp_outstanding_replies));
785 LASSERT(list_empty(&exp->exp_uncommitted_replies));
786 LASSERT(list_empty(&exp->exp_req_replay_queue));
787 LASSERT(list_empty(&exp->exp_hp_rpcs));
788 obd_destroy_export(exp);
/* pairs with class_incref(obd, "export", export) in class_new_export() */
789 class_decref(obd, "export", exp);
791 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle callback: take a reference on the export behind a portals handle. */
795 static void export_handle_addref(void *export)
797 class_export_get(export);
/* Handle operations passed to class_handle_hash() in class_new_export();
 * any further members are not visible in this listing. */
800 static struct portals_handle_ops export_handle_ops = {
801 .hop_addref = export_handle_addref,
/*
 * Take a reference on \a exp and log the new count.  Presumably returns
 * \a exp (the return statement is not visible in this listing - confirm).
 */
805 struct obd_export *class_export_get(struct obd_export *exp)
807 atomic_inc(&exp->exp_refcount);
808 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
809 atomic_read(&exp->exp_refcount));
812 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp.
 *
 * On the final put the export must still be on an obd chain and must not
 * be on the stale list (both asserted).  Its nid statistics are released
 * and it is handed to obd_zombie_export_add() rather than destroyed here.
 */
814 void class_export_put(struct obd_export *exp)
816 LASSERT(exp != NULL);
817 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* the "- 1" logs the value the counter will have after this put */
818 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
819 atomic_read(&exp->exp_refcount) - 1);
821 if (atomic_dec_and_test(&exp->exp_refcount)) {
822 LASSERT(!list_empty(&exp->exp_obd_chain));
823 LASSERT(list_empty(&exp->exp_stale_list));
824 CDEBUG(D_IOCTL, "final put %p/%s\n",
825 exp, exp->exp_client_uuid.uuid);
827 /* release nid stat reference */
828 lprocfs_exp_cleanup(exp);
830 obd_zombie_export_add(exp);
833 EXPORT_SYMBOL(class_export_put);
835 /* Creates a new export, adds it to the hash table, and returns a
836 * pointer to it. The refcount is 2: one for the hash reference, and
837 * one for the pointer returned by this function. */
/*
 * Visible failure cases: ERR_PTR(-ENOMEM) when allocation fails, rc =
 * -ENODEV when the device is stopping or a reference on its uuid hash
 * cannot be obtained, rc = -EALREADY when \a cluuid is already in the
 * uuid hash.  The error tail unhashes the handle, calls
 * obd_destroy_export() and frees the export.
 * NOTE(review): labels and return statements are not visible in this
 * listing.
 */
838 struct obd_export *class_new_export(struct obd_device *obd,
839 struct obd_uuid *cluuid)
841 struct obd_export *export;
842 struct cfs_hash *hash = NULL;
846 OBD_ALLOC_PTR(export);
848 return ERR_PTR(-ENOMEM);
850 export->exp_conn_cnt = 0;
851 export->exp_lock_hash = NULL;
852 export->exp_flock_hash = NULL;
/* two references from the start, as described in the header comment */
853 atomic_set(&export->exp_refcount, 2);
854 atomic_set(&export->exp_rpc_count, 0);
855 atomic_set(&export->exp_cb_count, 0);
856 atomic_set(&export->exp_locks_count, 0);
857 #if LUSTRE_TRACKS_LOCK_EXP_REFS
858 INIT_LIST_HEAD(&export->exp_locks_list);
859 spin_lock_init(&export->exp_locks_list_guard);
861 atomic_set(&export->exp_replay_count, 0);
862 export->exp_obd = obd;
863 INIT_LIST_HEAD(&export->exp_outstanding_replies);
864 spin_lock_init(&export->exp_uncommitted_replies_lock);
865 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
866 INIT_LIST_HEAD(&export->exp_req_replay_queue);
867 INIT_LIST_HEAD(&export->exp_handle.h_link);
868 INIT_LIST_HEAD(&export->exp_hp_rpcs);
869 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* register the handle; class_connect() later hands out its h_cookie */
870 class_handle_hash(&export->exp_handle, &export_handle_ops);
871 export->exp_last_request_time = cfs_time_current_sec();
872 spin_lock_init(&export->exp_lock);
873 spin_lock_init(&export->exp_rpc_lock);
874 INIT_HLIST_NODE(&export->exp_uuid_hash);
875 INIT_HLIST_NODE(&export->exp_nid_hash);
876 INIT_HLIST_NODE(&export->exp_gen_hash);
877 spin_lock_init(&export->exp_bl_list_lock);
878 INIT_LIST_HEAD(&export->exp_bl_list);
879 INIT_LIST_HEAD(&export->exp_stale_list);
881 export->exp_sp_peer = LUSTRE_SP_ANY;
882 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
883 export->exp_client_uuid = *cluuid;
884 obd_init_export(export);
886 spin_lock(&obd->obd_dev_lock);
887 /* shouldn't happen, but might race */
888 if (obd->obd_stopping)
889 GOTO(exit_unlock, rc = -ENODEV);
891 hash = cfs_hash_getref(obd->obd_uuid_hash);
893 GOTO(exit_unlock, rc = -ENODEV);
894 spin_unlock(&obd->obd_dev_lock);
/* an export carrying the device's own uuid is not put in the uuid hash */
896 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
897 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
899 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
900 obd->obd_name, cluuid->uuid, rc);
901 GOTO(exit_err, rc = -EALREADY);
905 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* re-check obd_stopping: the lock was dropped around the hash insert */
906 spin_lock(&obd->obd_dev_lock);
907 if (obd->obd_stopping) {
908 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
909 GOTO(exit_unlock, rc = -ENODEV);
/* released again by class_decref(obd, "export", exp) in class_export_destroy() */
912 class_incref(obd, "export", export);
913 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
914 list_add_tail(&export->exp_obd_chain_timed,
915 &export->exp_obd->obd_exports_timed);
916 export->exp_obd->obd_num_exports++;
917 spin_unlock(&obd->obd_dev_lock);
918 cfs_hash_putref(hash);
/* error tail */
922 spin_unlock(&obd->obd_dev_lock);
925 cfs_hash_putref(hash);
926 class_handle_unhash(&export->exp_handle);
927 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
928 obd_destroy_export(export);
929 OBD_FREE_PTR(export);
932 EXPORT_SYMBOL(class_new_export);
/*
 * Detach \a exp from its device.
 *
 * Unhashes the handle; then, under obd_dev_lock, removes the export from
 * the uuid and generation hashes if it is in them, moves it to
 * obd_unlinked_exports, takes it off the timed list and decrements
 * obd_num_exports.  Finally obd_stale_export_num is incremented and
 * obd_stale_export_put() is called.
 */
934 void class_unlink_export(struct obd_export *exp)
936 class_handle_unhash(&exp->exp_handle);
938 spin_lock(&exp->exp_obd->obd_dev_lock);
939 /* delete an uuid-export hashitem from hashtables */
940 if (!hlist_unhashed(&exp->exp_uuid_hash))
941 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
942 &exp->exp_client_uuid,
943 &exp->exp_uuid_hash);
945 if (!hlist_unhashed(&exp->exp_gen_hash)) {
946 struct tg_export_data *ted = &exp->exp_target_data;
947 struct cfs_hash *hash;
949 /* Because obd_gen_hash will not be released until
950 * class_cleanup(), so hash should never be NULL here */
951 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
952 LASSERT(hash != NULL);
953 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
955 cfs_hash_putref(hash);
958 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
959 list_del_init(&exp->exp_obd_chain_timed);
960 exp->exp_obd->obd_num_exports--;
961 spin_unlock(&exp->exp_obd->obd_dev_lock);
962 atomic_inc(&obd_stale_export_num);
964 /* A reference is kept by obd_stale_exports list */
965 obd_stale_export_put(exp);
967 EXPORT_SYMBOL(class_unlink_export);
969 /* Import management functions */
/*
 * Final destruction of an import whose refcount has reached zero.
 *
 * Drops the main connection, drains imp_conn_list (dropping and freeing
 * every entry), asserts that the security context is already gone,
 * releases the "import" reference on the device and frees the import
 * through OBD_FREE_RCU().
 */
970 static void class_import_destroy(struct obd_import *imp)
974 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
975 imp->imp_obd->obd_name);
977 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
979 ptlrpc_put_connection_superhack(imp->imp_connection);
/* pop entries from the head until the list is empty */
981 while (!list_empty(&imp->imp_conn_list)) {
982 struct obd_import_conn *imp_conn;
984 imp_conn = list_entry(imp->imp_conn_list.next,
985 struct obd_import_conn, oic_item);
986 list_del_init(&imp_conn->oic_item);
987 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
988 OBD_FREE(imp_conn, sizeof(*imp_conn));
991 LASSERT(imp->imp_sec == NULL);
/* pairs with class_incref(obd, "import", imp) in class_new_import() */
992 class_decref(imp->imp_obd, "import", imp);
993 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle callback: take a reference on the import behind a portals handle. */
997 static void import_handle_addref(void *import)
999 class_import_get(import);
/* Handle operations passed to class_handle_hash() in class_new_import();
 * any further members are not visible in this listing. */
1002 static struct portals_handle_ops import_handle_ops = {
1003 .hop_addref = import_handle_addref,
/*
 * Take a reference on \a import and log the new count.  Presumably returns
 * \a import (the return statement is not visible in this listing -
 * confirm).
 */
1007 struct obd_import *class_import_get(struct obd_import *import)
1009 atomic_inc(&import->imp_refcount);
1010 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1011 atomic_read(&import->imp_refcount),
1012 import->imp_obd->obd_name);
1015 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a imp.
 *
 * The import must not already be on the zombie chain (asserted).  On the
 * final put the security policy is released through
 * sptlrpc_sec_put_superhack() and the import is handed to
 * obd_zombie_import_add() rather than destroyed here.
 */
1017 void class_import_put(struct obd_import *imp)
1021 LASSERT(list_empty(&imp->imp_zombie_chain));
1022 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* the "- 1" logs the value the counter will have after this put */
1024 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1025 atomic_read(&imp->imp_refcount) - 1,
1026 imp->imp_obd->obd_name);
1028 if (atomic_dec_and_test(&imp->imp_refcount)) {
1029 CDEBUG(D_INFO, "final put import %p\n", imp);
1030 /* Drop security policy instance after all RPCs have
1031 * finished/aborted to let all busy contexts be released. */
1032 sptlrpc_sec_put_superhack(imp);
1034 obd_zombie_import_add(imp);
1037 /* catch possible import put race */
1038 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1041 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.  The final at_init()
 * argument is not visible in this listing.
 */
1043 static void init_imp_at(struct imp_at *at) {
1045 at_init(&at->iat_net_latency, 0, 0);
1046 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1047 /* max service estimates are tracked on the server side, so
1048 don't use the AT history here, just use the last reported
1049 val. (But keep hist for proc histogram, worst_ever) */
1050 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize a new import for \a obd.
 *
 * The import starts in LUSTRE_IMP_NEW with a refcount of 2, holds an
 * "import" reference on the device, is registered in the handle hash and
 * defaults to LUSTRE_MSG_MAGIC_V2.  The allocation-failure check and the
 * return statement are not visible in this listing.
 */
1055 struct obd_import *class_new_import(struct obd_device *obd)
1057 struct obd_import *imp;
1059 OBD_ALLOC(imp, sizeof(*imp));
1063 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1064 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1065 INIT_LIST_HEAD(&imp->imp_replay_list);
1066 INIT_LIST_HEAD(&imp->imp_sending_list);
1067 INIT_LIST_HEAD(&imp->imp_delayed_list);
1068 INIT_LIST_HEAD(&imp->imp_committed_list);
1069 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1070 imp->imp_known_replied_xid = 0;
/* the replay cursor starts at the head of the (empty) committed list */
1071 imp->imp_replay_cursor = &imp->imp_committed_list;
1072 spin_lock_init(&imp->imp_lock);
1073 imp->imp_last_success_conn = 0;
1074 imp->imp_state = LUSTRE_IMP_NEW;
/* released again by class_decref(..., "import", imp) in class_import_destroy() */
1075 imp->imp_obd = class_incref(obd, "import", imp);
1076 mutex_init(&imp->imp_sec_mutex);
1077 init_waitqueue_head(&imp->imp_recovery_waitq);
1079 atomic_set(&imp->imp_refcount, 2);
1080 atomic_set(&imp->imp_unregistering, 0);
1081 atomic_set(&imp->imp_inflight, 0);
1082 atomic_set(&imp->imp_replay_inflight, 0);
1083 atomic_set(&imp->imp_inval_count, 0);
1084 INIT_LIST_HEAD(&imp->imp_conn_list);
1085 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1086 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1087 init_imp_at(&imp->imp_at);
1089 /* the default magic is V2, will be used in connect RPC, and
1090 * then adjusted according to the flags in request/reply. */
1091 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1095 EXPORT_SYMBOL(class_new_import);
/*
 * Begin tearing down \a import: remove it from the handle hash, bump
 * imp_generation under imp_lock and drop one reference.
 */
1097 void class_destroy_import(struct obd_import *import)
1099 LASSERT(import != NULL);
1100 LASSERT(import != LP_POISON);
1102 class_handle_unhash(&import->imp_handle);
1104 spin_lock(&import->imp_lock);
1105 import->imp_generation++;
1106 spin_unlock(&import->imp_lock);
1107 class_import_put(import);
1109 EXPORT_SYMBOL(class_destroy_import);
1111 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking: record that \a lock holds a reference on \a exp.
 *
 * Under exp_locks_list_guard the lock's reference counter is bumped; on
 * the first reference the lock is linked into exp->exp_locks_list and \a
 * exp becomes its target.  A warning is printed if the lock already
 * targets a different export.
 */
1113 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1115 spin_lock(&exp->exp_locks_list_guard);
1117 LASSERT(lock->l_exp_refs_nr >= 0);
1119 if (lock->l_exp_refs_target != NULL &&
1120 lock->l_exp_refs_target != exp) {
1121 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1122 exp, lock, lock->l_exp_refs_target);
/* post-increment: the branch is taken for the first reference only */
1124 if ((lock->l_exp_refs_nr ++) == 0) {
1125 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1126 lock->l_exp_refs_target = exp;
1128 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1129 lock, exp, lock->l_exp_refs_nr);
1130 spin_unlock(&exp->exp_locks_list_guard);
/*
 * Debug tracking: undo one __class_export_add_lock_ref().
 *
 * Under exp_locks_list_guard the lock's reference counter is decremented;
 * when it reaches zero the lock is unlinked from the export's list and its
 * target is cleared.  A warning is printed if the lock's recorded target
 * is not \a exp.
 */
1133 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1135 spin_lock(&exp->exp_locks_list_guard);
1136 LASSERT(lock->l_exp_refs_nr > 0);
1137 if (lock->l_exp_refs_target != exp) {
1138 LCONSOLE_WARN("lock %p, "
1139 "mismatching export pointers: %p, %p\n",
1140 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the branch is taken when the last reference goes away */
1142 if (-- lock->l_exp_refs_nr == 0) {
1143 list_del_init(&lock->l_exp_refs_link);
1144 lock->l_exp_refs_target = NULL;
1146 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1147 lock, exp, lock->l_exp_refs_nr);
1148 spin_unlock(&exp->exp_locks_list_guard);
1152 /* A connection defines an export context in which preallocation can
1153 be managed. This releases the export pointer reference, and returns
1154 the export handle, so the export refcount is 1 when this function
1155 returns. */
/*
 * Create a new export on \a obd for client \a cluuid and hand its handle
 * cookie back through \a conn.
 *
 * All three arguments must be non-NULL (asserted).  When
 * class_new_export() fails its error is returned via PTR_ERR().  The
 * success return value is not visible in this listing.
 */
1156 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1157 struct obd_uuid *cluuid)
1159 struct obd_export *export;
1160 LASSERT(conn != NULL);
1161 LASSERT(obd != NULL);
1162 LASSERT(cluuid != NULL);
1165 export = class_new_export(obd, cluuid);
1167 RETURN(PTR_ERR(export));
/* save the cookie before dropping our pointer reference to the export */
1169 conn->cookie = export->exp_handle.h_cookie;
1170 class_export_put(export);
1172 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1173 cluuid->uuid, conn->cookie);
1176 EXPORT_SYMBOL(class_connect);
1178 /* if export is involved in recovery then clean up related things */
/*
 * While the device is recovering (checked under obd_recovery_task_lock):
 * an export that is in recovery is taken out of it and
 * obd_connected_clients is decremented, and non-lightweight exports bump
 * obd_stale_clients.  Afterwards, under exp_lock, the request-replay and
 * lock-replay flags are cleared and the matching per-device counters are
 * decremented.
 */
1179 static void class_export_recovery_cleanup(struct obd_export *exp)
1181 struct obd_device *obd = exp->exp_obd;
1183 spin_lock(&obd->obd_recovery_task_lock);
1184 if (obd->obd_recovering) {
1185 if (exp->exp_in_recovery) {
1186 spin_lock(&exp->exp_lock);
1187 exp->exp_in_recovery = 0;
1188 spin_unlock(&exp->exp_lock);
1189 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1190 atomic_dec(&obd->obd_connected_clients);
1193 /* if called during recovery then should update
1194 * obd_stale_clients counter,
1195 * lightweight exports are not counted */
1196 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1197 exp->exp_obd->obd_stale_clients++;
1199 spin_unlock(&obd->obd_recovery_task_lock);
1201 spin_lock(&exp->exp_lock);
1202 /** Cleanup req replay fields */
1203 if (exp->exp_req_replay_needed) {
1204 exp->exp_req_replay_needed = 0;
1206 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1207 atomic_dec(&obd->obd_req_replay_clients);
1210 /** Cleanup lock replay data */
1211 if (exp->exp_lock_replay_needed) {
1212 exp->exp_lock_replay_needed = 0;
1214 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1215 atomic_dec(&obd->obd_lock_replay_clients);
1217 spin_unlock(&exp->exp_lock);
1220 /* This function removes 1-3 references from the export:
1221 * 1 - for the export pointer passed
1222 * and, if a disconnect is really needed:
1223 * 2 - removing from hash
1224 * 3 - in class_unlink_export
1225 * The export pointer passed to this function can be destroyed */
/* The return statements are not visible in this listing. */
1226 int class_disconnect(struct obd_export *export)
1228 int already_disconnected;
1231 if (export == NULL) {
1232 CWARN("attempting to free NULL export %p\n", export);
/* test-and-set of exp_disconnected under exp_lock, so that only one caller
 * goes on to do the real disconnect */
1236 spin_lock(&export->exp_lock);
1237 already_disconnected = export->exp_disconnected;
1238 export->exp_disconnected = 1;
1239 spin_unlock(&export->exp_lock);
1241 /* class_cleanup(), abort_recovery(), and class_fail_export()
1242 * all end up in here, and if any of them race we shouldn't
1243 * call extra class_export_puts(). */
1244 if (already_disconnected) {
1245 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1246 GOTO(no_disconn, already_disconnected);
1249 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1250 export->exp_handle.h_cookie);
1252 if (!hlist_unhashed(&export->exp_nid_hash))
1253 cfs_hash_del(export->exp_obd->obd_nid_hash,
1254 &export->exp_connection->c_peer.nid,
1255 &export->exp_nid_hash);
1257 class_export_recovery_cleanup(export);
1258 class_unlink_export(export);
/* reference 1 in the list above: the pointer passed in by the caller */
1260 class_export_put(export);
1263 EXPORT_SYMBOL(class_disconnect);
1265 /* Return non-zero for a fully connected export */
/* "Fully connected" means exp_conn_cnt > 0 and not exp_failed, sampled
 * under exp_lock.  Handling of a NULL \a exp is not visible in this
 * listing. */
1266 int class_connected_export(struct obd_export *exp)
1271 spin_lock(&exp->exp_lock);
1272 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1273 spin_unlock(&exp->exp_lock);
1277 EXPORT_SYMBOL(class_connected_export);
/* Disconnect every export on @list, one at a time, until the list is
 * empty.  Each export has its exp_flags set to @flags first.  An export
 * whose client UUID equals its own device's UUID is only unlinked from
 * the list, not disconnected. */
1279 static void class_disconnect_export_list(struct list_head *list,
1280 enum obd_option flags)
1283 struct obd_export *exp;
1286 /* It's possible that an export may disconnect itself, but
1287 * nothing else will be added to this list. */
1288 while (!list_empty(list)) {
1289 exp = list_entry(list->next, struct obd_export,
1291 /* need for safe call CDEBUG after obd_disconnect */
1292 class_export_get(exp);
/* Stamp the export with the caller-supplied flags under exp_lock. */
1294 spin_lock(&exp->exp_lock);
1295 exp->exp_flags = flags;
1296 spin_unlock(&exp->exp_lock);
1298 if (obd_uuid_equals(&exp->exp_client_uuid,
1299 &exp->exp_obd->obd_uuid)) {
1301 "exp %p export uuid == obd uuid, don't discon\n",
1303 /* Need to delete this now so we don't end up pointing
1304 * to work_list later when this export is cleaned up. */
1305 list_del_init(&exp->exp_obd_chain);
1306 class_export_put(exp);
/* Second reference, taken for obd_disconnect() itself; the one taken
 * at the top of the loop keeps exp usable for the CDEBUG afterwards. */
1310 class_export_get(exp);
1311 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1312 "last request at "CFS_TIME_T"\n",
1313 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1314 exp, exp->exp_last_request_time);
1315 /* release one export reference anyway */
1316 rc = obd_disconnect(exp);
1318 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1319 obd_export_nid2str(exp), exp, rc);
1320 class_export_put(exp);
/* Disconnect all exports of @obd.  Both obd_exports and
 * obd_delayed_exports are spliced onto a private list under
 * obd_dev_lock; the list is then processed by
 * class_disconnect_export_list() with flags from exp_flags_from_obd(). */
1325 void class_disconnect_exports(struct obd_device *obd)
1327 struct list_head work_list;
1330 /* Move all of the exports from obd_exports to a work list, en masse. */
1331 INIT_LIST_HEAD(&work_list);
1332 spin_lock(&obd->obd_dev_lock);
1333 list_splice_init(&obd->obd_exports, &work_list);
1334 list_splice_init(&obd->obd_delayed_exports, &work_list);
1335 spin_unlock(&obd->obd_dev_lock);
/* The disconnect itself runs after obd_dev_lock has been dropped. */
1337 if (!list_empty(&work_list)) {
1338 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1339 "disconnecting them\n", obd->obd_minor, obd);
1340 class_disconnect_export_list(&work_list,
1341 exp_flags_from_obd(obd));
1343 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1344 obd->obd_minor, obd);
1347 EXPORT_SYMBOL(class_disconnect_exports);
/* Evict the stale exports of @obd.
 * @test_export: callback consulted for each candidate export under its
 * exp_lock; a non-zero result keeps the export.
 * Selected exports are marked failed, moved to a private list under
 * obd_dev_lock and then disconnected with OBD_OPT_ABORT_RECOV added to
 * the device flags. */
1349 /* Remove exports that have not completed recovery.
1351 void class_disconnect_stale_exports(struct obd_device *obd,
1352 int (*test_export)(struct obd_export *))
1354 struct list_head work_list;
1355 struct obd_export *exp, *n;
1359 INIT_LIST_HEAD(&work_list);
1360 spin_lock(&obd->obd_dev_lock);
1361 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1363 /* don't count self-export as client */
1364 if (obd_uuid_equals(&exp->exp_client_uuid,
1365 &exp->exp_obd->obd_uuid))
1368 /* don't evict clients which have no slot in last_rcvd
1369 * (e.g. lightweight connection) */
1370 if (exp->exp_target_data.ted_lr_idx == -1)
/* exp_failed is tested and set under exp_lock: exports that already
 * failed, or that test_export() accepts, are left where they are. */
1373 spin_lock(&exp->exp_lock);
1374 if (exp->exp_failed || test_export(exp)) {
1375 spin_unlock(&exp->exp_lock);
1378 exp->exp_failed = 1;
1379 spin_unlock(&exp->exp_lock);
1381 list_move(&exp->exp_obd_chain, &work_list);
1383 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1384 obd->obd_name, exp->exp_client_uuid.uuid,
1385 exp->exp_connection == NULL ? "<unknown>" :
1386 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1387 print_export_data(exp, "EVICTING", 0);
1389 spin_unlock(&obd->obd_dev_lock);
1392 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1393 obd->obd_name, evicted);
/* The collected exports are disconnected after obd_dev_lock has been
 * released. */
1395 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1396 OBD_OPT_ABORT_RECOV);
1399 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Fail @exp: set exp_failed and, unless it was already set, disconnect
 * the export through obd_disconnect(). */
1401 void class_fail_export(struct obd_export *exp)
1403 int rc, already_failed;
/* Test-and-set exp_failed under exp_lock so that only the first caller
 * carries on with the disconnect. */
1405 spin_lock(&exp->exp_lock);
1406 already_failed = exp->exp_failed;
1407 exp->exp_failed = 1;
1408 spin_unlock(&exp->exp_lock);
1410 if (already_failed) {
1411 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1412 exp, exp->exp_client_uuid.uuid);
1416 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1417 exp, exp->exp_client_uuid.uuid);
/* Dump the debug log first when obd_dump_on_timeout is set. */
1419 if (obd_dump_on_timeout)
1420 libcfs_debug_dumplog();
1422 /* need for safe call CDEBUG after obd_disconnect */
1423 class_export_get(exp);
1425 /* Most callers into obd_disconnect are removing their own reference
1426 * (request, for example) in addition to the one from the hash table.
1427 * We don't have such a reference here, so make one. */
1428 class_export_get(exp);
1429 rc = obd_disconnect(exp);
1431 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1433 CDEBUG(D_HA, "disconnected export %p/%s\n",
1434 exp, exp->exp_client_uuid.uuid);
/* Drops the reference taken above to keep exp valid for the messages. */
1435 class_export_put(exp);
1437 EXPORT_SYMBOL(class_fail_export);
/* Return the peer NID of @exp formatted by libcfs_nid2str() when the
 * export has a connection.
 * NOTE(review): the value returned for a NULL exp_connection is not
 * shown here - confirm before relying on it. */
1439 char *obd_export_nid2str(struct obd_export *exp)
1441 if (exp->exp_connection != NULL)
1442 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1446 EXPORT_SYMBOL(obd_export_nid2str);
/* Evict the exports of @obd whose peer NID matches the string @nid.
 * Returns exports_evicted; that is 0 straight away when the device is
 * already stopping. */
1448 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1450 struct cfs_hash *nid_hash;
1451 struct obd_export *doomed_exp = NULL;
1452 int exports_evicted = 0;
1454 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1456 spin_lock(&obd->obd_dev_lock);
1457 /* umount has run already, so evict thread should leave
1458 * its task to umount thread now */
1459 if (obd->obd_stopping) {
1460 spin_unlock(&obd->obd_dev_lock);
1461 return exports_evicted;
/* Pin the NID hash with a reference taken under obd_dev_lock; it is
 * released by cfs_hash_putref() further down. */
1463 nid_hash = obd->obd_nid_hash;
1464 cfs_hash_getref(nid_hash);
1465 spin_unlock(&obd->obd_dev_lock);
1468 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1469 if (doomed_exp == NULL)
1472 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1473 "nid %s found, wanted nid %s, requested nid %s\n",
1474 obd_export_nid2str(doomed_exp),
1475 libcfs_nid2str(nid_key), nid);
1476 LASSERTF(doomed_exp != obd->obd_self_export,
1477 "self-export is hashed by NID?\n");
1479 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1480 "request\n", obd->obd_name,
1481 obd_uuid2str(&doomed_exp->exp_client_uuid),
1482 obd_export_nid2str(doomed_exp));
/* class_fail_export() does the eviction; the put presumably balances
 * the reference returned by cfs_hash_lookup() - confirm. */
1483 class_fail_export(doomed_exp);
1484 class_export_put(doomed_exp);
1487 cfs_hash_putref(nid_hash);
1489 if (!exports_evicted)
1490 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1491 obd->obd_name, nid);
1492 return exports_evicted;
1494 EXPORT_SYMBOL(obd_export_evict_by_nid);
/* Evict the export of @obd whose client UUID matches the string @uuid.
 * Returns exports_evicted; that is 0 when the device is stopping or when
 * @uuid names the device itself. */
1496 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1498 struct cfs_hash *uuid_hash;
1499 struct obd_export *doomed_exp = NULL;
1500 struct obd_uuid doomed_uuid;
1501 int exports_evicted = 0;
1503 spin_lock(&obd->obd_dev_lock);
1504 if (obd->obd_stopping) {
1505 spin_unlock(&obd->obd_dev_lock);
1506 return exports_evicted;
/* Pin the UUID hash with a reference taken under obd_dev_lock; every
 * visible exit below releases it with cfs_hash_putref(). */
1508 uuid_hash = obd->obd_uuid_hash;
1509 cfs_hash_getref(uuid_hash);
1510 spin_unlock(&obd->obd_dev_lock);
1512 obd_str2uuid(&doomed_uuid, uuid);
/* The device's own UUID is never evicted. */
1513 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1514 CERROR("%s: can't evict myself\n", obd->obd_name);
1515 cfs_hash_putref(uuid_hash);
1516 return exports_evicted;
1519 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1521 if (doomed_exp == NULL) {
1522 CERROR("%s: can't disconnect %s: no exports found\n",
1523 obd->obd_name, uuid);
1525 CWARN("%s: evicting %s at adminstrative request\n",
1526 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
/* class_fail_export() does the eviction; the put presumably balances
 * the reference returned by cfs_hash_lookup() - confirm. */
1527 class_fail_export(doomed_exp);
1528 class_export_put(doomed_exp);
1531 cfs_hash_putref(uuid_hash);
1533 return exports_evicted;
/* Optional callback, compiled in only when LUSTRE_TRACKS_LOCK_EXP_REFS
 * is set.  print_export_data() calls it with the export when its locks
 * argument is non-zero and the hook is not NULL. */
1536 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1537 void (*class_export_dump_hook)(struct obd_export*) = NULL;
/* Emit one D_HA debug line describing @exp, tagged with @status: device
 * name, client UUID, NID, the refcount/rpc/callback/lock counters, the
 * disconnected/delayed/failed flags, the outstanding-reply summary and
 * exp_last_committed. */
1540 static void print_export_data(struct obd_export *exp, const char *status,
1543 struct ptlrpc_reply_state *rs;
1544 struct ptlrpc_reply_state *first_reply = NULL;
/* exp_outstanding_replies is walked under exp_lock. */
1547 spin_lock(&exp->exp_lock);
1548 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1554 spin_unlock(&exp->exp_lock);
1556 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1557 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1558 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1559 atomic_read(&exp->exp_rpc_count),
1560 atomic_read(&exp->exp_cb_count),
1561 atomic_read(&exp->exp_locks_count),
1562 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1563 nreplies, first_reply, nreplies > 3 ? "..." : "",
1564 exp->exp_last_committed);
/* Extra dump through the optional hook, only when locks is non-zero and
 * a hook has been registered. */
1565 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1566 if (locks && class_export_dump_hook != NULL)
1567 class_export_dump_hook(exp);
/* Print every export known for @obd through print_export_data().
 * @locks is passed through unchanged.
 * The three per-device lists are walked under obd_dev_lock; the zombie
 * list is global, has its own lock, and is not filtered by @obd. */
1571 void dump_exports(struct obd_device *obd, int locks)
1573 struct obd_export *exp;
1575 spin_lock(&obd->obd_dev_lock);
1576 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1577 print_export_data(exp, "ACTIVE", locks);
1578 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1579 print_export_data(exp, "UNLINKED", locks);
1580 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1581 print_export_data(exp, "DELAYED", locks);
1582 spin_unlock(&obd->obd_dev_lock);
1583 spin_lock(&obd_zombie_impexp_lock);
1584 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1585 print_export_data(exp, "ZOMBIE", locks);
1586 spin_unlock(&obd_zombie_impexp_lock);
/* Wait until @obd's obd_unlinked_exports list is empty.  obd_exports
 * must already be empty on entry.  The list is re-checked under
 * obd_dev_lock after each uninterruptible sleep. */
1589 void obd_exports_barrier(struct obd_device *obd)
1592 LASSERT(list_empty(&obd->obd_exports));
1593 spin_lock(&obd->obd_dev_lock);
1594 while (!list_empty(&obd->obd_unlinked_exports)) {
/* The lock is dropped for the sleep and retaken at the bottom of the
 * loop before the list is tested again. */
1595 spin_unlock(&obd->obd_dev_lock);
1596 set_current_state(TASK_UNINTERRUPTIBLE);
1597 schedule_timeout(cfs_time_seconds(waited));
/* Warn and dump the exports only when waited > 5 and IS_PO2(waited)
 * holds (presumably a power-of-two test - confirm). */
1598 if (waited > 5 && IS_PO2(waited)) {
1599 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1600 "more than %d seconds. "
1601 "The obd refcount = %d. Is it stuck?\n",
1602 obd->obd_name, waited,
1603 atomic_read(&obd->obd_refcount));
1604 dump_exports(obd, 1);
1607 spin_lock(&obd->obd_dev_lock);
1609 spin_unlock(&obd->obd_dev_lock);
1611 EXPORT_SYMBOL(obd_exports_barrier);
/* Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(). */
1613 /* Total amount of zombies to be destroyed */
1614 static int zombies_count = 0;
/* Each pass takes the first entry (if any) off obd_zombie_imports and
 * off obd_zombie_exports under obd_zombie_impexp_lock, then destroys
 * them with the lock released.  Passes repeat until neither an import
 * nor an export was found. */
1617 * kill zombie imports and exports
1619 void obd_zombie_impexp_cull(void)
1621 struct obd_import *import;
1622 struct obd_export *export;
1626 spin_lock(&obd_zombie_impexp_lock);
1629 if (!list_empty(&obd_zombie_imports)) {
1630 import = list_entry(obd_zombie_imports.next,
1633 list_del_init(&import->imp_zombie_chain);
1637 if (!list_empty(&obd_zombie_exports)) {
1638 export = list_entry(obd_zombie_exports.next,
1641 list_del_init(&export->exp_obd_chain);
1644 spin_unlock(&obd_zombie_impexp_lock);
/* Destruction runs outside the spinlock.  The lock is retaken briefly
 * afterwards; the statement it protects is not shown here. */
1646 if (import != NULL) {
1647 class_import_destroy(import);
1648 spin_lock(&obd_zombie_impexp_lock);
1650 spin_unlock(&obd_zombie_impexp_lock);
1653 if (export != NULL) {
1654 class_export_destroy(export);
1655 spin_lock(&obd_zombie_impexp_lock);
1657 spin_unlock(&obd_zombie_impexp_lock);
1661 } while (import != NULL || export != NULL);
/* State shared with the zombie import/export destroyer thread. */
/* Completed by the thread once it has started; obd_zombie_impexp_init()
 * waits on it. */
1665 static struct completion obd_zombie_start;
/* Completed by the thread when it exits; obd_zombie_impexp_stop() waits
 * on it. */
1666 static struct completion obd_zombie_stop;
/* Bit field accessed with set_bit()/test_bit(). */
1667 static unsigned long obd_zombie_flags;
/* Woken by obd_zombie_impexp_notify() and by the thread itself. */
1668 static wait_queue_head_t obd_zombie_waitq;
/* Set by the thread to current_pid(); obd_zombie_barrier() compares
 * against it. */
1669 static pid_t obd_zombie_pid;
/* Passed as the bit number to set_bit()/test_bit() on obd_zombie_flags. */
1672 OBD_ZOMBIE_STOP = 0x0001,
/* Computes rc under obd_zombie_impexp_lock: non-zero when there is
 * nothing to do, i.e. zombies_count is 0 and OBD_ZOMBIE_STOP is not set.
 * The thread sleeps until the negation of this becomes true.  @arg is
 * not used in the visible code. */
1676 * check for work for kill zombie import/export thread.
1678 static int obd_zombie_impexp_check(void *arg)
1682 spin_lock(&obd_zombie_impexp_lock);
1683 rc = (zombies_count == 0) &&
1684 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1685 spin_unlock(&obd_zombie_impexp_lock);
/* Move @exp from its device list to the global obd_zombie_exports list
 * and wake the zombie thread.  obd_stale_export_num is decremented
 * first. */
1691 * Add export to the obd_zombe thread and notify it.
1693 static void obd_zombie_export_add(struct obd_export *exp) {
1694 atomic_dec(&obd_stale_export_num);
/* Unlink from the per-device list under obd_dev_lock; the export is
 * asserted to be on a list at this point. */
1695 spin_lock(&exp->exp_obd->obd_dev_lock);
1696 LASSERT(!list_empty(&exp->exp_obd_chain));
1697 list_del_init(&exp->exp_obd_chain);
1698 spin_unlock(&exp->exp_obd->obd_dev_lock);
/* Queue on the zombie list under its own lock, reusing exp_obd_chain. */
1699 spin_lock(&obd_zombie_impexp_lock);
1701 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1702 spin_unlock(&obd_zombie_impexp_lock);
1704 obd_zombie_impexp_notify();
/* Queue @imp on the global obd_zombie_imports list and wake the zombie
 * thread.  The import must have no imp_sec and must not already be on a
 * zombie chain. */
1708 * Add import to the obd_zombe thread and notify it.
1710 static void obd_zombie_import_add(struct obd_import *imp) {
1711 LASSERT(imp->imp_sec == NULL);
1712 spin_lock(&obd_zombie_impexp_lock);
1713 LASSERT(list_empty(&imp->imp_zombie_chain));
1715 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1716 spin_unlock(&obd_zombie_impexp_lock);
1718 obd_zombie_impexp_notify();
/* Wake every waiter on obd_zombie_waitq (wake_up_all), so both the
 * zombie thread and any obd_zombie_barrier() caller re-check their
 * conditions. */
1722 * notify import/export destroy thread about new zombie.
1724 static void obd_zombie_impexp_notify(void)
1727 * Make sure obd_zomebie_impexp_thread get this notification.
1728 * It is possible this signal only get by obd_zombie_barrier, and
1729 * barrier gulps this notification and sleeps away and hangs ensues
1731 wake_up_all(&obd_zombie_waitq);
/* Computes rc as (zombies_count == 0) under obd_zombie_impexp_lock.
 * Must not be called once OBD_ZOMBIE_STOP has been set (asserted). */
1735 * check whether obd_zombie is idle
1737 static int obd_zombie_is_idle(void)
1741 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1742 spin_lock(&obd_zombie_impexp_lock);
1743 rc = (zombies_count == 0);
1744 spin_unlock(&obd_zombie_impexp_lock);
/* Sleep on obd_zombie_waitq until obd_zombie_is_idle() holds.  The
 * caller's pid is compared with obd_zombie_pid so that the zombie
 * thread does not wait for itself. */
1749 * wait when obd_zombie import/export queues become empty
1751 void obd_zombie_barrier(void)
1753 struct l_wait_info lwi = { 0 };
1755 if (obd_zombie_pid == current_pid())
1756 /* don't wait for myself */
1758 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1760 EXPORT_SYMBOL(obd_zombie_barrier);
/* Detach the first export from obd_stale_exports, under
 * obd_stale_export_lock, into exp; exp stays NULL when the list is
 * empty. */
1763 struct obd_export *obd_stale_export_get(void)
1765 struct obd_export *exp = NULL;
1768 spin_lock(&obd_stale_export_lock);
1769 if (!list_empty(&obd_stale_exports)) {
1770 exp = list_entry(obd_stale_exports.next,
1771 struct obd_export, exp_stale_list);
1772 list_del_init(&exp->exp_stale_list);
1774 spin_unlock(&obd_stale_export_lock);
1777 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1778 atomic_read(&obd_stale_export_num));
1782 EXPORT_SYMBOL(obd_stale_export_get);
/* Hand back an export that is currently off the stale list (asserted).
 * While its lock hash still holds entries the export is put back on
 * obd_stale_exports; class_export_put() is the other visible outcome.
 * NOTE(review): the branch structure between the two is only partly
 * visible here - confirm. */
1784 void obd_stale_export_put(struct obd_export *exp)
1788 LASSERT(list_empty(&exp->exp_stale_list));
1789 if (exp->exp_lock_hash &&
1790 atomic_read(&exp->exp_lock_hash->hs_count)) {
1791 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1792 atomic_read(&obd_stale_export_num));
/* Lock order: exp_bl_list_lock (bottom halves disabled) first, then
 * obd_stale_export_lock - the same order as obd_stale_export_adjust(). */
1794 spin_lock_bh(&exp->exp_bl_list_lock);
1795 spin_lock(&obd_stale_export_lock);
1796 /* Add to the tail if there is no blocked locks,
1797 * to the head otherwise. */
1798 if (list_empty(&exp->exp_bl_list))
1799 list_add_tail(&exp->exp_stale_list,
1800 &obd_stale_exports);
1802 list_add(&exp->exp_stale_list,
1803 &obd_stale_exports);
1805 spin_unlock(&obd_stale_export_lock);
1806 spin_unlock_bh(&exp->exp_bl_list_lock);
1808 class_export_put(exp);
1812 EXPORT_SYMBOL(obd_stale_export_put);
/* Move @exp to the head of obd_stale_exports when it is on the stale
 * list and its exp_bl_list is not empty; otherwise do nothing.
 * Lock order: exp_bl_list_lock (bottom halves disabled) first, then
 * obd_stale_export_lock. */
1815 * Adjust the position of the export in the stale list,
1816 * i.e. move to the head of the list if is needed.
1818 void obd_stale_export_adjust(struct obd_export *exp)
1820 LASSERT(exp != NULL);
1821 spin_lock_bh(&exp->exp_bl_list_lock);
1822 spin_lock(&obd_stale_export_lock);
1824 if (!list_empty(&exp->exp_stale_list) &&
1825 !list_empty(&exp->exp_bl_list))
1826 list_move(&exp->exp_stale_list, &obd_stale_exports);
1828 spin_unlock(&obd_stale_export_lock);
1829 spin_unlock_bh(&exp->exp_bl_list_lock);
1831 EXPORT_SYMBOL(obd_stale_export_adjust);
/* Thread body: signals obd_zombie_start, records its pid, then loops
 * until OBD_ZOMBIE_STOP is set, and finally signals obd_zombie_stop.
 * @unused is not referenced in the visible code. */
1834 * destroy zombie export/import thread.
1836 static int obd_zombie_impexp_thread(void *unused)
1838 unshare_fs_struct();
/* Lets obd_zombie_impexp_init() return. */
1839 complete(&obd_zombie_start);
1841 obd_zombie_pid = current_pid();
/* Each iteration sleeps until obd_zombie_impexp_check() reports work or
 * a stop request, then culls the zombie lists. */
1843 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1844 struct l_wait_info lwi = { 0 };
1846 l_wait_event(obd_zombie_waitq,
1847 !obd_zombie_impexp_check(NULL), &lwi);
1848 obd_zombie_impexp_cull();
1851 * Notify obd_zombie_barrier callers that queues
1854 wake_up(&obd_zombie_waitq);
/* Lets obd_zombie_impexp_stop() return. */
1857 complete(&obd_zombie_stop);
/* Initialise the zombie lists, lock, completions and wait queue, start
 * the "obd_zombid" kernel thread and wait until it has signalled
 * obd_zombie_start.  PTR_ERR(task) is returned on the visible error
 * path. */
1864 * start destroy zombie import/export thread
1866 int obd_zombie_impexp_init(void)
1868 struct task_struct *task;
1870 INIT_LIST_HEAD(&obd_zombie_imports);
1872 INIT_LIST_HEAD(&obd_zombie_exports);
1873 spin_lock_init(&obd_zombie_impexp_lock);
1874 init_completion(&obd_zombie_start);
1875 init_completion(&obd_zombie_stop);
1876 init_waitqueue_head(&obd_zombie_waitq);
1879 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1881 RETURN(PTR_ERR(task));
1883 wait_for_completion(&obd_zombie_start);
/* Set OBD_ZOMBIE_STOP, wake the thread so that it notices the flag, and
 * block until the thread has signalled obd_zombie_stop. */
1887 * stop destroy zombie import/export thread
1889 void obd_zombie_impexp_stop(void)
1891 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1892 obd_zombie_impexp_notify();
1893 wait_for_completion(&obd_zombie_stop);
1896 /***** Kernel-userspace comm helpers *******/
/* @payload_len: payload size; sizeof(struct kuc_hdr) is added to it. */
1898 /* Get length of entire message, including header */
1899 int kuc_len(int payload_len)
1901 return sizeof(struct kuc_hdr) + payload_len;
1903 EXPORT_SYMBOL(kuc_len);
/* Steps back one struct kuc_hdr from @p and asserts that the header
 * found there carries KUC_MAGIC. */
1905 /* Get a pointer to kuc header, given a ptr to the payload
1906 * @param p Pointer to payload area
1907 * @returns Pointer to kuc header
1909 struct kuc_hdr * kuc_ptr(void *p)
1911 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1912 LASSERT(lh->kuc_magic == KUC_MAGIC);
1915 EXPORT_SYMBOL(kuc_ptr);
/* Compares the kuc_magic field of the struct kuc_hdr immediately before
 * @p with KUC_MAGIC.
 * NOTE(review): the values returned for match and mismatch are not
 * shown here - confirm. */
1917 /* Test if payload is part of kuc message
1918 * @param p Pointer to payload area
1921 int kuc_ispayload(void *p)
1923 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1925 if (kh->kuc_magic == KUC_MAGIC)
1930 EXPORT_SYMBOL(kuc_ispayload);
/* The message length is kuc_len(payload_len).  The header is filled
 * with KUC_MAGIC, @transport, @type and that length, and the address
 * just past the header is returned.  ERR_PTR(-ENOMEM) is returned on
 * the visible failure path. */
1932 /* Alloc space for a message, and fill in header
1933 * @return Pointer to payload area
1935 void *kuc_alloc(int payload_len, int transport, int type)
1938 int len = kuc_len(payload_len);
1942 return ERR_PTR(-ENOMEM);
1944 lh->kuc_magic = KUC_MAGIC;
1945 lh->kuc_transport = transport;
1946 lh->kuc_msgtype = type;
1947 lh->kuc_msglen = len;
1949 return (void *)(lh + 1);
1951 EXPORT_SYMBOL(kuc_alloc);
/* Frees kuc_len(payload_len) bytes starting at the header that
 * kuc_ptr() locates in front of @p; kuc_ptr() asserts the header magic.
 * @payload_len presumably has to match the value used at allocation so
 * that the freed size is right - confirm against callers. */
1953 /* Takes pointer to payload area */
1954 inline void kuc_free(void *p, int payload_len)
1956 struct kuc_hdr *lh = kuc_ptr(p);
1957 OBD_FREE(lh, kuc_len(payload_len));
1959 EXPORT_SYMBOL(kuc_free);
/* One sleeper waiting for an RPC credit; obd_get_request_slot() keeps it
 * on its stack and links it into cli->cl_loi_read_list. */
1961 struct obd_request_slot_waiter {
/* Link in cl_loi_read_list; emptied with list_del_init() when a credit
 * is handed to this waiter. */
1962 struct list_head orsw_entry;
/* The waiter sleeps here and is woken with wake_up(). */
1963 wait_queue_head_t orsw_waitq;
/* Wait condition for obd_get_request_slot(): avail is true once @orsw is
 * no longer linked into a list, which is how obd_put_request_slot() and
 * obd_set_max_rpcs_in_flight() hand over a credit (list_del_init()).
 * The test is made under cl_loi_list_lock. */
1967 static bool obd_request_slot_avail(struct client_obd *cli,
1968 struct obd_request_slot_waiter *orsw)
1972 spin_lock(&cli->cl_loi_list_lock);
1973 avail = !!list_empty(&orsw->orsw_entry);
1974 spin_unlock(&cli->cl_loi_list_lock);
/* Acquire one RPC credit on @cli.  A free credit is taken immediately;
 * otherwise an on-stack waiter is queued on cl_loi_read_list and the
 * caller sleeps (interruptibly, LWI_INTR) until a credit is handed over
 * or the wait ends early. */
1980 * For network flow control, the RPC sponsor needs to acquire a credit
1981 * before sending the RPC. The credits count for a connection is defined
1982 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1983 * the subsequent RPC sponsors need to wait until others released their
1984 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1986 int obd_get_request_slot(struct client_obd *cli)
1988 struct obd_request_slot_waiter orsw;
1989 struct l_wait_info lwi;
1992 spin_lock(&cli->cl_loi_list_lock);
/* Fast path: fewer RPCs in flight than the limit, take a credit now. */
1993 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1994 cli->cl_r_in_flight++;
1995 spin_unlock(&cli->cl_loi_list_lock);
/* Slow path: queue at the tail so that waiters are served in order. */
1999 init_waitqueue_head(&orsw.orsw_waitq);
2000 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
2001 orsw.orsw_signaled = false;
2002 spin_unlock(&cli->cl_loi_list_lock);
2004 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
2005 rc = l_wait_event(orsw.orsw_waitq,
2006 obd_request_slot_avail(cli, &orsw) ||
2010 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2011 * freed but other (such as obd_put_request_slot) is using it. */
2012 spin_lock(&cli->cl_loi_list_lock);
/* Undo the queueing: an entry that is already off the list means a
 * credit was handed over, so it is given back by decrementing
 * cl_r_in_flight; otherwise the entry is simply removed.
 * NOTE(review): the outer condition guarding this is not shown here. */
2014 if (!orsw.orsw_signaled) {
2015 if (list_empty(&orsw.orsw_entry))
2016 cli->cl_r_in_flight--;
2018 list_del(&orsw.orsw_entry);
2022 if (orsw.orsw_signaled) {
2023 LASSERT(list_empty(&orsw.orsw_entry));
2027 spin_unlock(&cli->cl_loi_list_lock);
2031 EXPORT_SYMBOL(obd_get_request_slot);
/* Release one RPC credit on @cli.  If a waiter is queued and the count
 * is now below the limit, the credit goes straight to the first waiter:
 * its entry is unlinked, cl_r_in_flight is incremented on its behalf and
 * it is woken.  Everything runs under cl_loi_list_lock. */
2033 void obd_put_request_slot(struct client_obd *cli)
2035 struct obd_request_slot_waiter *orsw;
2037 spin_lock(&cli->cl_loi_list_lock);
2038 cli->cl_r_in_flight--;
2040 /* If there is free slot, wakeup the first waiter. */
2041 if (!list_empty(&cli->cl_loi_read_list) &&
2042 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2043 orsw = list_entry(cli->cl_loi_read_list.next,
2044 struct obd_request_slot_waiter, orsw_entry);
/* list_del_init() leaves the entry empty, which is what
 * obd_request_slot_avail() tests for. */
2045 list_del_init(&orsw->orsw_entry);
2046 cli->cl_r_in_flight++;
2047 wake_up(&orsw->orsw_waitq);
2049 spin_unlock(&cli->cl_loi_list_lock);
2051 EXPORT_SYMBOL(obd_put_request_slot);
/* Return @cli's cl_max_rpcs_in_flight; the field is read without taking
 * a lock. */
2053 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2055 return cli->cl_max_rpcs_in_flight;
2057 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/* Change the maximum number of RPCs in flight for @cli to @max.
 *
 * @max must lie within [1, OBD_MAX_RIF_MAX].  For an MDC device the
 * modify-RPC limit is kept strictly below the new value: setting 1 is
 * refused, and cl_max_mod_rpcs_in_flight is lowered to @max - 1 when it
 * would otherwise be >= @max.  When the limit is raised, waiters queued
 * on cl_loi_read_list are granted the additional credits. */
2059 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2061 struct obd_request_slot_waiter *orsw;
2068 if (max > OBD_MAX_RIF_MAX || max < 1)
2071 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2072 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2073 /* adjust max_mod_rpcs_in_flight to ensure it is always
2074 * strictly lower that max_rpcs_in_flight */
2076 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2077 "because it must be higher than "
2078 "max_mod_rpcs_in_flight value\n",
2079 cli->cl_import->imp_obd->obd_name);
2082 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2083 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
/* The new limit is installed and waiters are woken under
 * cl_loi_list_lock. */
2089 spin_lock(&cli->cl_loi_list_lock);
2090 old = cli->cl_max_rpcs_in_flight;
2091 cli->cl_max_rpcs_in_flight = max;
2094 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2095 for (i = 0; i < diff; i++) {
2096 if (list_empty(&cli->cl_loi_read_list))
2099 orsw = list_entry(cli->cl_loi_read_list.next,
2100 struct obd_request_slot_waiter, orsw_entry);
/* Same hand-over as in obd_put_request_slot(): unlink the waiter,
 * account the credit on its behalf, then wake it. */
2101 list_del_init(&orsw->orsw_entry);
2102 cli->cl_r_in_flight++;
2103 wake_up(&orsw->orsw_waitq);
2105 spin_unlock(&cli->cl_loi_list_lock);
2109 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Return @cli's cl_max_mod_rpcs_in_flight; the field is read without
 * taking a lock. */
2111 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2113 return cli->cl_max_mod_rpcs_in_flight;
2115 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/* Change the maximum number of modify RPCs in flight for @cli to @max.
 * Three checks are visible before the value is stored: @max within
 * [1, OBD_MAX_RIF_MAX], @max strictly below cl_max_rpcs_in_flight, and
 * @max not above maxmodrpcs (taken from the server's connect data when
 * OBD_CONNECT_MULTIMODRPCS is set). */
2117 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2119 struct obd_connect_data *ocd;
2123 if (max > OBD_MAX_RIF_MAX || max < 1)
2126 /* cannot exceed or equal max_rpcs_in_flight */
2127 if (max >= cli->cl_max_rpcs_in_flight) {
2128 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2129 "higher or equal to max_rpcs_in_flight value (%u)\n",
2130 cli->cl_import->imp_obd->obd_name,
2131 max, cli->cl_max_rpcs_in_flight);
2135 /* cannot exceed max modify RPCs in flight supported by the server */
2136 ocd = &cli->cl_import->imp_connect_data;
2137 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2138 maxmodrpcs = ocd->ocd_maxmodrpcs;
2141 if (max > maxmodrpcs) {
2142 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2143 "higher than max_mod_rpcs_per_client value (%hu) "
2144 "returned by the server at connection\n",
2145 cli->cl_import->imp_obd->obd_name,
/* The limit is updated under cl_mod_rpcs_lock, the lock that
 * obd_mod_rpc_slot_avail() takes before testing it. */
2150 spin_lock(&cli->cl_mod_rpcs_lock);
2152 prev = cli->cl_max_mod_rpcs_in_flight;
2153 cli->cl_max_mod_rpcs_in_flight = max;
2155 /* wakeup waiters if limit has been increased */
2156 if (cli->cl_max_mod_rpcs_in_flight > prev)
2157 wake_up(&cli->cl_mod_rpcs_waitq);
2159 spin_unlock(&cli->cl_mod_rpcs_lock);
2163 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/* Integer percentage of @a relative to @b; yields 0 when @b is zero.
 * Both arguments are parenthesized so that an expression argument such
 * as pct(x + y, n) expands with the intended precedence.  @b is still
 * evaluated twice, so it must not have side effects. */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
/* Write the modify-RPC statistics of @cli to @seq: a snapshot time, the
 * current cl_mod_rpcs_in_flight, and one line per bucket of
 * cl_mod_rpcs_hist giving the count, its percentage of the total and
 * the cumulative percentage. */
2167 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2168 struct seq_file *seq)
2171 unsigned long mod_tot = 0, mod_cum;
2174 do_gettimeofday(&now);
/* The counters and the histogram are read under cl_mod_rpcs_lock. */
2176 spin_lock(&cli->cl_mod_rpcs_lock);
/* NOTE(review): tv_usec is printed with "%lu" and no zero padding, so
 * 1 s + 5 us is shown as "1.5" - confirm whether "%06lu" was intended. */
2178 seq_printf(seq, "snapshot_time: %lu.%lu (secs.usecs)\n",
2179 now.tv_sec, now.tv_usec);
2180 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2181 cli->cl_mod_rpcs_in_flight);
2183 seq_printf(seq, "\n\t\t\tmodify\n");
2184 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2186 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2189 for (i = 0; i < OBD_HIST_MAX; i++) {
2190 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2192 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2193 i, mod, pct(mod, mod_tot),
2194 pct(mod_cum, mod_tot));
/* Checked once per bucket: has the running total reached mod_tot? */
2195 if (mod_cum == mod_tot)
2199 spin_unlock(&cli->cl_mod_rpcs_lock);
2203 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
/* Decide whether a modify RPC slot is free on @cli: either
 * cl_mod_rpcs_in_flight is below cl_max_mod_rpcs_in_flight, or the
 * request is a close and cl_close_rpcs_in_flight is 0.  No lock is
 * taken here; both callers in this file hold cl_mod_rpcs_lock. */
2207 /* The number of modify RPCs sent in parallel is limited
2208 * because the server has a finite number of slots per client to
2209 * store request result and ensure reply reconstruction when needed.
2210 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2211 * that takes into account server limit and cl_max_rpcs_in_flight
2213 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2214 * one close request is allowed above the maximum.
2216 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2221 /* A slot is available if
2222 * - number of modify RPCs in flight is less than the max
2223 * - it's a close RPC and no other close request is in flight
2225 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2226 (close_req && cli->cl_close_rpcs_in_flight == 0);
/* Locking wrapper: evaluates obd_mod_rpc_slot_avail_locked() with
 * cl_mod_rpcs_lock held.  Used as the wait condition in
 * obd_get_mod_rpc_slot(). */
2231 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2236 spin_lock(&cli->cl_mod_rpcs_lock);
2237 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2238 spin_unlock(&cli->cl_mod_rpcs_lock);
/* Reserve a modify RPC slot and a tag on @cli.  When a slot is free the
 * in-flight counter is incremented, the histogram is tallied and a free
 * bit is taken in cl_mod_tag_bitmap, all under cl_mod_rpcs_lock.  When
 * none is free the caller sleeps on cl_mod_rpcs_waitq until
 * obd_mod_rpc_slot_avail() holds. */
2242 /* Get a modify RPC slot from the obd client @cli according
2243 * to the kind of operation @opc that is going to be sent
2244 * and the intent @it of the operation if it applies.
2245 * If the maximum number of modify RPCs in flight is reached
2246 * the thread is put to sleep.
2247 * Returns the tag to be set in the request message. Tag 0
2248 * is reserved for non-modifying requests.
2250 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2251 struct lookup_intent *it)
2253 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2254 bool close_req = false;
2257 /* read-only metadata RPCs don't consume a slot on MDT
2258 * for reply reconstruction
2260 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2261 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2264 if (opc == MDS_CLOSE)
2268 spin_lock(&cli->cl_mod_rpcs_lock);
2269 max = cli->cl_max_mod_rpcs_in_flight;
2270 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2271 /* there is a slot available */
2272 cli->cl_mod_rpcs_in_flight++;
2274 cli->cl_close_rpcs_in_flight++;
2275 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2276 cli->cl_mod_rpcs_in_flight);
2277 /* find a free tag */
2278 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2280 LASSERT(i < OBD_MAX_RIF_MAX);
/* NOTE(review): test_and_set_bit() runs inside LASSERT(), so the bit is
 * only claimed if LASSERT() always evaluates its argument - confirm. */
2281 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2282 spin_unlock(&cli->cl_mod_rpcs_lock);
2283 /* tag 0 is reserved for non-modify RPCs */
/* No slot was free: drop the lock before sleeping. */
2286 spin_unlock(&cli->cl_mod_rpcs_lock);
2288 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2289 "opc %u, max %hu\n",
2290 cli->cl_import->imp_obd->obd_name, opc, max);
2292 l_wait_event(cli->cl_mod_rpcs_waitq,
2293 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2296 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
/* Release a modify RPC slot on @cli: decrement the in-flight counter,
 * clear bit (@tag - 1) in cl_mod_tag_bitmap under cl_mod_rpcs_lock, then
 * wake the waiters on cl_mod_rpcs_waitq. */
2298 /* Put a modify RPC slot from the obd client @cli according
2299 * to the kind of operation @opc that has been sent and the
2300 * intent @it of the operation if it applies.
2302 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2303 struct lookup_intent *it, __u16 tag)
2305 bool close_req = false;
2307 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2308 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2311 if (opc == MDS_CLOSE)
2314 spin_lock(&cli->cl_mod_rpcs_lock);
2315 cli->cl_mod_rpcs_in_flight--;
2317 cli->cl_close_rpcs_in_flight--;
2318 /* release the tag in the bitmap */
/* The bitmap index is tag - 1; presumably tags are handed out as bit
 * index + 1 because tag 0 is reserved - confirm against
 * obd_get_mod_rpc_slot().  The bit must currently be set (asserted). */
2319 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2320 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2321 spin_unlock(&cli->cl_mod_rpcs_lock);
/* Waiters are woken after the lock has been released. */
2322 wake_up(&cli->cl_mod_rpcs_waitq);
2324 EXPORT_SYMBOL(obd_put_mod_rpc_slot);