4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_disk.h>
48 #include <lustre_kernelcomm.h>
50 spinlock_t obd_types_lock;
52 static struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 static struct kmem_cache *import_cachep;
57 static struct list_head obd_zombie_imports;
58 static struct list_head obd_zombie_exports;
59 static spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 struct list_head obd_stale_exports;
68 spinlock_t obd_stale_export_lock;
69 atomic_t obd_stale_export_num;
71 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
75 * support functions: we could use inter-module communication, but this
76 * is more portable to other OS's
/*
 * Allocate a struct obd_device from obd_device_cachep with GFP_NOFS and
 * stamp it with OBD_DEVICE_MAGIC, the value the LASSERTF checks elsewhere
 * in this file compare against.
 */
78 static struct obd_device *obd_device_alloc(void)
80 struct obd_device *obd;
82 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
/* NOTE(review): confirm obd is tested for NULL before this store. */
84 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * The magic is asserted first, a namespace pointer that is still set is
 * reported with CERROR, and obd_reference is finalized before the free.
 */
89 static void obd_device_free(struct obd_device *obd)
92 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
93 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a leftover namespace is only logged here */
94 if (obd->obd_namespace != NULL) {
95 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
96 obd, obd->obd_namespace, obd->obd_force);
99 lu_ref_fini(&obd->obd_reference);
100 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Scan the obd_types list for an entry whose typ_name equals \a name
 * (exact strcmp match). The list is walked under obd_types_lock, which is
 * released both on a match and after an unsuccessful scan.
 * Callers in this file treat a non-NULL result as "type is registered".
 */
103 struct obd_type *class_search_type(const char *name)
105 struct list_head *tmp;
106 struct obd_type *type;
108 spin_lock(&obd_types_lock);
109 list_for_each(tmp, &obd_types) {
110 type = list_entry(tmp, struct obd_type, typ_chain);
111 if (strcmp(type->typ_name, name) == 0) {
/* match: drop the lock before leaving the loop */
112 spin_unlock(&obd_types_lock);
116 spin_unlock(&obd_types_lock);
119 EXPORT_SYMBOL(class_search_type);
/*
 * Look up an obd type by name and take a module reference for it.
 *
 * With HAVE_MODULE_LOADING_SUPPORT a module may be requested through
 * request_module() under a translated name (LUSTRE_LWP_NAME is mapped to
 * LUSTRE_OSP_NAME, any name starting with LUSTRE_MDS_NAME to
 * LUSTRE_MDT_NAME); after a successful load the type list is searched
 * again with the original name.
 * The module reference on typ_dt_ops->o_owner is taken under
 * obd_type_lock; class_put_type() is the counterpart that drops it.
 */
121 struct obd_type *class_get_type(const char *name)
123 struct obd_type *type = class_search_type(name);
125 #ifdef HAVE_MODULE_LOADING_SUPPORT
127 const char *modname = name;
129 if (strcmp(modname, "obdfilter") == 0)
132 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
133 modname = LUSTRE_OSP_NAME;
/* prefix comparison: only strlen(LUSTRE_MDS_NAME) bytes are compared */
135 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
136 modname = LUSTRE_MDT_NAME;
/* request_module() returning 0 is treated as "module loaded" */
138 if (!request_module("%s", modname)) {
139 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
140 type = class_search_type(name);
142 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
148 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not checked here. */
150 try_module_get(type->typ_dt_ops->o_owner);
151 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): under obd_type_lock, drop the module
 * reference held on typ_dt_ops->o_owner.
 */
156 void class_put_type(struct obd_type *type)
159 spin_lock(&type->obd_type_lock);
161 module_put(type->typ_dt_ops->o_owner);
162 spin_unlock(&type->obd_type_lock);
/* upper bound on the type name length asserted by class_register_type() */
165 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 *
 * The type structure, private copies of *dt_ops and *md_ops (md_ops is
 * optional) and a copy of the name are allocated; obd_type_lock is
 * initialized; with CONFIG_PROC_FS a proc entry named after the type is
 * registered; lu_device_type_init() is called on \a ldt; finally the type
 * is linked into obd_types under obd_types_lock.
 * The tail of the function is the unwind path: it removes the proc
 * subtree (if one was registered) and frees whatever was allocated.
 * An already registered name is logged with CDEBUG(D_IOCTL).
 */
167 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
168 bool enable_proc, struct lprocfs_vars *vars,
169 const char *name, struct lu_device_type *ldt)
171 struct obd_type *type;
/* the name must be NUL-terminated within CLASS_MAX_NAME bytes */
176 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
178 if (class_search_type(name)) {
179 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
184 OBD_ALLOC(type, sizeof(*type));
188 OBD_ALLOC_PTR(type->typ_dt_ops);
189 OBD_ALLOC_PTR(type->typ_md_ops);
190 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* all three allocations are checked together */
192 if (type->typ_dt_ops == NULL ||
193 type->typ_md_ops == NULL ||
194 type->typ_name == NULL)
/* the ops tables are copied by value, so the caller's copies stay theirs */
197 *(type->typ_dt_ops) = *dt_ops;
198 /* md_ops is optional */
200 *(type->typ_md_ops) = *md_ops;
/* typ_name was allocated with strlen(name) + 1 bytes just above */
201 strcpy(type->typ_name, name);
202 spin_lock_init(&type->obd_type_lock);
204 #ifdef CONFIG_PROC_FS
206 type->typ_procroot = lprocfs_register(type->typ_name,
209 if (IS_ERR(type->typ_procroot)) {
210 rc = PTR_ERR(type->typ_procroot);
/* reset so the unwind path does not try to remove a proc entry */
211 type->typ_procroot = NULL;
218 rc = lu_device_type_init(ldt);
/* publish the fully initialized type */
223 spin_lock(&obd_types_lock);
224 list_add(&type->typ_chain, &obd_types);
225 spin_unlock(&obd_types_lock);
/* unwind path: undo in reverse order whatever was set up */
230 if (type->typ_name != NULL) {
231 #ifdef CONFIG_PROC_FS
232 if (type->typ_procroot != NULL)
233 remove_proc_subtree(type->typ_name, proc_lustre_root);
235 OBD_FREE(type->typ_name, strlen(name) + 1);
237 if (type->typ_md_ops != NULL)
238 OBD_FREE_PTR(type->typ_md_ops);
239 if (type->typ_dt_ops != NULL)
240 OBD_FREE_PTR(type->typ_dt_ops);
241 OBD_FREE(type, sizeof(*type));
244 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * A type with a non-zero typ_refcnt is not torn down completely: its
 * dt/md ops copies are freed and the name is kept for debugging
 * (presumably that path returns early -- confirm).
 * In the regular path the proc entries are removed, lu_device_type_fini()
 * is called on typ_lu, the type is unlinked from obd_types under
 * obd_types_lock and its name, ops copies and the type itself are freed.
 */
246 int class_unregister_type(const char *name)
248 struct obd_type *type = class_search_type(name);
252 CERROR("unknown obd type\n");
256 if (type->typ_refcnt) {
257 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
258 /* This is a bad situation, let's make the best of it */
259 /* Remove ops, but leave the name for debugging */
260 OBD_FREE_PTR(type->typ_dt_ops);
261 OBD_FREE_PTR(type->typ_md_ops);
265 /* we do not use type->typ_procroot as for compatibility purposes
266 * other modules can share names (i.e. lod can use lov entry). so
267 * we can't reference pointer as it can get invalided when another
268 * module removes the entry */
269 #ifdef CONFIG_PROC_FS
270 if (type->typ_procroot != NULL)
271 remove_proc_subtree(type->typ_name, proc_lustre_root);
272 if (type->typ_procsym != NULL)
273 lprocfs_remove(&type->typ_procsym);
276 lu_device_type_fini(type->typ_lu);
278 spin_lock(&obd_types_lock);
279 list_del(&type->typ_chain);
280 spin_unlock(&obd_types_lock);
/* size is derived from \a name, which matched typ_name in the lookup */
281 OBD_FREE(type->typ_name, strlen(name) + 1);
282 if (type->typ_dt_ops != NULL)
283 OBD_FREE_PTR(type->typ_dt_ops);
284 if (type->typ_md_ops != NULL)
285 OBD_FREE_PTR(type->typ_md_ops);
286 OBD_FREE(type, sizeof(*type));
288 } /* class_unregister_type */
289 EXPORT_SYMBOL(class_unregister_type);
292 * Create a new obd device.
294 * Find an empty slot in ::obd_devs[], create a new obd device in it.
296 * \param[in] type_name obd device type string.
297 * \param[in] name obd device name.
299 * \retval ERR_PTR(errno) if create fails, otherwise return the obd device
/*
 * Implementation notes:
 * - the name length is checked against MAX_OBD_NAME, the type is pinned
 *   with class_get_type() and the device is allocated before obd_dev_lock
 *   is write-locked;
 * - one pass over the device slots both detects a name clash (result
 *   becomes ERR_PTR(-EEXIST) and a slot claimed earlier in the same pass
 *   is released again) and claims an empty slot for the new device;
 * - failures visible here are reported as ERR_PTR() of -EINVAL, -ENODEV,
 *   -ENOMEM, -EEXIST or -EOVERFLOW; the tail frees the new device and
 *   drops the type reference.
 */
302 struct obd_device *class_newdev(const char *type_name, const char *name)
304 struct obd_device *result = NULL;
305 struct obd_device *newdev;
306 struct obd_type *type = NULL;
308 int new_obd_minor = 0;
311 if (strlen(name) >= MAX_OBD_NAME) {
312 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
313 RETURN(ERR_PTR(-EINVAL));
316 type = class_get_type(type_name);
318 CERROR("OBD: unknown type: %s\n", type_name);
319 RETURN(ERR_PTR(-ENODEV));
322 newdev = obd_device_alloc();
324 GOTO(out_type, result = ERR_PTR(-ENOMEM));
326 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
328 write_lock(&obd_dev_lock);
329 for (i = 0; i < class_devno_max(); i++) {
330 struct obd_device *obd = class_num2obd(i);
/* duplicate name: undo any slot claimed earlier in this pass */
332 if (obd && (strcmp(name, obd->obd_name) == 0)) {
333 CERROR("Device %s already exists at %d, won't add\n",
336 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
337 "%p obd_magic %08x != %08x\n", result,
338 result->obd_magic, OBD_DEVICE_MAGIC);
339 LASSERTF(result->obd_minor == new_obd_minor,
340 "%p obd_minor %d != %d\n", result,
341 result->obd_minor, new_obd_minor);
343 obd_devs[result->obd_minor] = NULL;
344 result->obd_name[0]='\0';
346 result = ERR_PTR(-EEXIST);
/* empty slot and nothing claimed yet: take it */
349 if (!result && !obd) {
351 result->obd_minor = i;
353 result->obd_type = type;
/* at most sizeof(obd_name) - 1 bytes are copied; the name length was
 * checked against MAX_OBD_NAME at the top of the function */
354 strncpy(result->obd_name, name,
355 sizeof(result->obd_name) - 1);
356 obd_devs[i] = result;
359 write_unlock(&obd_dev_lock);
/* the scan ended without claiming a slot: the table is full */
361 if (result == NULL && i >= class_devno_max()) {
362 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
364 GOTO(out, result = ERR_PTR(-EOVERFLOW));
370 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
371 result->obd_name, result);
/* error unwind */
375 obd_device_free(newdev);
377 class_put_type(type);
/*
 * Remove \a obd from obd_devs[] and free it.
 * The device magic, its slot in obd_devs[] and a non-NULL obd_type are
 * asserted first. The slot is cleared under obd_dev_lock (write), the
 * device is freed, and the type reference is dropped last using the
 * pointer saved at function entry, because obd is gone by then.
 */
381 void class_release_dev(struct obd_device *obd)
383 struct obd_type *obd_type = obd->obd_type;
385 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
386 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
387 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
388 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
389 LASSERT(obd_type != NULL);
391 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
392 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
394 write_lock(&obd_dev_lock);
395 obd_devs[obd->obd_minor] = NULL;
396 write_unlock(&obd_dev_lock);
397 obd_device_free(obd);
399 class_put_type(obd_type);
/*
 * Find the device slot whose obd_name equals \a name (exact strcmp).
 * The scan runs under obd_dev_lock (read). A name match is only honoured
 * once obd_attached is set on the device. class_name2obd() treats a
 * negative result as "not found".
 */
402 int class_name2dev(const char *name)
409 read_lock(&obd_dev_lock);
410 for (i = 0; i < class_devno_max(); i++) {
411 struct obd_device *obd = class_num2obd(i);
413 if (obd && strcmp(name, obd->obd_name) == 0) {
414 /* Make sure we finished attaching before we give
415 out any references */
416 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
417 if (obd->obd_attached) {
418 read_unlock(&obd_dev_lock);
424 read_unlock(&obd_dev_lock);
429 struct obd_device *class_name2obd(const char *name)
431 int dev = class_name2dev(name);
433 if (dev < 0 || dev > class_devno_max())
435 return class_num2obd(dev);
437 EXPORT_SYMBOL(class_name2obd);
/*
 * Find the device slot whose obd_uuid equals \a uuid
 * (obd_uuid_equals). The scan runs under obd_dev_lock (read), which is
 * released on a match and after an unsuccessful scan. class_uuid2obd()
 * passes the result on as a device index.
 */
439 int class_uuid2dev(struct obd_uuid *uuid)
443 read_lock(&obd_dev_lock);
444 for (i = 0; i < class_devno_max(); i++) {
445 struct obd_device *obd = class_num2obd(i);
447 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
448 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
449 read_unlock(&obd_dev_lock);
453 read_unlock(&obd_dev_lock);
/*
 * Resolve \a uuid to an obd device: class_uuid2dev() supplies the index,
 * class_num2obd() the device stored there.
 */
458 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
460 int dev = class_uuid2dev(uuid);
463 return class_num2obd(dev);
465 EXPORT_SYMBOL(class_uuid2obd);
468 * Get obd device from ::obd_devs[]
470 * \param num [in] array index
472 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
473 * otherwise return the obd device there.
475 struct obd_device *class_num2obd(int num)
477 struct obd_device *obd = NULL;
479 if (num < class_devno_max()) {
484 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
485 "%p obd_magic %08x != %08x\n",
486 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
487 LASSERTF(obd->obd_minor == num,
488 "%p obd_minor %0d != %0d\n",
489 obd, obd->obd_minor, num);
496 * Get obd devices count. Device in any
498 * \retval obd device count
500 int get_devices_count(void)
502 int index, max_index = class_devno_max(), dev_count = 0;
504 read_lock(&obd_dev_lock);
505 for (index = 0; index <= max_index; index++) {
506 struct obd_device *obd = class_num2obd(index);
510 read_unlock(&obd_dev_lock);
514 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line (LCONSOLE, D_CONFIG) per device: slot index,
 * status, type name, device name, uuid and current obd_refcount.
 * The status is chosen from obd_stopping, obd_set_up and obd_attached,
 * tested in that order. The scan runs under obd_dev_lock (read).
 */
516 void class_obd_list(void)
521 read_lock(&obd_dev_lock);
522 for (i = 0; i < class_devno_max(); i++) {
523 struct obd_device *obd = class_num2obd(i);
527 if (obd->obd_stopping)
529 else if (obd->obd_set_up)
531 else if (obd->obd_attached)
535 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
536 i, status, obd->obd_type->typ_name,
537 obd->obd_name, obd->obd_uuid.uuid,
538 atomic_read(&obd->obd_refcount));
540 read_unlock(&obd_dev_lock);
544 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
545 specified, then only the client with that uuid is returned,
546 otherwise any client connected to the tgt is returned. */
/*
 * Note: typ_name is matched as a prefix of the device's type name
 * (strncmp over strlen(typ_name) bytes), and the target is compared
 * against u.cli.cl_target_uuid. The scan runs under obd_dev_lock (read),
 * which is released before a match is handed back.
 */
547 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
548 const char * typ_name,
549 struct obd_uuid *grp_uuid)
553 read_lock(&obd_dev_lock);
554 for (i = 0; i < class_devno_max(); i++) {
555 struct obd_device *obd = class_num2obd(i);
559 if ((strncmp(obd->obd_type->typ_name, typ_name,
560 strlen(typ_name)) == 0)) {
/* a NULL grp_uuid accepts any client of the target */
561 if (obd_uuid_equals(tgt_uuid,
562 &obd->u.cli.cl_target_uuid) &&
563 ((grp_uuid)? obd_uuid_equals(grp_uuid,
564 &obd->obd_uuid) : 1)) {
565 read_unlock(&obd_dev_lock);
570 read_unlock(&obd_dev_lock);
574 EXPORT_SYMBOL(class_find_client_obd);
576 /* Iterate the obd_device list looking devices have grp_uuid. Start
577 searching at *next, and if a device is found, the next index to look
578 at is saved in *next. If next is NULL, then the first matching device
579 will always be returned. */
/*
 * Note: a start index taken from *next is only accepted when it lies in
 * [0, class_devno_max()). The scan runs under obd_dev_lock (read) and
 * compares grp_uuid with each device's obd_uuid.
 */
580 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
586 else if (*next >= 0 && *next < class_devno_max())
591 read_lock(&obd_dev_lock);
592 for (; i < class_devno_max(); i++) {
593 struct obd_device *obd = class_num2obd(i);
597 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
600 read_unlock(&obd_dev_lock);
604 read_unlock(&obd_dev_lock);
608 EXPORT_SYMBOL(class_devices_in_group);
611 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
612 * adjust sptlrpc settings accordingly.
/*
 * Implementation notes:
 * - only devices that are set up and not stopping, of type MDC, OSC, MDT
 *   or OST, and whose obd_name starts with the first \a namelen bytes of
 *   \a fsname are notified;
 * - the notification is KEY_SPTLRPC_CONF sent through
 *   obd_set_info_async() on the device's self export;
 * - obd_dev_lock is dropped around that call; the device is pinned with
 *   class_incref()/class_decref() for the duration.
 */
614 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
616 struct obd_device *obd;
620 LASSERT(namelen > 0);
622 read_lock(&obd_dev_lock);
623 for (i = 0; i < class_devno_max(); i++) {
624 obd = class_num2obd(i);
626 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
629 /* only notify mdc, osc, mdt, ost */
630 type = obd->obd_type->typ_name;
631 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
632 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
633 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
634 strcmp(type, LUSTRE_OST_NAME) != 0)
/* non-zero strncmp means obd_name does not start with fsname */
637 if (strncmp(obd->obd_name, fsname, namelen))
/* pin the device, then drop the lock around the call below */
640 class_incref(obd, __FUNCTION__, obd);
641 read_unlock(&obd_dev_lock);
642 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
643 sizeof(KEY_SPTLRPC_CONF),
644 KEY_SPTLRPC_CONF, 0, NULL, NULL);
646 class_decref(obd, __FUNCTION__, obd);
647 read_lock(&obd_dev_lock);
649 read_unlock(&obd_dev_lock);
652 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches owned by this file (obd_device_cachep,
 * obdo_cachep, import_cachep). obd_init_caches() calls this on its
 * failure path.
 */
654 void obd_cleanup_caches(void)
657 if (obd_device_cachep) {
658 kmem_cache_destroy(obd_device_cachep);
/* reset so a later obd_init_caches() passes its LASSERT */
659 obd_device_cachep = NULL;
662 kmem_cache_destroy(obdo_cachep);
666 kmem_cache_destroy(import_cachep);
667 import_cachep = NULL;
/*
 * Create the slab caches "ll_obd_dev_cache", "ll_obdo_cache" and
 * "ll_import_cache". Each cache pointer is asserted to be NULL before it
 * is created. A failed creation jumps to the out path with
 * rc = -ENOMEM, where obd_cleanup_caches() destroys what was created.
 */
673 int obd_init_caches(void)
678 LASSERT(obd_device_cachep == NULL);
679 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
680 sizeof(struct obd_device),
682 if (!obd_device_cachep)
683 GOTO(out, rc = -ENOMEM);
685 LASSERT(obdo_cachep == NULL);
686 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
689 GOTO(out, rc = -ENOMEM);
691 LASSERT(import_cachep == NULL);
692 import_cachep = kmem_cache_create("ll_import_cache",
693 sizeof(struct obd_import),
696 GOTO(out, rc = -ENOMEM);
/* failure path */
700 obd_cleanup_caches();
704 /* map connection to client */
/*
 * Resolve a connection handle to its export.
 * A cookie of -1 is the "assign a new connection" marker and is not
 * looked up; any other cookie is resolved with class_handle2object().
 * NOTE(review): the "null handle" message presumably guards a NULL
 * \a conn -- confirm against the condition preceding it.
 */
705 struct obd_export *class_conn2export(struct lustre_handle *conn)
707 struct obd_export *export;
711 CDEBUG(D_CACHE, "looking for null handle\n");
715 if (conn->cookie == -1) { /* this means assign a new connection */
716 CDEBUG(D_CACHE, "want a new connection\n");
720 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
721 export = class_handle2object(conn->cookie, NULL);
724 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): presumably hands back exp->exp_obd -- confirm, including
 * how a NULL \a exp is treated.
 */
726 struct obd_device *class_exp2obd(struct obd_export *exp)
732 EXPORT_SYMBOL(class_exp2obd);
/*
 * Resolve a connection handle to the obd device of its export.
 * The export obtained from class_conn2export() is put again right after
 * exp_obd has been read, so the export is not kept pinned for the caller.
 */
734 struct obd_device *class_conn2obd(struct lustre_handle *conn)
736 struct obd_export *export;
737 export = class_conn2export(conn);
739 struct obd_device *obd = export->exp_obd;
740 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the obd device that
 * \a exp belongs to. exp is dereferenced unconditionally.
 */
746 struct obd_import *class_exp2cliimp(struct obd_export *exp)
748 struct obd_device *obd = exp->exp_obd;
751 return obd->u.cli.cl_import;
753 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the obd device that the
 * connection handle \a conn resolves to via class_conn2obd().
 */
755 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
757 struct obd_device *obd = class_conn2obd(conn);
760 return obd->u.cli.cl_import;
763 /* Export management functions */
/*
 * Final teardown of an export whose refcount has reached zero.
 * Puts the connection (if any), asserts that the reply, replay and
 * high-priority RPC lists are empty, calls obd_destroy_export(), drops
 * the "export" reference on the obd device and frees the export via
 * OBD_FREE_RCU.
 */
764 static void class_export_destroy(struct obd_export *exp)
766 struct obd_device *obd = exp->exp_obd;
769 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
770 LASSERT(obd != NULL);
772 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
773 exp->exp_client_uuid.uuid, obd->obd_name);
775 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
776 if (exp->exp_connection)
777 ptlrpc_put_connection_superhack(exp->exp_connection);
779 LASSERT(list_empty(&exp->exp_outstanding_replies));
780 LASSERT(list_empty(&exp->exp_uncommitted_replies));
781 LASSERT(list_empty(&exp->exp_req_replay_queue));
782 LASSERT(list_empty(&exp->exp_hp_rpcs));
783 obd_destroy_export(exp);
/* pairs with class_incref(obd, "export", ...) in class_new_export() */
784 class_decref(obd, "export", exp);
786 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/*
 * Handle-table hook for exports: the hop_addref callback takes an export
 * reference through class_export_get(). class_new_export() registers
 * these ops with class_handle_hash().
 */
790 static void export_handle_addref(void *export)
792 class_export_get(export);
795 static struct portals_handle_ops export_handle_ops = {
796 .hop_addref = export_handle_addref,
/*
 * Take one reference on \a exp (atomic increment of exp_refcount) and
 * log the new count at D_INFO.
 */
800 struct obd_export *class_export_get(struct obd_export *exp)
802 atomic_inc(&exp->exp_refcount);
803 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
804 atomic_read(&exp->exp_refcount));
807 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp.
 * On the final put the export must still be linked through exp_obd_chain
 * and must not be on a stale list; the nid stat reference is released and
 * the export is handed to obd_zombie_export_add() rather than being
 * destroyed inline.
 */
809 void class_export_put(struct obd_export *exp)
811 LASSERT(exp != NULL);
812 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* the count is logged as it will be after the decrement below */
813 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
814 atomic_read(&exp->exp_refcount) - 1);
816 if (atomic_dec_and_test(&exp->exp_refcount)) {
817 LASSERT(!list_empty(&exp->exp_obd_chain));
818 LASSERT(list_empty(&exp->exp_stale_list));
819 CDEBUG(D_IOCTL, "final put %p/%s\n",
820 exp, exp->exp_client_uuid.uuid);
822 /* release nid stat reference */
823 lprocfs_exp_cleanup(exp);
825 obd_zombie_export_add(exp);
828 EXPORT_SYMBOL(class_export_put);
830 /* Creates a new export, adds it to the hash table, and returns a
831 * pointer to it. The refcount is 2: one for the hash reference, and
832 * one for the pointer returned by this function. */
/*
 * Implementation notes:
 * - allocation failure is reported as ERR_PTR(-ENOMEM);
 * - the uuid-hash insertion is skipped when \a cluuid equals the device's
 *   own obd_uuid; a duplicate uuid fails with -EALREADY;
 * - obd_stopping is tested twice under obd_dev_lock, once before the hash
 *   reference is taken and once before the export is linked, because the
 *   lock is dropped in between; both cases fail with -ENODEV;
 * - the tail is the error path: it unhashes the handle, calls
 *   obd_destroy_export() and frees the export.
 */
833 struct obd_export *class_new_export(struct obd_device *obd,
834 struct obd_uuid *cluuid)
836 struct obd_export *export;
837 struct cfs_hash *hash = NULL;
841 OBD_ALLOC_PTR(export);
843 return ERR_PTR(-ENOMEM);
845 export->exp_conn_cnt = 0;
846 export->exp_lock_hash = NULL;
847 export->exp_flock_hash = NULL;
848 atomic_set(&export->exp_refcount, 2);
849 atomic_set(&export->exp_rpc_count, 0);
850 atomic_set(&export->exp_cb_count, 0);
851 atomic_set(&export->exp_locks_count, 0);
852 #if LUSTRE_TRACKS_LOCK_EXP_REFS
853 INIT_LIST_HEAD(&export->exp_locks_list);
854 spin_lock_init(&export->exp_locks_list_guard);
856 atomic_set(&export->exp_replay_count, 0);
857 export->exp_obd = obd;
858 INIT_LIST_HEAD(&export->exp_outstanding_replies);
859 spin_lock_init(&export->exp_uncommitted_replies_lock);
860 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
861 INIT_LIST_HEAD(&export->exp_req_replay_queue);
862 INIT_LIST_HEAD(&export->exp_handle.h_link);
863 INIT_LIST_HEAD(&export->exp_hp_rpcs);
864 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* registers the export in the handle table with export_handle_ops */
865 class_handle_hash(&export->exp_handle, &export_handle_ops);
866 export->exp_last_request_time = cfs_time_current_sec();
867 spin_lock_init(&export->exp_lock);
868 spin_lock_init(&export->exp_rpc_lock);
869 INIT_HLIST_NODE(&export->exp_uuid_hash);
870 INIT_HLIST_NODE(&export->exp_nid_hash);
871 INIT_HLIST_NODE(&export->exp_gen_hash);
872 spin_lock_init(&export->exp_bl_list_lock);
873 INIT_LIST_HEAD(&export->exp_bl_list);
874 INIT_LIST_HEAD(&export->exp_stale_list);
876 export->exp_sp_peer = LUSTRE_SP_ANY;
877 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
878 export->exp_client_uuid = *cluuid;
879 obd_init_export(export);
881 spin_lock(&obd->obd_dev_lock);
882 /* shouldn't happen, but might race */
883 if (obd->obd_stopping)
884 GOTO(exit_unlock, rc = -ENODEV);
886 hash = cfs_hash_getref(obd->obd_uuid_hash);
888 GOTO(exit_unlock, rc = -ENODEV);
889 spin_unlock(&obd->obd_dev_lock);
/* the device's self export is not entered into the uuid hash */
891 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
892 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
894 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
895 obd->obd_name, cluuid->uuid, rc);
896 GOTO(exit_err, rc = -EALREADY);
900 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* re-check obd_stopping: the lock was dropped for the hash insertion */
901 spin_lock(&obd->obd_dev_lock);
902 if (obd->obd_stopping) {
903 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
904 GOTO(exit_unlock, rc = -ENODEV);
/* dropped again by class_decref(obd, "export", ...) in
 * class_export_destroy() */
907 class_incref(obd, "export", export);
908 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
909 list_add_tail(&export->exp_obd_chain_timed,
910 &export->exp_obd->obd_exports_timed);
911 export->exp_obd->obd_num_exports++;
912 spin_unlock(&obd->obd_dev_lock);
913 cfs_hash_putref(hash);
/* error path */
917 spin_unlock(&obd->obd_dev_lock);
920 cfs_hash_putref(hash);
921 class_handle_unhash(&export->exp_handle);
922 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
923 obd_destroy_export(export);
924 OBD_FREE_PTR(export);
927 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from its obd device.
 * The handle is unhashed first. Under obd_dev_lock the export is removed
 * from the uuid and generation hashes (when hashed), moved to
 * obd_unlinked_exports, taken off the timed chain, and obd_num_exports is
 * decremented. Afterwards obd_stale_export_num is incremented and
 * obd_stale_export_put() is called on the export.
 */
929 void class_unlink_export(struct obd_export *exp)
931 class_handle_unhash(&exp->exp_handle);
933 spin_lock(&exp->exp_obd->obd_dev_lock);
934 /* delete an uuid-export hashitem from hashtables */
935 if (!hlist_unhashed(&exp->exp_uuid_hash))
936 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
937 &exp->exp_client_uuid,
938 &exp->exp_uuid_hash);
/* the generation hash is keyed by ted_lcd->lcd_generation */
940 if (!hlist_unhashed(&exp->exp_gen_hash)) {
941 struct tg_export_data *ted = &exp->exp_target_data;
942 struct cfs_hash *hash;
944 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
945 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
947 cfs_hash_putref(hash);
950 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
951 list_del_init(&exp->exp_obd_chain_timed);
952 exp->exp_obd->obd_num_exports--;
953 spin_unlock(&exp->exp_obd->obd_dev_lock);
954 atomic_inc(&obd_stale_export_num);
956 /* A reference is kept by obd_stale_exports list */
957 obd_stale_export_put(exp);
960 /* Import management functions */
/*
 * Final teardown of an import whose refcount has reached zero.
 * Puts the import's connection, then every connection on imp_conn_list
 * (freeing each list entry), asserts that imp_sec is NULL, drops the
 * "import" reference on the obd device and frees the import via
 * OBD_FREE_RCU.
 */
961 static void class_import_destroy(struct obd_import *imp)
965 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
966 imp->imp_obd->obd_name);
968 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
970 ptlrpc_put_connection_superhack(imp->imp_connection);
/* drain the connection list, always taking the first entry */
972 while (!list_empty(&imp->imp_conn_list)) {
973 struct obd_import_conn *imp_conn;
975 imp_conn = list_entry(imp->imp_conn_list.next,
976 struct obd_import_conn, oic_item);
977 list_del_init(&imp_conn->oic_item);
978 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
979 OBD_FREE(imp_conn, sizeof(*imp_conn));
982 LASSERT(imp->imp_sec == NULL);
/* pairs with class_incref(obd, "import", ...) in class_new_import() */
983 class_decref(imp->imp_obd, "import", imp);
984 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/*
 * Handle-table hook for imports: the hop_addref callback takes an import
 * reference through class_import_get(). class_new_import() registers
 * these ops with class_handle_hash().
 */
988 static void import_handle_addref(void *import)
990 class_import_get(import);
993 static struct portals_handle_ops import_handle_ops = {
994 .hop_addref = import_handle_addref,
/*
 * Take one reference on \a import (atomic increment of imp_refcount) and
 * log the new count together with the owning obd name at D_INFO.
 */
998 struct obd_import *class_import_get(struct obd_import *import)
1000 atomic_inc(&import->imp_refcount);
1001 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1002 atomic_read(&import->imp_refcount),
1003 import->imp_obd->obd_name);
1006 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp.
 * The import must not be on the zombie chain yet. On the final put it is
 * handed to obd_zombie_import_add() rather than being destroyed inline.
 */
1008 void class_import_put(struct obd_import *imp)
1012 LASSERT(list_empty(&imp->imp_zombie_chain));
1013 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* the count is logged as it will be after the decrement below */
1015 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1016 atomic_read(&imp->imp_refcount) - 1,
1017 imp->imp_obd->obd_name);
1019 if (atomic_dec_and_test(&imp->imp_refcount)) {
1020 CDEBUG(D_INFO, "final put import %p\n", imp);
1021 obd_zombie_import_add(imp);
1024 /* catch possible import put race */
1025 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1028 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the net latency
 * estimate starts at 0, and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT.
 */
1030 static void init_imp_at(struct imp_at *at) {
1032 at_init(&at->iat_net_latency, 0, 0);
1033 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1034 /* max service estimates are tracked on the server side, so
1035 don't use the AT history here, just use the last reported
1036 val. (But keep hist for proc histogram, worst_ever) */
1037 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize an import for \a obd.
 * The import starts in state LUSTRE_IMP_NEW with imp_refcount set to 2,
 * takes an "import" reference on the device through class_incref(), is
 * registered in the handle table with import_handle_ops, and gets its
 * adaptive-timeout state from init_imp_at().
 */
1042 struct obd_import *class_new_import(struct obd_device *obd)
1044 struct obd_import *imp;
1046 OBD_ALLOC(imp, sizeof(*imp));
1050 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1051 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1052 INIT_LIST_HEAD(&imp->imp_replay_list);
1053 INIT_LIST_HEAD(&imp->imp_sending_list);
1054 INIT_LIST_HEAD(&imp->imp_delayed_list);
1055 INIT_LIST_HEAD(&imp->imp_committed_list);
/* the replay cursor starts at the head of the (empty) committed list */
1056 imp->imp_replay_cursor = &imp->imp_committed_list;
1057 spin_lock_init(&imp->imp_lock);
1058 imp->imp_last_success_conn = 0;
1059 imp->imp_state = LUSTRE_IMP_NEW;
/* dropped again by class_decref(..., "import", ...) in
 * class_import_destroy() */
1060 imp->imp_obd = class_incref(obd, "import", imp);
1061 mutex_init(&imp->imp_sec_mutex);
1062 init_waitqueue_head(&imp->imp_recovery_waitq);
1064 atomic_set(&imp->imp_refcount, 2);
1065 atomic_set(&imp->imp_unregistering, 0);
1066 atomic_set(&imp->imp_inflight, 0);
1067 atomic_set(&imp->imp_replay_inflight, 0);
1068 atomic_set(&imp->imp_inval_count, 0);
1069 INIT_LIST_HEAD(&imp->imp_conn_list);
1070 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1071 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1072 init_imp_at(&imp->imp_at);
1074 /* the default magic is V2, will be used in connect RPC, and
1075 * then adjusted according to the flags in request/reply. */
1076 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1080 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down an import: remove it from the handle table, bump
 * imp_generation under imp_lock, and drop one reference with
 * class_import_put().
 */
1082 void class_destroy_import(struct obd_import *import)
1084 LASSERT(import != NULL);
1085 LASSERT(import != LP_POISON);
1087 class_handle_unhash(&import->imp_handle);
1089 spin_lock(&import->imp_lock);
1090 import->imp_generation++;
1091 spin_unlock(&import->imp_lock);
1092 class_import_put(import);
1094 EXPORT_SYMBOL(class_destroy_import);
1096 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): count one more export
 * reference held by \a lock. On the first reference the lock is linked
 * into exp_locks_list and remembers \a exp as its target. A lock that
 * already targets a different export is reported with LCONSOLE_WARN.
 * Everything runs under exp_locks_list_guard.
 */
1098 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1100 spin_lock(&exp->exp_locks_list_guard);
1102 LASSERT(lock->l_exp_refs_nr >= 0);
1104 if (lock->l_exp_refs_target != NULL &&
1105 lock->l_exp_refs_target != exp) {
1106 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1107 exp, lock, lock->l_exp_refs_target);
/* post-increment: the test sees the count before this reference */
1109 if ((lock->l_exp_refs_nr ++) == 0) {
1110 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1111 lock->l_exp_refs_target = exp;
1113 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1114 lock, exp, lock->l_exp_refs_nr);
1115 spin_unlock(&exp->exp_locks_list_guard);
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): drop one export
 * reference held by \a lock. When the count reaches zero the lock is
 * unlinked from the export's list and its target is cleared. A target
 * that differs from \a exp is reported with LCONSOLE_WARN.
 * Everything runs under exp_locks_list_guard.
 */
1118 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1120 spin_lock(&exp->exp_locks_list_guard);
1121 LASSERT(lock->l_exp_refs_nr > 0);
1122 if (lock->l_exp_refs_target != exp) {
1123 LCONSOLE_WARN("lock %p, "
1124 "mismatching export pointers: %p, %p\n",
1125 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the test sees the count after this reference is gone */
1127 if (-- lock->l_exp_refs_nr == 0) {
1128 list_del_init(&lock->l_exp_refs_link);
1129 lock->l_exp_refs_target = NULL;
1131 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1132 lock, exp, lock->l_exp_refs_nr);
1133 spin_unlock(&exp->exp_locks_list_guard);
1137 /* A connection defines an export context in which preallocation can
1138 be managed. This releases the export pointer reference, and returns
1139 the export handle, so the export refcount is 1 when this function
/*
 * Create an export for client \a cluuid on \a obd and return its handle
 * cookie in \a conn. All three arguments must be non-NULL (asserted).
 * A failure of class_new_export() is passed on as PTR_ERR(). The pointer
 * reference returned by class_new_export() is dropped again with
 * class_export_put() once the cookie has been copied.
 */
1141 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1142 struct obd_uuid *cluuid)
1144 struct obd_export *export;
1145 LASSERT(conn != NULL);
1146 LASSERT(obd != NULL);
1147 LASSERT(cluuid != NULL);
1150 export = class_new_export(obd, cluuid);
1152 RETURN(PTR_ERR(export));
1154 conn->cookie = export->exp_handle.h_cookie;
1155 class_export_put(export);
1157 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1158 cluuid->uuid, conn->cookie);
1161 EXPORT_SYMBOL(class_connect);
1163 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery bookkeeping of an export that is going away.
 * Under obd_recovery_task_lock, while the device is recovering: an export
 * that is in recovery is taken out of obd_connected_clients, and
 * obd_stale_clients is bumped for exports without
 * OBD_CONNECT_LIGHTWEIGHT. Under exp_lock, pending request-replay and
 * lock-replay flags are cleared and the matching device counters are
 * decremented.
 */
1164 static void class_export_recovery_cleanup(struct obd_export *exp)
1166 struct obd_device *obd = exp->exp_obd;
1168 spin_lock(&obd->obd_recovery_task_lock);
1169 if (obd->obd_recovering) {
/* NOTE(review): exp_in_recovery is tested before exp_lock is taken. */
1170 if (exp->exp_in_recovery) {
1171 spin_lock(&exp->exp_lock);
1172 exp->exp_in_recovery = 0;
1173 spin_unlock(&exp->exp_lock);
1174 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1175 atomic_dec(&obd->obd_connected_clients);
1178 /* if called during recovery then should update
1179 * obd_stale_clients counter,
1180 * lightweight exports are not counted */
1181 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1182 exp->exp_obd->obd_stale_clients++;
1184 spin_unlock(&obd->obd_recovery_task_lock);
1186 spin_lock(&exp->exp_lock);
1187 /** Cleanup req replay fields */
1188 if (exp->exp_req_replay_needed) {
1189 exp->exp_req_replay_needed = 0;
1191 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1192 atomic_dec(&obd->obd_req_replay_clients);
1195 /** Cleanup lock replay data */
1196 if (exp->exp_lock_replay_needed) {
1197 exp->exp_lock_replay_needed = 0;
1199 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1200 atomic_dec(&obd->obd_lock_replay_clients);
1202 spin_unlock(&exp->exp_lock);
1205 /* This function removes 1-3 references from the export:
1206 * 1 - for export pointer passed
1207 * and if disconnect really need
1208 * 2 - removing from hash
1209 * 3 - in client_unlink_export
1210 * The export pointer passed to this function can destroyed */
/*
 * Implementation notes:
 * - a NULL export is only warned about;
 * - exp_disconnected is tested and set atomically under exp_lock, so of
 *   several racing callers only the first one performs the disconnect
 *   work (nid-hash removal, recovery cleanup, unlink);
 * - class_export_put() at the end drops the reference for the pointer
 *   that was passed in.
 */
1211 int class_disconnect(struct obd_export *export)
1213 int already_disconnected;
1216 if (export == NULL) {
1217 CWARN("attempting to free NULL export %p\n", export);
1221 spin_lock(&export->exp_lock);
1222 already_disconnected = export->exp_disconnected;
1223 export->exp_disconnected = 1;
1224 spin_unlock(&export->exp_lock);
1226 /* class_cleanup(), abort_recovery(), and class_fail_export()
1227 * all end up in here, and if any of them race we shouldn't
1228 * call extra class_export_puts(). */
1229 if (already_disconnected) {
1230 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1231 GOTO(no_disconn, already_disconnected);
1234 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1235 export->exp_handle.h_cookie);
/* the nid hash is keyed by the peer nid of the export's connection */
1237 if (!hlist_unhashed(&export->exp_nid_hash))
1238 cfs_hash_del(export->exp_obd->obd_nid_hash,
1239 &export->exp_connection->c_peer.nid,
1240 &export->exp_nid_hash);
1242 class_export_recovery_cleanup(export);
1243 class_unlink_export(export);
1245 class_export_put(export);
1248 EXPORT_SYMBOL(class_disconnect);
1250 /* Return non-zero for a fully connected export */
/*
 * "Connected" is evaluated under exp_lock as: exp_conn_cnt > 0 and
 * exp_failed not set.
 */
1251 int class_connected_export(struct obd_export *exp)
1256 spin_lock(&exp->exp_lock);
1257 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1258 spin_unlock(&exp->exp_lock);
1262 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, always working on the first entry
 * until the list is empty.
 * Each export gets \a flags stored in exp_flags under exp_lock. The self
 * export (client uuid equal to the device uuid) is not disconnected; it
 * is only unlinked from the list. For all others obd_disconnect() is
 * called. Two references are taken per export: one so the export can
 * still be used for logging after obd_disconnect(), and one more ahead of
 * obd_disconnect() itself.
 */
1264 static void class_disconnect_export_list(struct list_head *list,
1265 enum obd_option flags)
1268 struct obd_export *exp;
1271 /* It's possible that an export may disconnect itself, but
1272 * nothing else will be added to this list. */
1273 while (!list_empty(list)) {
1274 exp = list_entry(list->next, struct obd_export,
1276 /* need for safe call CDEBUG after obd_disconnect */
1277 class_export_get(exp);
1279 spin_lock(&exp->exp_lock);
1280 exp->exp_flags = flags;
1281 spin_unlock(&exp->exp_lock);
1283 if (obd_uuid_equals(&exp->exp_client_uuid,
1284 &exp->exp_obd->obd_uuid)) {
1286 "exp %p export uuid == obd uuid, don't discon\n",
1288 /* Need to delete this now so we don't end up pointing
1289 * to work_list later when this export is cleaned up. */
1290 list_del_init(&exp->exp_obd_chain);
1291 class_export_put(exp);
1295 class_export_get(exp);
1296 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1297 "last request at "CFS_TIME_T"\n",
1298 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1299 exp, exp->exp_last_request_time);
1300 /* release one export reference anyway */
1301 rc = obd_disconnect(exp);
1303 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1304 obd_export_nid2str(exp), exp, rc);
/* drops the reference taken at the top of the loop body */
1305 class_export_put(exp);
/*
 * Disconnect all exports of an obd device.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a local work
 * list while obd_dev_lock is held; the actual disconnects then run without
 * that lock, using the flags returned by exp_flags_from_obd().
 *
 * @param obd  device whose exports are to be disconnected
 */
1310 void class_disconnect_exports(struct obd_device *obd)
1312 struct list_head work_list;
1315 /* Move all of the exports from obd_exports to a work list, en masse. */
1316 INIT_LIST_HEAD(&work_list);
1317 spin_lock(&obd->obd_dev_lock);
1318 list_splice_init(&obd->obd_exports, &work_list);
1319 list_splice_init(&obd->obd_delayed_exports, &work_list);
1320 spin_unlock(&obd->obd_dev_lock);
1322 if (!list_empty(&work_list)) {
1323 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1324 "disconnecting them\n", obd->obd_minor, obd);
1325 class_disconnect_export_list(&work_list,
1326 exp_flags_from_obd(obd));
/* Otherwise there is nothing to do except log the fact. */
1328 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1329 obd->obd_minor, obd);
1332 EXPORT_SYMBOL(class_disconnect_exports);
1334 /* Remove exports that have not completed recovery.
/*
 * Select exports of an obd device for eviction and disconnect them.
 *
 * obd_exports is walked under obd_dev_lock.  The self-export and exports
 * whose ted_lr_idx is -1 are filtered out first.  For the remaining ones
 * "exp_failed || test_export(exp)" is evaluated under exp_lock; exports for
 * which it is false get exp_failed set and are moved to a local work list.
 * The work list is finally passed to class_disconnect_export_list() with
 * OBD_OPT_ABORT_RECOV added to the device flags.
 *
 * @param obd          device whose exports are examined
 * @param test_export  callback invoked with exp_lock held; a non-zero
 *                     result keeps the export off the work list
 *                     (NOTE(review): presumably via "continue" -- confirm)
 */
1336 void class_disconnect_stale_exports(struct obd_device *obd,
1337 int (*test_export)(struct obd_export *))
1339 struct list_head work_list;
1340 struct obd_export *exp, *n;
1344 INIT_LIST_HEAD(&work_list);
1345 spin_lock(&obd->obd_dev_lock);
1346 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1348 /* don't count self-export as client */
1349 if (obd_uuid_equals(&exp->exp_client_uuid,
1350 &exp->exp_obd->obd_uuid))
1353 /* don't evict clients which have no slot in last_rcvd
1354 * (e.g. lightweight connection) */
1355 if (exp->exp_target_data.ted_lr_idx == -1)
1358 spin_lock(&exp->exp_lock);
1359 if (exp->exp_failed || test_export(exp)) {
1360 spin_unlock(&exp->exp_lock);
/* Mark the export failed before it leaves obd_exports. */
1363 exp->exp_failed = 1;
1364 spin_unlock(&exp->exp_lock);
1366 list_move(&exp->exp_obd_chain, &work_list);
1368 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1369 obd->obd_name, exp->exp_client_uuid.uuid,
1370 exp->exp_connection == NULL ? "<unknown>" :
1371 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1372 print_export_data(exp, "EVICTING", 0);
1374 spin_unlock(&obd->obd_dev_lock);
1377 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1378 obd->obd_name, evicted);
1380 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1381 OBD_OPT_ABORT_RECOV);
1384 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark an export as failed and disconnect it.
 *
 * exp_failed is set under exp_lock; if it was already set the call only
 * logs a message (NOTE(review): and presumably returns -- confirm).
 * Otherwise the debug log is dumped when obd_dump_on_timeout is set, two
 * references are taken, obd_disconnect() is called, and one reference is
 * put again after the final log message.
 *
 * @param exp  export to fail
 */
1386 void class_fail_export(struct obd_export *exp)
1388 int rc, already_failed;
1390 spin_lock(&exp->exp_lock);
1391 already_failed = exp->exp_failed;
1392 exp->exp_failed = 1;
1393 spin_unlock(&exp->exp_lock);
1395 if (already_failed) {
1396 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1397 exp, exp->exp_client_uuid.uuid);
1401 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1402 exp, exp->exp_client_uuid.uuid);
1404 if (obd_dump_on_timeout)
1405 libcfs_debug_dumplog();
1407 /* need for safe call CDEBUG after obd_disconnect */
1408 class_export_get(exp);
1410 /* Most callers into obd_disconnect are removing their own reference
1411 * (request, for example) in addition to the one from the hash table.
1412 * We don't have such a reference here, so make one. */
1413 class_export_get(exp);
1414 rc = obd_disconnect(exp);
1416 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1418 CDEBUG(D_HA, "disconnected export %p/%s\n",
1419 exp, exp->exp_client_uuid.uuid);
/* Matches the first class_export_get() above. */
1420 class_export_put(exp);
1422 EXPORT_SYMBOL(class_fail_export);
/*
 * Return a printable form of the peer NID of an export.
 *
 * When exp_connection is set, the string comes from libcfs_nid2str() applied
 * to the connection's peer NID.
 * NOTE(review): a fallback string is presumably returned for exports
 * without a connection -- confirm.
 */
1424 char *obd_export_nid2str(struct obd_export *exp)
1426 if (exp->exp_connection != NULL)
1427 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1431 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict exports of an obd device that are connected from a given NID.
 *
 * The NID string is converted with libcfs_str2nid() and looked up in the
 * device's NID hash; a matching export is failed with class_fail_export()
 * and the reference obtained from the lookup is put afterwards.
 *
 * @param obd  device to search
 * @param nid  NID in string form
 * @return exports_evicted; this is still 0 when the device is stopping
 */
1433 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1435 struct cfs_hash *nid_hash;
1436 struct obd_export *doomed_exp = NULL;
1437 int exports_evicted = 0;
1439 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1441 spin_lock(&obd->obd_dev_lock);
1442 /* umount has run already, so evict thread should leave
1443 * its task to umount thread now */
1444 if (obd->obd_stopping) {
1445 spin_unlock(&obd->obd_dev_lock);
1446 return exports_evicted;
/* Take a reference on the hash while obd_dev_lock is held, so it can be
 * used after the lock is dropped. */
1448 nid_hash = obd->obd_nid_hash;
1449 cfs_hash_getref(nid_hash);
1450 spin_unlock(&obd->obd_dev_lock);
1453 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1454 if (doomed_exp == NULL)
/* Sanity checks: the export found must carry the requested NID and must
 * not be the device's self-export. */
1457 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1458 "nid %s found, wanted nid %s, requested nid %s\n",
1459 obd_export_nid2str(doomed_exp),
1460 libcfs_nid2str(nid_key), nid);
1461 LASSERTF(doomed_exp != obd->obd_self_export,
1462 "self-export is hashed by NID?\n");
1464 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1465 "request\n", obd->obd_name,
1466 obd_uuid2str(&doomed_exp->exp_client_uuid),
1467 obd_export_nid2str(doomed_exp));
1468 class_fail_export(doomed_exp);
1469 class_export_put(doomed_exp);
1472 cfs_hash_putref(nid_hash);
1474 if (!exports_evicted)
1475 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1476 obd->obd_name, nid);
1477 return exports_evicted;
1479 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of an obd device that belongs to a given client uuid.
 *
 * The uuid string is converted with obd_str2uuid().  A request naming the
 * device's own uuid is refused with an error message.  Otherwise the uuid
 * is looked up in the device's uuid hash and a match is failed with
 * class_fail_export().
 *
 * NOTE(review): the CWARN() text below spells "adminstrative"; it is a
 * runtime string and is left untouched here.
 *
 * @param obd   device to search
 * @param uuid  client uuid in string form
 * @return exports_evicted; this is still 0 when the device is stopping or
 *         when its own uuid was requested
 */
1481 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1483 struct cfs_hash *uuid_hash;
1484 struct obd_export *doomed_exp = NULL;
1485 struct obd_uuid doomed_uuid;
1486 int exports_evicted = 0;
1488 spin_lock(&obd->obd_dev_lock);
1489 if (obd->obd_stopping) {
1490 spin_unlock(&obd->obd_dev_lock);
1491 return exports_evicted;
/* Pin the hash while obd_dev_lock is still held. */
1493 uuid_hash = obd->obd_uuid_hash;
1494 cfs_hash_getref(uuid_hash);
1495 spin_unlock(&obd->obd_dev_lock);
1497 obd_str2uuid(&doomed_uuid, uuid);
1498 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1499 CERROR("%s: can't evict myself\n", obd->obd_name);
1500 cfs_hash_putref(uuid_hash);
1501 return exports_evicted;
1504 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1506 if (doomed_exp == NULL) {
1507 CERROR("%s: can't disconnect %s: no exports found\n",
1508 obd->obd_name, uuid);
1510 CWARN("%s: evicting %s at adminstrative request\n",
1511 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1512 class_fail_export(doomed_exp);
/* Release the reference obtained from cfs_hash_lookup(). */
1513 class_export_put(doomed_exp);
1516 cfs_hash_putref(uuid_hash);
1518 return exports_evicted;
/*
 * Optional hook, only built when LUSTRE_TRACKS_LOCK_EXP_REFS is enabled.
 * It starts out NULL; print_export_data() calls it for an export when lock
 * dumping was requested and the hook has been set.
 */
1521 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1522 void (*class_export_dump_hook)(struct obd_export*) = NULL;
/*
 * Log one D_HA debug line describing an export.
 *
 * The line contains the obd name, the caller supplied status string, the
 * export pointer, client uuid and NID string, the reference count, the
 * rpc/callback/lock counters, the disconnected/delayed/failed flags, the
 * number of outstanding replies with the first reply pointer ("..." is
 * appended when there are more than three), and exp_last_committed.
 * exp_outstanding_replies is walked under exp_lock.
 *
 * @param exp     export to describe
 * @param status  free-form tag printed in the message
 *
 * When LUSTRE_TRACKS_LOCK_EXP_REFS is enabled and "locks" is non-zero,
 * class_export_dump_hook is called as well, provided it is set.
 */
1525 static void print_export_data(struct obd_export *exp, const char *status,
1528 struct ptlrpc_reply_state *rs;
1529 struct ptlrpc_reply_state *first_reply = NULL;
1532 spin_lock(&exp->exp_lock);
1533 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1539 spin_unlock(&exp->exp_lock);
1541 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1542 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1543 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1544 atomic_read(&exp->exp_rpc_count),
1545 atomic_read(&exp->exp_cb_count),
1546 atomic_read(&exp->exp_locks_count),
1547 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1548 nreplies, first_reply, nreplies > 3 ? "..." : "",
1549 exp->exp_last_committed);
1550 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1551 if (locks && class_export_dump_hook != NULL)
1552 class_export_dump_hook(exp);
/*
 * Print debug information for every export related to an obd device.
 *
 * The device's obd_exports, obd_unlinked_exports and obd_delayed_exports
 * lists are printed under obd_dev_lock with the tags "ACTIVE", "UNLINKED"
 * and "DELAYED".  Afterwards the global obd_zombie_exports list is printed
 * under obd_zombie_impexp_lock with the tag "ZOMBIE"; that list is not
 * filtered by device.
 *
 * @param obd    device whose export lists are dumped
 * @param locks  passed through to print_export_data()
 */
1556 void dump_exports(struct obd_device *obd, int locks)
1558 struct obd_export *exp;
1560 spin_lock(&obd->obd_dev_lock);
1561 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1562 print_export_data(exp, "ACTIVE", locks);
1563 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1564 print_export_data(exp, "UNLINKED", locks);
1565 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1566 print_export_data(exp, "DELAYED", locks);
1567 spin_unlock(&obd->obd_dev_lock);
1568 spin_lock(&obd_zombie_impexp_lock);
1569 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1570 print_export_data(exp, "ZOMBIE", locks);
1571 spin_unlock(&obd_zombie_impexp_lock);
/*
 * Wait until an obd device has no unlinked exports left.
 *
 * obd_exports must already be empty (asserted).  While
 * obd_unlinked_exports is non-empty, obd_dev_lock is dropped and the caller
 * sleeps uninterruptibly for cfs_time_seconds(waited).  Once "waited"
 * exceeds 5 and is a power of two, a console warning with the obd refcount
 * is printed and all exports are dumped.
 * NOTE(review): "waited" is presumably increased on every iteration --
 * confirm.
 *
 * @param obd  device to wait on
 */
1574 void obd_exports_barrier(struct obd_device *obd)
1577 LASSERT(list_empty(&obd->obd_exports));
1578 spin_lock(&obd->obd_dev_lock);
1579 while (!list_empty(&obd->obd_unlinked_exports)) {
/* Never sleep with the spinlock held. */
1580 spin_unlock(&obd->obd_dev_lock);
1581 set_current_state(TASK_UNINTERRUPTIBLE);
1582 schedule_timeout(cfs_time_seconds(waited));
1583 if (waited > 5 && IS_PO2(waited)) {
1584 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1585 "more than %d seconds. "
1586 "The obd refcount = %d. Is it stuck?\n",
1587 obd->obd_name, waited,
1588 atomic_read(&obd->obd_refcount));
1589 dump_exports(obd, 1);
/* Re-take the lock before the list is tested again. */
1592 spin_lock(&obd->obd_dev_lock);
1594 spin_unlock(&obd->obd_dev_lock);
1596 EXPORT_SYMBOL(obd_exports_barrier);
1598 /* Total amount of zombies to be destroyed */
/* Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(). */
1599 static int zombies_count = 0;
1602 * kill zombie imports and exports
/*
 * Destroy queued zombie imports and exports.
 *
 * Each pass takes at most one import and one export off the zombie lists
 * while holding obd_zombie_impexp_lock, then destroys them with
 * class_import_destroy() / class_export_destroy() after the lock has been
 * released.  The loop ends when a pass found neither an import nor an
 * export.
 * NOTE(review): the short lock/unlock sections after each destroy call
 * presumably update zombies_count -- confirm.
 */
1604 void obd_zombie_impexp_cull(void)
1606 struct obd_import *import;
1607 struct obd_export *export;
1611 spin_lock(&obd_zombie_impexp_lock);
1614 if (!list_empty(&obd_zombie_imports)) {
1615 import = list_entry(obd_zombie_imports.next,
1618 list_del_init(&import->imp_zombie_chain);
1622 if (!list_empty(&obd_zombie_exports)) {
1623 export = list_entry(obd_zombie_exports.next,
1626 list_del_init(&export->exp_obd_chain);
1629 spin_unlock(&obd_zombie_impexp_lock);
/* The destroy calls below run without the zombie lock held. */
1631 if (import != NULL) {
1632 class_import_destroy(import);
1633 spin_lock(&obd_zombie_impexp_lock);
1635 spin_unlock(&obd_zombie_impexp_lock);
1638 if (export != NULL) {
1639 class_export_destroy(export);
1640 spin_lock(&obd_zombie_impexp_lock);
1642 spin_unlock(&obd_zombie_impexp_lock);
1646 } while (import != NULL || export != NULL);
/*
 * State of the zombie import/export destroy thread:
 *  - obd_zombie_start: completed by the thread once it is running
 *  - obd_zombie_stop:  completed by the thread when it exits
 *  - obd_zombie_flags: bit field, tested with OBD_ZOMBIE_STOP
 *  - obd_zombie_waitq: wait queue shared by the thread and barrier callers
 *  - obd_zombie_pid:   pid of the thread, so it never waits on itself
 */
1650 static struct completion obd_zombie_start;
1651 static struct completion obd_zombie_stop;
1652 static unsigned long obd_zombie_flags;
1653 static wait_queue_head_t obd_zombie_waitq;
1654 static pid_t obd_zombie_pid;
/* NOTE(review): this value is handed to set_bit()/test_bit(), which take a
 * bit number, so it selects bit 1 of obd_zombie_flags rather than acting as
 * the mask 0x0001.  Set and test use it consistently. */
1657 OBD_ZOMBIE_STOP = 0x0001,
1661 * check for work for kill zombie import/export thread.
/*
 * Wait condition helper of the zombie thread.
 *
 * Under obd_zombie_impexp_lock, rc becomes non-zero when there is nothing
 * to do, i.e. zombies_count is 0 and the stop bit is not set.  The thread
 * sleeps on the negation of this function.
 *
 * @param arg  not used by the code shown here
 */
1663 static int obd_zombie_impexp_check(void *arg)
1667 spin_lock(&obd_zombie_impexp_lock);
1668 rc = (zombies_count == 0) &&
1669 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1670 spin_unlock(&obd_zombie_impexp_lock);
1676 * Add export to the obd_zombie thread and notify it.
/*
 * Queue an export for destruction by the zombie thread.
 *
 * obd_stale_export_num is decremented, the export is taken off the list it
 * is linked on through exp_obd_chain (asserted to be non-empty) under its
 * device's obd_dev_lock, and then added to obd_zombie_exports under
 * obd_zombie_impexp_lock.  Waiters are notified after both locks have been
 * released.
 */
1678 static void obd_zombie_export_add(struct obd_export *exp) {
1679 atomic_dec(&obd_stale_export_num);
1680 spin_lock(&exp->exp_obd->obd_dev_lock);
1681 LASSERT(!list_empty(&exp->exp_obd_chain));
1682 list_del_init(&exp->exp_obd_chain);
1683 spin_unlock(&exp->exp_obd->obd_dev_lock);
/* exp_obd_chain is reused as the link into the zombie list. */
1684 spin_lock(&obd_zombie_impexp_lock);
1686 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1687 spin_unlock(&obd_zombie_impexp_lock);
1689 obd_zombie_impexp_notify();
1693 * Add import to the obd_zombie thread and notify it.
/*
 * Queue an import for destruction by the zombie thread.
 *
 * The import must have no security context left (imp_sec == NULL) and must
 * not already be on a zombie list; both conditions are asserted.  The
 * import is added to obd_zombie_imports under obd_zombie_impexp_lock and
 * waiters are notified afterwards.
 */
1695 static void obd_zombie_import_add(struct obd_import *imp) {
1696 LASSERT(imp->imp_sec == NULL);
1697 spin_lock(&obd_zombie_impexp_lock);
1698 LASSERT(list_empty(&imp->imp_zombie_chain));
1700 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1701 spin_unlock(&obd_zombie_impexp_lock);
1703 obd_zombie_impexp_notify();
1707 * notify import/export destroy thread about new zombie.
/*
 * Wake every sleeper on obd_zombie_waitq.  Both the zombie thread and
 * obd_zombie_barrier() callers wait on this queue, which is why
 * wake_up_all() is used instead of waking a single waiter.
 */
1709 static void obd_zombie_impexp_notify(void)
1712 * Make sure obd_zomebie_impexp_thread get this notification.
1713 * It is possible this signal only get by obd_zombie_barrier, and
1714 * barrier gulps this notification and sleeps away and hangs ensues
1716 wake_up_all(&obd_zombie_waitq);
1720 * check whether obd_zombie is idle
/*
 * Wait condition helper of obd_zombie_barrier().
 *
 * rc is non-zero when zombies_count is 0, read under
 * obd_zombie_impexp_lock.  Calling this after the stop bit has been set is
 * a bug (asserted).
 */
1722 static int obd_zombie_is_idle(void)
1726 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1727 spin_lock(&obd_zombie_impexp_lock);
1728 rc = (zombies_count == 0);
1729 spin_unlock(&obd_zombie_impexp_lock);
1734 * wait when obd_zombie import/export queues become empty
/*
 * Block the caller on obd_zombie_waitq until obd_zombie_is_idle() reports
 * that no zombies are pending.  The zombie thread itself is recognised by
 * its pid and is exempt from waiting.
 */
1736 void obd_zombie_barrier(void)
1738 struct l_wait_info lwi = { 0 };
1740 if (obd_zombie_pid == current_pid())
1741 /* don't wait for myself */
1743 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1745 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Take the first export off the global obd_stale_exports list.
 *
 * The list is accessed under obd_stale_export_lock.  exp starts out as
 * NULL and is only assigned when the list is not empty.
 * NOTE(review): presumably exp (possibly NULL) is the return value --
 * confirm.
 */
1748 struct obd_export *obd_stale_export_get(void)
1750 struct obd_export *exp = NULL;
1753 spin_lock(&obd_stale_export_lock);
1754 if (!list_empty(&obd_stale_exports)) {
1755 exp = list_entry(obd_stale_exports.next,
1756 struct obd_export, exp_stale_list);
1757 list_del_init(&exp->exp_stale_list);
1759 spin_unlock(&obd_stale_export_lock);
1762 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1763 atomic_read(&obd_stale_export_num));
1767 EXPORT_SYMBOL(obd_stale_export_get);
/*
 * Give back an export obtained from obd_stale_export_get().
 *
 * The export must not be on the stale list (asserted).  If it has a lock
 * hash whose hs_count is non-zero, it is queued on obd_stale_exports again:
 * at the tail when exp_bl_list is empty, at the head otherwise.
 * NOTE(review): class_export_put() presumably belongs to the other branch,
 * i.e. an export without remaining locks -- confirm.
 *
 * Locks are taken in the order exp_bl_list_lock (with bottom halves
 * disabled), then obd_stale_export_lock.
 */
1769 void obd_stale_export_put(struct obd_export *exp)
1773 LASSERT(list_empty(&exp->exp_stale_list));
1774 if (exp->exp_lock_hash &&
1775 atomic_read(&exp->exp_lock_hash->hs_count)) {
1776 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1777 atomic_read(&obd_stale_export_num));
1779 spin_lock_bh(&exp->exp_bl_list_lock);
1780 spin_lock(&obd_stale_export_lock);
1781 /* Add to the tail if there is no blocked locks,
1782 * to the head otherwise. */
1783 if (list_empty(&exp->exp_bl_list))
1784 list_add_tail(&exp->exp_stale_list,
1785 &obd_stale_exports);
1787 list_add(&exp->exp_stale_list,
1788 &obd_stale_exports);
1790 spin_unlock(&obd_stale_export_lock);
1791 spin_unlock_bh(&exp->exp_bl_list_lock);
1793 class_export_put(exp);
1797 EXPORT_SYMBOL(obd_stale_export_put);
1800 * Adjust the position of the export in the stale list,
1801 * i.e. move to the head of the list if needed.
/*
 * Move an export to the head of obd_stale_exports when it is currently on
 * that list and its exp_bl_list is not empty.  Nothing happens otherwise.
 * The locks are taken in the same order as in obd_stale_export_put().
 *
 * @param exp  export to reposition; must not be NULL (asserted)
 */
1803 void obd_stale_export_adjust(struct obd_export *exp)
1805 LASSERT(exp != NULL);
1806 spin_lock_bh(&exp->exp_bl_list_lock);
1807 spin_lock(&obd_stale_export_lock);
1809 if (!list_empty(&exp->exp_stale_list) &&
1810 !list_empty(&exp->exp_bl_list))
1811 list_move(&exp->exp_stale_list, &obd_stale_exports);
1813 spin_unlock(&obd_stale_export_lock);
1814 spin_unlock_bh(&exp->exp_bl_list_lock);
1816 EXPORT_SYMBOL(obd_stale_export_adjust);
1819 * destroy zombie export/import thread.
/*
 * Body of the zombie destroy thread.
 *
 * The thread signals obd_zombie_start, records its pid, and then loops
 * until the stop bit is set: it sleeps on obd_zombie_waitq while
 * obd_zombie_impexp_check() reports nothing to do, culls the zombie lists,
 * and wakes the queue again for obd_zombie_barrier() callers.  On exit it
 * completes obd_zombie_stop.
 *
 * @param unused  thread argument, ignored
 */
1821 static int obd_zombie_impexp_thread(void *unused)
1823 unshare_fs_struct();
1824 complete(&obd_zombie_start);
1826 obd_zombie_pid = current_pid();
1828 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1829 struct l_wait_info lwi = { 0 };
1831 l_wait_event(obd_zombie_waitq,
1832 !obd_zombie_impexp_check(NULL), &lwi);
1833 obd_zombie_impexp_cull();
1836 * Notify obd_zombie_barrier callers that queues
1839 wake_up(&obd_zombie_waitq);
1842 complete(&obd_zombie_stop);
1849 * start destroy zombie import/export thread
/*
 * Initialise the zombie lists, lock, completions and wait queue, then
 * start the "obd_zombid" kernel thread and wait until it has signalled
 * obd_zombie_start.
 *
 * @return PTR_ERR(task) on the failure path of kthread_run()
 *         (NOTE(review): presumably guarded by IS_ERR(task), and 0 on
 *         success -- confirm)
 */
1851 int obd_zombie_impexp_init(void)
1853 struct task_struct *task;
1855 INIT_LIST_HEAD(&obd_zombie_imports);
1857 INIT_LIST_HEAD(&obd_zombie_exports);
1858 spin_lock_init(&obd_zombie_impexp_lock);
1859 init_completion(&obd_zombie_start);
1860 init_completion(&obd_zombie_stop);
1861 init_waitqueue_head(&obd_zombie_waitq);
1864 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1866 RETURN(PTR_ERR(task));
1868 wait_for_completion(&obd_zombie_start);
1872 * stop destroy zombie import/export thread
/*
 * Ask the zombie thread to exit: set the stop bit, wake the thread, and
 * block until it has completed obd_zombie_stop.
 */
1874 void obd_zombie_impexp_stop(void)
1876 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1877 obd_zombie_impexp_notify();
1878 wait_for_completion(&obd_zombie_stop);
1881 /***** Kernel-userspace comm helpers *******/
1883 /* Get length of entire message, including header */
/*
 * @param payload_len  size of the payload in bytes
 * @return sizeof(struct kuc_hdr) + payload_len
 */
1884 int kuc_len(int payload_len)
1886 return sizeof(struct kuc_hdr) + payload_len;
1888 EXPORT_SYMBOL(kuc_len);
1890 /* Get a pointer to kuc header, given a ptr to the payload
1891 * @param p Pointer to payload area
1892 * @returns Pointer to kuc header
/*
 * The header is taken to be the struct kuc_hdr located immediately before
 * the payload pointer.  Its kuc_magic field must equal KUC_MAGIC
 * (asserted), so this must only be called on payloads that really are
 * preceded by a kuc header.
 */
1894 struct kuc_hdr * kuc_ptr(void *p)
1896 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1897 LASSERT(lh->kuc_magic == KUC_MAGIC);
1900 EXPORT_SYMBOL(kuc_ptr);
1902 /* Test if payload is part of kuc message
1903 * @param p Pointer to payload area
/*
 * Non-asserting variant of the check done in kuc_ptr(): the memory
 * directly in front of the payload pointer is read as a struct kuc_hdr and
 * its kuc_magic is compared with KUC_MAGIC.  That memory is read
 * unconditionally, so it must be readable.
 */
1906 int kuc_ispayload(void *p)
1908 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1910 if (kh->kuc_magic == KUC_MAGIC)
1915 EXPORT_SYMBOL(kuc_ispayload);
1917 /* Alloc space for a message, and fill in header
1918 * @return Pointer to payload area
/*
 * Allocate a kuc message and initialise its header.
 *
 * @param payload_len  payload size in bytes; the header is added on top
 * @param transport    stored in kuc_transport
 * @param type         stored in kuc_msgtype
 * @return pointer to the payload, which starts right after the header, or
 *         ERR_PTR(-ENOMEM) on the failure path
 *
 * kuc_msglen is set to the total length including the header.
 */
1920 void *kuc_alloc(int payload_len, int transport, int type)
1923 int len = kuc_len(payload_len);
1927 return ERR_PTR(-ENOMEM);
1929 lh->kuc_magic = KUC_MAGIC;
1930 lh->kuc_transport = transport;
1931 lh->kuc_msgtype = type;
1932 lh->kuc_msglen = len;
1934 return (void *)(lh + 1);
1936 EXPORT_SYMBOL(kuc_alloc);
1938 /* Takes pointer to payload area */
/*
 * Free a message by its payload pointer.  The header is located with
 * kuc_ptr(), which asserts the magic value, and header plus payload are
 * freed as one region of kuc_len(payload_len) bytes.
 *
 * @param p            payload pointer
 * @param payload_len  payload size in bytes, used to compute the freed size
 */
1939 inline void kuc_free(void *p, int payload_len)
1941 struct kuc_hdr *lh = kuc_ptr(p);
1942 OBD_FREE(lh, kuc_len(payload_len));
1944 EXPORT_SYMBOL(kuc_free);
/*
 * Per-caller wait record used by obd_get_request_slot().
 *  - orsw_entry: link into the client's cl_loi_read_list while waiting
 *  - orsw_waitq: wait queue the caller sleeps on until it is woken up
 */
1946 struct obd_request_slot_waiter {
1947 struct list_head orsw_entry;
1948 wait_queue_head_t orsw_waitq;
/*
 * Wait condition for obd_get_request_slot().
 *
 * A waiter has been granted a slot once its orsw_entry is no longer linked
 * into a list; the granting side unlinks it with list_del_init() before
 * waking the waiter.  The test is made under cl_loi_list_lock.
 */
1952 static bool obd_request_slot_avail(struct client_obd *cli,
1953 struct obd_request_slot_waiter *orsw)
1957 spin_lock(&cli->cl_loi_list_lock);
1958 avail = !!list_empty(&orsw->orsw_entry);
1959 spin_unlock(&cli->cl_loi_list_lock);
1965 * For network flow control, the RPC sponsor needs to acquire a credit
1966 * before sending the RPC. The credits count for a connection is defined
1967 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
1968 * the subsequent RPC sponsors need to wait until others released their
1969 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
/*
 * Acquire one RPC slot (credit) on a client obd.
 *
 * Fast path: if cl_r_in_flight is below cl_max_rpcs_in_flight, the counter
 * is incremented under cl_loi_list_lock.  Otherwise an on-stack waiter is
 * appended to cl_loi_read_list and the caller sleeps interruptibly until
 * obd_request_slot_avail() becomes true or another condition on the wait
 * expression holds.
 *
 * @param cli  client obd to take the slot from
 */
1971 int obd_get_request_slot(struct client_obd *cli)
1973 struct obd_request_slot_waiter orsw;
1974 struct l_wait_info lwi;
1977 spin_lock(&cli->cl_loi_list_lock);
1978 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1979 cli->cl_r_in_flight++;
1980 spin_unlock(&cli->cl_loi_list_lock);
/* Slow path: queue at the tail so waiters are served in arrival order. */
1984 init_waitqueue_head(&orsw.orsw_waitq);
1985 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1986 orsw.orsw_signaled = false;
1987 spin_unlock(&cli->cl_loi_list_lock);
1989 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1990 rc = l_wait_event(orsw.orsw_waitq,
1991 obd_request_slot_avail(cli, &orsw) ||
1995 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1996 * freed but other (such as obd_put_request_slot) is using it. */
1997 spin_lock(&cli->cl_loi_list_lock);
/* NOTE(review): the cleanup below presumably runs only when the wait
 * returned an error -- confirm.  An unlinked entry means a slot was already
 * granted to this waiter, so the count is given back; otherwise the entry
 * is still queued and must be removed. */
1999 if (!orsw.orsw_signaled) {
2000 if (list_empty(&orsw.orsw_entry))
2001 cli->cl_r_in_flight--;
2003 list_del(&orsw.orsw_entry);
2007 if (orsw.orsw_signaled) {
2008 LASSERT(list_empty(&orsw.orsw_entry));
2012 spin_unlock(&cli->cl_loi_list_lock);
2016 EXPORT_SYMBOL(obd_get_request_slot);
/*
 * Release one RPC slot on a client obd.
 *
 * cl_r_in_flight is decremented under cl_loi_list_lock.  If a waiter is
 * queued and the count is now below cl_max_rpcs_in_flight, the slot is
 * handed to the first waiter directly: its entry is unlinked, the count is
 * incremented on its behalf, and it is woken up.
 *
 * @param cli  client obd the slot belongs to
 */
2018 void obd_put_request_slot(struct client_obd *cli)
2020 struct obd_request_slot_waiter *orsw;
2022 spin_lock(&cli->cl_loi_list_lock);
2023 cli->cl_r_in_flight--;
2025 /* If there is free slot, wakeup the first waiter. */
2026 if (!list_empty(&cli->cl_loi_read_list) &&
2027 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2028 orsw = list_entry(cli->cl_loi_read_list.next,
2029 struct obd_request_slot_waiter, orsw_entry);
/* list_del_init() leaves the entry empty, which is exactly what
 * obd_request_slot_avail() tests for. */
2030 list_del_init(&orsw->orsw_entry);
2031 cli->cl_r_in_flight++;
2032 wake_up(&orsw->orsw_waitq);
2034 spin_unlock(&cli->cl_loi_list_lock);
2036 EXPORT_SYMBOL(obd_put_request_slot);
/*
 * Return the client's current cl_max_rpcs_in_flight.  The field is read
 * without taking cl_loi_list_lock.
 */
2038 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2040 return cli->cl_max_rpcs_in_flight;
2042 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * Change the maximum number of RPCs in flight for a client obd.
 *
 * Values outside 1..OBD_MAX_RIF_MAX are refused.  For a device whose type
 * name is LUSTRE_MDC_NAME, max_mod_rpcs_in_flight is lowered to max - 1
 * when it would otherwise be greater than or equal to the new maximum.
 * The new value is then stored under cl_loi_list_lock and queued waiters
 * are granted slots, up to "diff" of them.
 * NOTE(review): "diff" is presumably the amount by which the limit grew --
 * confirm.
 *
 * @param cli  client obd to update
 * @param max  new maximum
 */
2044 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2046 struct obd_request_slot_waiter *orsw;
2053 if (max > OBD_MAX_RIF_MAX || max < 1)
2056 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2057 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2058 /* adjust max_mod_rpcs_in_flight to ensure it is always
2059 * strictly lower that max_rpcs_in_flight */
/* NOTE(review): unlike the other CERROR() calls in this file, the message
 * below has no trailing newline. */
2061 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2062 "because it must be higher than "
2063 "max_mod_rpcs_in_flight value",
2064 cli->cl_import->imp_obd->obd_name);
2067 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2068 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2074 spin_lock(&cli->cl_loi_list_lock);
2075 old = cli->cl_max_rpcs_in_flight;
2076 cli->cl_max_rpcs_in_flight = max;
2079 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2080 for (i = 0; i < diff; i++) {
2081 if (list_empty(&cli->cl_loi_read_list))
/* Same hand-over protocol as in obd_put_request_slot(). */
2084 orsw = list_entry(cli->cl_loi_read_list.next,
2085 struct obd_request_slot_waiter, orsw_entry);
2086 list_del_init(&orsw->orsw_entry);
2087 cli->cl_r_in_flight++;
2088 wake_up(&orsw->orsw_waitq);
2090 spin_unlock(&cli->cl_loi_list_lock);
2094 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/*
 * Return the client's current cl_max_mod_rpcs_in_flight.  The field is
 * read without taking cl_mod_rpcs_lock.
 */
2096 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2098 return cli->cl_max_mod_rpcs_in_flight;
2100 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/*
 * Change the maximum number of modify RPCs in flight for a client obd.
 *
 * The new value must lie in 1..OBD_MAX_RIF_MAX, must be strictly lower
 * than cl_max_rpcs_in_flight, and must not exceed maxmodrpcs.  When the
 * connect flags include OBD_CONNECT_MULTIMODRPCS, maxmodrpcs is the
 * ocd_maxmodrpcs value from the import's connect data.
 * NOTE(review): presumably a fixed fallback is used when the flag is not
 * set -- confirm.
 *
 * The value is stored under cl_mod_rpcs_lock, and cl_mod_rpcs_waitq is
 * woken when the limit was raised.
 *
 * @param cli  client obd to update
 * @param max  new maximum
 */
2102 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2104 struct obd_connect_data *ocd;
2108 if (max > OBD_MAX_RIF_MAX || max < 1)
2111 /* cannot exceed or equal max_rpcs_in_flight */
2112 if (max >= cli->cl_max_rpcs_in_flight) {
2113 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2114 "higher or equal to max_rpcs_in_flight value (%u)\n",
2115 cli->cl_import->imp_obd->obd_name,
2116 max, cli->cl_max_rpcs_in_flight);
2120 /* cannot exceed max modify RPCs in flight supported by the server */
2121 ocd = &cli->cl_import->imp_connect_data;
2122 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2123 maxmodrpcs = ocd->ocd_maxmodrpcs;
2126 if (max > maxmodrpcs) {
2127 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2128 "higher than max_mod_rpcs_per_client value (%hu) "
2129 "returned by the server at connection\n",
2130 cli->cl_import->imp_obd->obd_name,
2135 spin_lock(&cli->cl_mod_rpcs_lock);
2137 prev = cli->cl_max_mod_rpcs_in_flight;
2138 cli->cl_max_mod_rpcs_in_flight = max;
2140 /* wakeup waiters if limit has been increased */
2141 if (cli->cl_max_mod_rpcs_in_flight > prev)
2142 wake_up(&cli->cl_mod_rpcs_waitq);
2144 spin_unlock(&cli->cl_mod_rpcs_lock);
2148 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/*
 * pct(a, b): integer percentage of a relative to b, i.e. a * 100 / b, or 0
 * when b is zero, which avoids a division by zero.
 * Every argument is parenthesized so that compound expressions such as
 * pct(x + y, total - skipped) expand with the intended precedence.
 * b is evaluated twice, so do not pass expressions with side effects.
 */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
/*
 * Print the modify-RPC statistics of a client obd into a seq_file.
 *
 * The output consists of a snapshot timestamp, the current number of
 * modify RPCs in flight, and one line per bucket of cl_mod_rpcs_hist with
 * the bucket count, its percentage of the total, and the cumulative
 * percentage.  Printing stops once the cumulative count equals the total.
 * Everything after the timestamp is taken runs under cl_mod_rpcs_lock.
 *
 * NOTE(review): tv_usec is printed with "%lu" and therefore not
 * zero-padded; 5 microseconds shows up as ".5".
 *
 * @param cli  client obd whose statistics are shown
 * @param seq  seq_file to print into
 */
2152 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2153 struct seq_file *seq)
2156 unsigned long mod_tot = 0, mod_cum;
2159 do_gettimeofday(&now);
2161 spin_lock(&cli->cl_mod_rpcs_lock);
2163 seq_printf(seq, "snapshot_time: %lu.%lu (secs.usecs)\n",
2164 now.tv_sec, now.tv_usec);
2165 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2166 cli->cl_mod_rpcs_in_flight);
2168 seq_printf(seq, "\n\t\t\tmodify\n");
2169 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2171 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2174 for (i = 0; i < OBD_HIST_MAX; i++) {
2175 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2177 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2178 i, mod, pct(mod, mod_tot),
2179 pct(mod_cum, mod_tot));
/* All recorded samples are covered; remaining buckets add nothing. */
2180 if (mod_cum == mod_tot)
2184 spin_unlock(&cli->cl_mod_rpcs_lock);
2188 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2192 /* The number of modify RPCs sent in parallel is limited
2193 * because the server has a finite number of slots per client to
2194 * store request result and ensure reply reconstruction when needed.
2195 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2196 * that takes into account server limit and cl_max_rpcs_in_flight
2198 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2199 * one close request is allowed above the maximum.
/*
 * Slot availability test for modify RPCs.  As the _locked suffix suggests,
 * the caller is expected to hold cl_mod_rpcs_lock; both callers in this
 * file do.
 */
2201 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2206 /* A slot is available if
2207 * - number of modify RPCs in flight is less than the max
2208 * - it's a close RPC and no other close request is in flight
2210 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2211 (close_req && cli->cl_close_rpcs_in_flight == 0);
/*
 * Locking wrapper around obd_mod_rpc_slot_avail_locked(): takes
 * cl_mod_rpcs_lock for the duration of the test.  Used as the wait
 * condition in obd_get_mod_rpc_slot().
 */
2216 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2221 spin_lock(&cli->cl_mod_rpcs_lock);
2222 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2223 spin_unlock(&cli->cl_mod_rpcs_lock);
2227 /* Get a modify RPC slot from the obd client @cli according
2228 * to the kind of operation @opc that is going to be sent
2229 * and the intent @it of the operation if it applies.
2230 * If the maximum number of modify RPCs in flight is reached
2231 * the thread is put to sleep.
2232 * Returns the tag to be set in the request message. Tag 0
2233 * is reserved for non-modifying requests.
/*
 * Implementation notes: requests whose intent is IT_GETATTR, IT_LOOKUP,
 * IT_LAYOUT or IT_READDIR are singled out before any slot accounting.
 * For all others, availability is tested under cl_mod_rpcs_lock.  When a
 * slot is free, the in-flight counters and the histogram are updated and
 * the first clear bit of cl_mod_tag_bitmap is claimed as the tag.
 * Otherwise the lock is dropped and the caller sleeps on cl_mod_rpcs_waitq
 * until obd_mod_rpc_slot_avail() holds.
 */
2235 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2236 struct lookup_intent *it)
2238 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2239 bool close_req = false;
2242 /* read-only metadata RPCs don't consume a slot on MDT
2243 * for reply reconstruction
2245 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2246 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2249 if (opc == MDS_CLOSE)
2253 spin_lock(&cli->cl_mod_rpcs_lock);
2254 max = cli->cl_max_mod_rpcs_in_flight;
2255 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2256 /* there is a slot available */
2257 cli->cl_mod_rpcs_in_flight++;
2259 cli->cl_close_rpcs_in_flight++;
2260 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2261 cli->cl_mod_rpcs_in_flight);
2262 /* find a free tag */
2263 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2265 LASSERT(i < OBD_MAX_RIF_MAX);
/* NOTE(review): the bit is set inside LASSERT(); if LASSERT() can be
 * compiled out in some build, the tag would never be marked as used --
 * confirm. */
2266 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2267 spin_unlock(&cli->cl_mod_rpcs_lock);
2268 /* tag 0 is reserved for non-modify RPCs */
/* No slot: drop the lock before sleeping. */
2271 spin_unlock(&cli->cl_mod_rpcs_lock);
2273 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2274 "opc %u, max %hu\n",
2275 cli->cl_import->imp_obd->obd_name, opc, max);
2277 l_wait_event(cli->cl_mod_rpcs_waitq,
2278 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2281 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2283 /* Put a modify RPC slot from the obd client @cli according
2284 * to the kind of operation @opc that has been sent and the
2285 * intent @it of the operation if it applies.
/*
 * Implementation notes: requests whose intent is IT_GETATTR, IT_LOOKUP,
 * IT_LAYOUT or IT_READDIR are singled out first, mirroring
 * obd_get_mod_rpc_slot().  For all others the in-flight counters are
 * decremented under cl_mod_rpcs_lock and bit (tag - 1) of
 * cl_mod_tag_bitmap is cleared.  cl_mod_rpcs_waitq is woken after the lock
 * has been released.
 *
 * @param tag  tag of the request being completed; the bitmap index is
 *             tag - 1
 */
2287 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2288 struct lookup_intent *it, __u16 tag)
2290 bool close_req = false;
2292 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2293 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2296 if (opc == MDS_CLOSE)
2299 spin_lock(&cli->cl_mod_rpcs_lock);
2300 cli->cl_mod_rpcs_in_flight--;
2302 cli->cl_close_rpcs_in_flight--;
2303 /* release the tag in the bitmap */
2304 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
/* NOTE(review): the bit is cleared inside LASSERT(); if LASSERT() can be
 * compiled out in some build, the tag would never be released --
 * confirm. */
2305 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2306 spin_unlock(&cli->cl_mod_rpcs_lock);
2307 wake_up(&cli->cl_mod_rpcs_waitq);
2309 EXPORT_SYMBOL(obd_put_mod_rpc_slot);