4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_disk.h>
48 #include <lustre_kernelcomm.h>
/* Taken around the obd_types list walks and updates in this file. */
50 spinlock_t obd_types_lock;
/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
52 static struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 static struct kmem_cache *import_cachep;
/* Zombie import/export lists; class_import_put() and class_export_put()
 * hand objects to the *_add() helpers declared below once the last
 * reference is dropped. Presumably guarded by obd_zombie_impexp_lock --
 * TODO confirm against the helper definitions. */
57 static struct list_head obd_zombie_imports;
58 static struct list_head obd_zombie_exports;
59 static spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
/* Stale-export bookkeeping; obd_stale_export_num is incremented in
 * class_unlink_export(). */
67 struct list_head obd_stale_exports;
68 spinlock_t obd_stale_export_lock;
69 atomic_t obd_stale_export_num;
/* Function-pointer hook this file calls to release ptlrpc connections held
 * by exports and imports. */
71 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
75 * support functions: we could use inter-module communication, but this
76 * is more portable to other OS's
/* Allocate an obd_device from obd_device_cachep with GFP_NOFS and stamp it
 * with OBD_DEVICE_MAGIC. */
78 static struct obd_device *obd_device_alloc(void)
80 struct obd_device *obd;
82 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
84 obd->obd_magic = OBD_DEVICE_MAGIC;
/* Return an obd_device to obd_device_cachep. Asserts the magic first, logs
 * an error if obd_namespace is still set, and finalizes the obd_reference
 * lu_ref before freeing. */
89 static void obd_device_free(struct obd_device *obd)
92 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
93 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
94 if (obd->obd_namespace != NULL) {
95 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
96 obd, obd->obd_namespace, obd->obd_force);
99 lu_ref_fini(&obd->obd_reference);
100 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/* Search the obd_types list for an entry whose typ_name matches \a name
 * exactly (strcmp). The list is walked under obd_types_lock, which is
 * released both on a match and after an unsuccessful scan. */
103 struct obd_type *class_search_type(const char *name)
105 struct list_head *tmp;
106 struct obd_type *type;
108 spin_lock(&obd_types_lock);
109 list_for_each(tmp, &obd_types) {
110 type = list_entry(tmp, struct obd_type, typ_chain);
111 if (strcmp(type->typ_name, name) == 0) {
112 spin_unlock(&obd_types_lock);
116 spin_unlock(&obd_types_lock);
119 EXPORT_SYMBOL(class_search_type);
/* Look up an obd type by name and take a module reference on its owner.
 * With HAVE_MODULE_LOADING_SUPPORT the module is requested by a derived
 * module name (LWP maps to the OSP module, names starting with the MDS
 * name map to the MDT module) and the type is searched for again. */
121 struct obd_type *class_get_type(const char *name)
123 struct obd_type *type = class_search_type(name);
125 #ifdef HAVE_MODULE_LOADING_SUPPORT
127 const char *modname = name;
129 if (strcmp(modname, "obdfilter") == 0)
132 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
133 modname = LUSTRE_OSP_NAME;
/* prefix match: only the first strlen(LUSTRE_MDS_NAME) bytes are compared */
135 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
136 modname = LUSTRE_MDT_NAME;
/* request_module() returning 0 is treated as "loaded" */
138 if (!request_module("%s", modname)) {
139 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
/* the second lookup uses the original name, not modname */
140 type = class_search_type(name);
142 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
148 spin_lock(&type->obd_type_lock);
/* NOTE(review): the result of try_module_get() is not checked here */
150 try_module_get(type->typ_dt_ops->o_owner);
151 spin_unlock(&type->obd_type_lock);
/* Counterpart of class_get_type(): under type->obd_type_lock, drop the
 * module reference on the owner of the type's dt ops. */
156 void class_put_type(struct obd_type *type)
159 spin_lock(&type->obd_type_lock);
161 module_put(type->typ_dt_ops->o_owner);
162 spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) on the length of a type name accepted by
 * class_register_type(). */
165 #define CLASS_MAX_NAME 1024
/* Register a new obd type under \a name.
 *
 * The name must be shorter than CLASS_MAX_NAME and not registered yet.
 * The type keeps private copies of \a dt_ops, \a md_ops (optional) and the
 * name. Under CONFIG_PROC_FS a proc entry named after the type is
 * registered, then the lu_device_type is initialized and the type is
 * linked onto obd_types under obd_types_lock. The tail of the function
 * releases the proc entry and every allocation made so far. */
167 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
168 bool enable_proc, struct lprocfs_vars *vars,
169 const char *name, struct lu_device_type *ldt)
171 struct obd_type *type;
176 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
178 if (class_search_type(name)) {
179 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
184 OBD_ALLOC(type, sizeof(*type));
188 OBD_ALLOC_PTR(type->typ_dt_ops);
189 OBD_ALLOC_PTR(type->typ_md_ops);
190 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* all three allocations are attempted before any result is checked */
192 if (type->typ_dt_ops == NULL ||
193 type->typ_md_ops == NULL ||
194 type->typ_name == NULL)
197 *(type->typ_dt_ops) = *dt_ops;
198 /* md_ops is optional */
200 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized strlen(name) + 1 above, so strcpy cannot overrun */
201 strcpy(type->typ_name, name);
202 spin_lock_init(&type->obd_type_lock);
204 #ifdef CONFIG_PROC_FS
206 type->typ_procroot = lprocfs_register(type->typ_name,
209 if (IS_ERR(type->typ_procroot)) {
210 rc = PTR_ERR(type->typ_procroot);
/* reset so the cleanup code does not try to remove a proc entry */
211 type->typ_procroot = NULL;
218 rc = lu_device_type_init(ldt);
223 spin_lock(&obd_types_lock);
224 list_add(&type->typ_chain, &obd_types);
225 spin_unlock(&obd_types_lock);
230 if (type->typ_name != NULL) {
231 #ifdef CONFIG_PROC_FS
232 if (type->typ_procroot != NULL)
233 remove_proc_subtree(type->typ_name, proc_lustre_root);
235 OBD_FREE(type->typ_name, strlen(name) + 1);
237 if (type->typ_md_ops != NULL)
238 OBD_FREE_PTR(type->typ_md_ops);
239 if (type->typ_dt_ops != NULL)
240 OBD_FREE_PTR(type->typ_dt_ops);
241 OBD_FREE(type, sizeof(*type));
244 EXPORT_SYMBOL(class_register_type);
/* Unregister the obd type called \a name.
 *
 * An unknown name is reported with CERROR. If the type still has a
 * non-zero typ_refcnt, only the ops copies are freed and the name is kept
 * for debugging. Otherwise the proc entries are removed (CONFIG_PROC_FS),
 * the lu_device_type is finalized, the type is unlinked from obd_types
 * under obd_types_lock, and the name, ops copies and the type itself are
 * freed. */
246 int class_unregister_type(const char *name)
248 struct obd_type *type = class_search_type(name);
252 CERROR("unknown obd type\n");
256 if (type->typ_refcnt) {
257 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
258 /* This is a bad situation, let's make the best of it */
259 /* Remove ops, but leave the name for debugging */
/* NOTE(review): the ops pointers are freed here but not visibly reset to
 * NULL; confirm this branch returns before the frees further down. */
260 OBD_FREE_PTR(type->typ_dt_ops);
261 OBD_FREE_PTR(type->typ_md_ops);
265 /* we do not use type->typ_procroot as for compatibility purposes
266 * other modules can share names (i.e. lod can use lov entry). so
267 * we can't reference pointer as it can get invalided when another
268 * module removes the entry */
269 #ifdef CONFIG_PROC_FS
270 if (type->typ_procroot != NULL)
271 remove_proc_subtree(type->typ_name, proc_lustre_root);
272 if (type->typ_procsym != NULL)
273 lprocfs_remove(&type->typ_procsym);
276 lu_device_type_fini(type->typ_lu);
278 spin_lock(&obd_types_lock);
279 list_del(&type->typ_chain);
280 spin_unlock(&obd_types_lock);
/* size is computed from the caller's name, which matched typ_name in the
 * lookup above */
281 OBD_FREE(type->typ_name, strlen(name) + 1);
282 if (type->typ_dt_ops != NULL)
283 OBD_FREE_PTR(type->typ_dt_ops);
284 if (type->typ_md_ops != NULL)
285 OBD_FREE_PTR(type->typ_md_ops);
286 OBD_FREE(type, sizeof(*type));
288 } /* class_unregister_type */
289 EXPORT_SYMBOL(class_unregister_type);
292 * Create a new obd device.
294 * Find an empty slot in ::obd_devs[], create a new obd device in it.
296 * \param[in] type_name obd device type string.
297 * \param[in] name obd device name.
299 * \retval NULL if create fails, otherwise return the obd device
/* Error results are returned as ERR_PTR() values: -EINVAL for a name of
 * MAX_OBD_NAME bytes or more, -ENODEV for an unknown type, -ENOMEM when
 * the device cannot be allocated, -EEXIST when a device with the same name
 * is already present and -EOVERFLOW when no slot is free. */
302 struct obd_device *class_newdev(const char *type_name, const char *name)
304 struct obd_device *result = NULL;
305 struct obd_device *newdev;
306 struct obd_type *type = NULL;
308 int new_obd_minor = 0;
311 if (strlen(name) >= MAX_OBD_NAME) {
312 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
313 RETURN(ERR_PTR(-EINVAL));
/* takes a type (module) reference; dropped again on the failure path */
316 type = class_get_type(type_name);
318 CERROR("OBD: unknown type: %s\n", type_name);
319 RETURN(ERR_PTR(-ENODEV));
322 newdev = obd_device_alloc();
324 GOTO(out_type, result = ERR_PTR(-ENOMEM));
326 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* the whole table is scanned under the write lock so that the duplicate
 * check and the slot assignment happen atomically */
328 write_lock(&obd_dev_lock);
329 for (i = 0; i < class_devno_max(); i++) {
330 struct obd_device *obd = class_num2obd(i);
332 if (obd && (strcmp(name, obd->obd_name) == 0)) {
333 CERROR("Device %s already exists at %d, won't add\n",
/* a slot may already have been claimed earlier in this scan; undo it */
336 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
337 "%p obd_magic %08x != %08x\n", result,
338 result->obd_magic, OBD_DEVICE_MAGIC);
339 LASSERTF(result->obd_minor == new_obd_minor,
340 "%p obd_minor %d != %d\n", result,
341 result->obd_minor, new_obd_minor);
343 obd_devs[result->obd_minor] = NULL;
344 result->obd_name[0]='\0';
346 result = ERR_PTR(-EEXIST);
/* empty slot seen while nothing has been claimed yet */
349 if (!result && !obd) {
351 result->obd_minor = i;
353 result->obd_type = type;
/* at most sizeof(obd_name) - 1 bytes are copied; the name length was
 * checked against MAX_OBD_NAME at the top */
354 strncpy(result->obd_name, name,
355 sizeof(result->obd_name) - 1);
356 obd_devs[i] = result;
359 write_unlock(&obd_dev_lock);
361 if (result == NULL && i >= class_devno_max()) {
362 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
364 GOTO(out, result = ERR_PTR(-EOVERFLOW));
370 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
371 result->obd_name, result);
375 obd_device_free(newdev);
377 class_put_type(type);
/* Remove \a obd from ::obd_devs[] and free it.
 *
 * Asserts that the device carries the right magic, sits in the slot named
 * by its obd_minor and has a type. The slot is cleared under the
 * obd_dev_lock write lock, the device is freed, and the type reference is
 * dropped last through the pointer saved on entry. */
381 void class_release_dev(struct obd_device *obd)
/* saved up front because obd is freed before the type is released */
383 struct obd_type *obd_type = obd->obd_type;
385 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
386 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
387 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
388 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
389 LASSERT(obd_type != NULL);
391 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
392 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
394 write_lock(&obd_dev_lock);
395 obd_devs[obd->obd_minor] = NULL;
396 write_unlock(&obd_dev_lock);
397 obd_device_free(obd);
399 class_put_type(obd_type);
/* Scan ::obd_devs[] under the obd_dev_lock read lock for a device whose
 * obd_name equals \a name. A match is only accepted once the device has
 * obd_attached set; the lock is released on both the match and the
 * no-match path. */
402 int class_name2dev(const char *name)
409 read_lock(&obd_dev_lock);
410 for (i = 0; i < class_devno_max(); i++) {
411 struct obd_device *obd = class_num2obd(i);
413 if (obd && strcmp(name, obd->obd_name) == 0) {
414 /* Make sure we finished attaching before we give
415 out any references */
416 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
417 if (obd->obd_attached) {
418 read_unlock(&obd_dev_lock);
424 read_unlock(&obd_dev_lock);
429 struct obd_device *class_name2obd(const char *name)
431 int dev = class_name2dev(name);
433 if (dev < 0 || dev > class_devno_max())
435 return class_num2obd(dev);
437 EXPORT_SYMBOL(class_name2obd);
/* Scan ::obd_devs[] under the obd_dev_lock read lock for a device whose
 * obd_uuid equals \a uuid (obd_uuid_equals). The lock is released on both
 * the match and the no-match path. */
439 int class_uuid2dev(struct obd_uuid *uuid)
443 read_lock(&obd_dev_lock);
444 for (i = 0; i < class_devno_max(); i++) {
445 struct obd_device *obd = class_num2obd(i);
447 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
448 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
449 read_unlock(&obd_dev_lock);
453 read_unlock(&obd_dev_lock);
/* Resolve \a uuid to an obd device: class_uuid2dev() supplies the index,
 * class_num2obd() the device stored there.
 * NOTE(review): confirm a "not found" index from class_uuid2dev() is
 * filtered before it reaches class_num2obd(). */
458 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
460 int dev = class_uuid2dev(uuid);
463 return class_num2obd(dev);
465 EXPORT_SYMBOL(class_uuid2obd);
468 * Get obd device from ::obd_devs[]
470 * \param num [in] array index
472 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
473 * otherwise return the obd device there.
475 struct obd_device *class_num2obd(int num)
477 struct obd_device *obd = NULL;
479 if (num < class_devno_max()) {
484 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
485 "%p obd_magic %08x != %08x\n",
486 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
487 LASSERTF(obd->obd_minor == num,
488 "%p obd_minor %0d != %0d\n",
489 obd, obd->obd_minor, num);
496 * Get obd devices count. Device in any
498 * \retval obd device count
500 int get_devices_count(void)
502 int index, max_index = class_devno_max(), dev_count = 0;
504 read_lock(&obd_dev_lock);
505 for (index = 0; index <= max_index; index++) {
506 struct obd_device *obd = class_num2obd(index);
510 read_unlock(&obd_dev_lock);
514 EXPORT_SYMBOL(get_devices_count);
/* Print one console line per obd device (LCONSOLE, D_CONFIG): slot index,
 * a status derived from the obd_stopping / obd_set_up / obd_attached
 * flags checked in that order, type name, device name, uuid and current
 * refcount. The table is walked under the obd_dev_lock read lock. */
516 void class_obd_list(void)
521 read_lock(&obd_dev_lock);
522 for (i = 0; i < class_devno_max(); i++) {
523 struct obd_device *obd = class_num2obd(i);
527 if (obd->obd_stopping)
529 else if (obd->obd_set_up)
531 else if (obd->obd_attached)
535 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
536 i, status, obd->obd_type->typ_name,
537 obd->obd_name, obd->obd_uuid.uuid,
538 atomic_read(&obd->obd_refcount));
540 read_unlock(&obd_dev_lock);
544 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
545 specified, then only the client with that uuid is returned,
546 otherwise any client connected to the tgt is returned. */
/* Walks ::obd_devs[] under the obd_dev_lock read lock; the lock is released
 * before returning on a match as well as after an unsuccessful scan. */
547 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
548 const char * typ_name,
549 struct obd_uuid *grp_uuid)
553 read_lock(&obd_dev_lock);
554 for (i = 0; i < class_devno_max(); i++) {
555 struct obd_device *obd = class_num2obd(i);
/* prefix match: only the first strlen(typ_name) bytes of the device's
 * type name are compared */
559 if ((strncmp(obd->obd_type->typ_name, typ_name,
560 strlen(typ_name)) == 0)) {
561 if (obd_uuid_equals(tgt_uuid,
562 &obd->u.cli.cl_target_uuid) &&
/* a NULL grp_uuid accepts any client of the target */
563 ((grp_uuid)? obd_uuid_equals(grp_uuid,
564 &obd->obd_uuid) : 1)) {
565 read_unlock(&obd_dev_lock);
570 read_unlock(&obd_dev_lock);
574 EXPORT_SYMBOL(class_find_client_obd);
576 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
577 searching at *next, and if a device is found, the next index to look
578 at is saved in *next. If next is NULL, then the first matching device
579 will always be returned. */
/* The starting index is taken from *next only when it lies in
 * [0, class_devno_max()); the scan itself runs under the obd_dev_lock read
 * lock and compares each device's obd_uuid with \a grp_uuid. */
580 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
586 else if (*next >= 0 && *next < class_devno_max())
591 read_lock(&obd_dev_lock);
592 for (; i < class_devno_max(); i++) {
593 struct obd_device *obd = class_num2obd(i);
597 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
600 read_unlock(&obd_dev_lock);
604 read_unlock(&obd_dev_lock);
608 EXPORT_SYMBOL(class_devices_in_group);
611 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
612 * adjust sptlrpc settings accordingly.
/* \a namelen must be positive. Devices that are not set up or are stopping
 * are skipped, as are types other than the six listed below and devices
 * whose name does not start with the first \a namelen bytes of \a fsname.
 * Each remaining device is sent KEY_SPTLRPC_CONF through
 * obd_set_info_async() on its self export. */
614 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
616 struct obd_device *obd;
620 LASSERT(namelen > 0);
622 read_lock(&obd_dev_lock);
623 for (i = 0; i < class_devno_max(); i++) {
624 obd = class_num2obd(i);
626 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
629 /* only notify mdc, osc, osp, lwp, mdt, ost
630 * because only these have a -sptlrpc llog */
631 type = obd->obd_type->typ_name;
632 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
633 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
634 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
635 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
636 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
637 strcmp(type, LUSTRE_OST_NAME) != 0)
640 if (strncmp(obd->obd_name, fsname, namelen))
/* hold a device reference and drop obd_dev_lock around the notification;
 * the lock is re-taken before the scan continues */
643 class_incref(obd, __FUNCTION__, obd);
644 read_unlock(&obd_dev_lock);
645 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
646 sizeof(KEY_SPTLRPC_CONF),
647 KEY_SPTLRPC_CONF, 0, NULL, NULL);
649 class_decref(obd, __FUNCTION__, obd);
650 read_lock(&obd_dev_lock);
652 read_unlock(&obd_dev_lock);
655 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/* Destroy the slab caches created by obd_init_caches() (device, obdo and
 * import). The cache pointers are reset to NULL after destruction, which
 * keeps the LASSERTs in obd_init_caches() satisfied on a later
 * re-initialization. */
657 void obd_cleanup_caches(void)
660 if (obd_device_cachep) {
661 kmem_cache_destroy(obd_device_cachep);
662 obd_device_cachep = NULL;
665 kmem_cache_destroy(obdo_cachep);
669 kmem_cache_destroy(import_cachep);
670 import_cachep = NULL;
/* Create the three slab caches used by this file: "ll_obd_dev_cache",
 * "ll_obdo_cache" and "ll_import_cache". Each cache pointer is asserted
 * to be NULL beforehand. A failed creation jumps to the out label with
 * rc = -ENOMEM; obd_cleanup_caches() is called near the end of the
 * function. */
676 int obd_init_caches(void)
681 LASSERT(obd_device_cachep == NULL);
682 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
683 sizeof(struct obd_device),
685 if (!obd_device_cachep)
686 GOTO(out, rc = -ENOMEM);
688 LASSERT(obdo_cachep == NULL);
689 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
692 GOTO(out, rc = -ENOMEM);
694 LASSERT(import_cachep == NULL);
695 import_cachep = kmem_cache_create("ll_import_cache",
696 sizeof(struct obd_import),
699 GOTO(out, rc = -ENOMEM);
703 obd_cleanup_caches();
707 /* map connection to client */
/* Translate a connection handle into its export by looking the handle
 * cookie up with class_handle2object(). A null handle and the cookie
 * value -1 ("assign a new connection") are logged as special cases before
 * the lookup is reached. */
708 struct obd_export *class_conn2export(struct lustre_handle *conn)
710 struct obd_export *export;
714 CDEBUG(D_CACHE, "looking for null handle\n");
718 if (conn->cookie == -1) { /* this means assign a new connection */
719 CDEBUG(D_CACHE, "want a new connection\n");
723 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
724 export = class_handle2object(conn->cookie, NULL);
727 EXPORT_SYMBOL(class_conn2export);
/* Map an export to an obd device.
 * NOTE(review): presumably returns exp->exp_obd; confirm how a NULL
 * \a exp is handled. */
729 struct obd_device *class_exp2obd(struct obd_export *exp)
735 EXPORT_SYMBOL(class_exp2obd);
/* Resolve a connection handle to the obd device of its export. The export
 * obtained from class_conn2export() is released with class_export_put()
 * as soon as exp_obd has been read. */
737 struct obd_device *class_conn2obd(struct lustre_handle *conn)
739 struct obd_export *export;
740 export = class_conn2export(conn);
742 struct obd_device *obd = export->exp_obd;
743 class_export_put(export);
/* Return the client import (u.cli.cl_import) of the obd device behind
 * \a exp. */
749 struct obd_import *class_exp2cliimp(struct obd_export *exp)
751 struct obd_device *obd = exp->exp_obd;
754 return obd->u.cli.cl_import;
756 EXPORT_SYMBOL(class_exp2cliimp);
/* Return the client import (u.cli.cl_import) of the obd device that
 * class_conn2obd() resolves \a conn to. */
758 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
760 struct obd_device *obd = class_conn2obd(conn);
763 return obd->u.cli.cl_import;
766 /* Export management functions */
/* Final teardown of an export whose refcount has reached zero: releases
 * the ptlrpc connection if there is one, asserts that the reply, replay
 * and high-priority RPC lists are empty, runs obd_destroy_export(), drops
 * the "export" reference on the obd device and frees the export through
 * OBD_FREE_RCU. */
767 static void class_export_destroy(struct obd_export *exp)
769 struct obd_device *obd = exp->exp_obd;
772 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
773 LASSERT(obd != NULL);
775 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
776 exp->exp_client_uuid.uuid, obd->obd_name);
778 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
779 if (exp->exp_connection)
780 ptlrpc_put_connection_superhack(exp->exp_connection);
782 LASSERT(list_empty(&exp->exp_outstanding_replies));
783 LASSERT(list_empty(&exp->exp_uncommitted_replies));
784 LASSERT(list_empty(&exp->exp_req_replay_queue));
785 LASSERT(list_empty(&exp->exp_hp_rpcs));
786 obd_destroy_export(exp);
/* pairs with class_incref(obd, "export", export) in class_new_export() */
787 class_decref(obd, "export", exp);
789 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback for export handles: takes an export reference. */
793 static void export_handle_addref(void *export)
795 class_export_get(export);
/* Handle operations passed to class_handle_hash() for every new export. */
798 static struct portals_handle_ops export_handle_ops = {
799 .hop_addref = export_handle_addref,
/* Take one reference on \a exp (atomic increment of exp_refcount) and log
 * the new count. */
803 struct obd_export *class_export_get(struct obd_export *exp)
805 atomic_inc(&exp->exp_refcount);
806 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
807 atomic_read(&exp->exp_refcount));
810 EXPORT_SYMBOL(class_export_get);
/* Drop one reference on \a exp. When the count reaches zero the export
 * must still be on an obd chain and off the stale list; its nid stats are
 * cleaned up and it is handed to obd_zombie_export_add(). */
812 void class_export_put(struct obd_export *exp)
814 LASSERT(exp != NULL);
815 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* the count is logged before the decrement, hence the "- 1" */
816 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
817 atomic_read(&exp->exp_refcount) - 1);
819 if (atomic_dec_and_test(&exp->exp_refcount)) {
820 LASSERT(!list_empty(&exp->exp_obd_chain));
821 LASSERT(list_empty(&exp->exp_stale_list));
822 CDEBUG(D_IOCTL, "final put %p/%s\n",
823 exp, exp->exp_client_uuid.uuid);
825 /* release nid stat reference */
826 lprocfs_exp_cleanup(exp);
828 obd_zombie_export_add(exp);
831 EXPORT_SYMBOL(class_export_put);
833 /* Creates a new export, adds it to the hash table, and returns a
834 * pointer to it. The refcount is 2: one for the hash reference, and
835 * one for the pointer returned by this function. */
/* Failure results are ERR_PTR() values: -ENOMEM when the allocation fails,
 * -ENODEV when the device is stopping (checked twice, see below) or the
 * uuid hash reference cannot be obtained, -EALREADY when an export for
 * \a cluuid is already hashed. */
836 struct obd_export *class_new_export(struct obd_device *obd,
837 struct obd_uuid *cluuid)
839 struct obd_export *export;
840 struct cfs_hash *hash = NULL;
844 OBD_ALLOC_PTR(export);
846 return ERR_PTR(-ENOMEM);
848 export->exp_conn_cnt = 0;
849 export->exp_lock_hash = NULL;
850 export->exp_flock_hash = NULL;
851 atomic_set(&export->exp_refcount, 2);
852 atomic_set(&export->exp_rpc_count, 0);
853 atomic_set(&export->exp_cb_count, 0);
854 atomic_set(&export->exp_locks_count, 0);
855 #if LUSTRE_TRACKS_LOCK_EXP_REFS
856 INIT_LIST_HEAD(&export->exp_locks_list);
857 spin_lock_init(&export->exp_locks_list_guard);
859 atomic_set(&export->exp_replay_count, 0);
860 export->exp_obd = obd;
861 INIT_LIST_HEAD(&export->exp_outstanding_replies);
862 spin_lock_init(&export->exp_uncommitted_replies_lock);
863 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
864 INIT_LIST_HEAD(&export->exp_req_replay_queue);
865 INIT_LIST_HEAD(&export->exp_handle.h_link);
866 INIT_LIST_HEAD(&export->exp_hp_rpcs);
867 INIT_LIST_HEAD(&export->exp_reg_rpcs);
868 class_handle_hash(&export->exp_handle, &export_handle_ops);
869 export->exp_last_request_time = cfs_time_current_sec();
870 spin_lock_init(&export->exp_lock);
871 spin_lock_init(&export->exp_rpc_lock);
872 INIT_HLIST_NODE(&export->exp_uuid_hash);
873 INIT_HLIST_NODE(&export->exp_nid_hash);
874 INIT_HLIST_NODE(&export->exp_gen_hash);
875 spin_lock_init(&export->exp_bl_list_lock);
876 INIT_LIST_HEAD(&export->exp_bl_list);
877 INIT_LIST_HEAD(&export->exp_stale_list);
879 export->exp_sp_peer = LUSTRE_SP_ANY;
880 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
881 export->exp_client_uuid = *cluuid;
882 obd_init_export(export);
884 spin_lock(&obd->obd_dev_lock);
885 /* shouldn't happen, but might race */
886 if (obd->obd_stopping)
887 GOTO(exit_unlock, rc = -ENODEV);
889 hash = cfs_hash_getref(obd->obd_uuid_hash);
891 GOTO(exit_unlock, rc = -ENODEV);
/* the device lock is not held across the hash insertion below */
892 spin_unlock(&obd->obd_dev_lock);
/* an export whose uuid equals the device's own uuid is not hashed */
894 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
895 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
897 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
898 obd->obd_name, cluuid->uuid, rc);
899 GOTO(exit_err, rc = -EALREADY);
903 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* obd_stopping is checked again after re-taking the lock; if it is now
 * set, the hash entry added above is removed before bailing out */
904 spin_lock(&obd->obd_dev_lock);
905 if (obd->obd_stopping) {
906 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
907 GOTO(exit_unlock, rc = -ENODEV);
/* released by class_decref(obd, "export", exp) in class_export_destroy() */
910 class_incref(obd, "export", export);
911 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
912 list_add_tail(&export->exp_obd_chain_timed,
913 &export->exp_obd->obd_exports_timed);
914 export->exp_obd->obd_num_exports++;
915 spin_unlock(&obd->obd_dev_lock);
916 cfs_hash_putref(hash);
/* error unwinding: drop the lock and hash reference, unhash the handle and
 * free the half-built export directly */
920 spin_unlock(&obd->obd_dev_lock);
923 cfs_hash_putref(hash);
924 class_handle_unhash(&export->exp_handle);
925 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
926 obd_destroy_export(export);
927 OBD_FREE_PTR(export);
930 EXPORT_SYMBOL(class_new_export);
/* Detach \a exp from its obd device: unhash its handle, then under
 * obd_dev_lock remove it from the uuid and generation hashes (when
 * hashed), move it to obd_unlinked_exports, take it off the timed list
 * and decrement obd_num_exports. Afterwards the global stale-export
 * counter is incremented and obd_stale_export_put() is called. */
932 void class_unlink_export(struct obd_export *exp)
934 class_handle_unhash(&exp->exp_handle);
936 spin_lock(&exp->exp_obd->obd_dev_lock);
937 /* delete an uuid-export hashitem from hashtables */
938 if (!hlist_unhashed(&exp->exp_uuid_hash))
939 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
940 &exp->exp_client_uuid,
941 &exp->exp_uuid_hash);
943 if (!hlist_unhashed(&exp->exp_gen_hash)) {
944 struct tg_export_data *ted = &exp->exp_target_data;
945 struct cfs_hash *hash;
/* a hash reference is held only for the duration of the delete */
947 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
948 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
950 cfs_hash_putref(hash);
953 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
954 list_del_init(&exp->exp_obd_chain_timed);
955 exp->exp_obd->obd_num_exports--;
956 spin_unlock(&exp->exp_obd->obd_dev_lock);
957 atomic_inc(&obd_stale_export_num);
959 /* A reference is kept by obd_stale_exports list */
960 obd_stale_export_put(exp);
962 EXPORT_SYMBOL(class_unlink_export);
964 /* Import management functions */
/* Final teardown of an import whose refcount has reached zero: releases
 * the import's connection, frees every entry on imp_conn_list together
 * with its connection, asserts that imp_sec is already gone, drops the
 * "import" reference on the obd device and frees the import through
 * OBD_FREE_RCU. */
965 static void class_import_destroy(struct obd_import *imp)
969 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
970 imp->imp_obd->obd_name);
972 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
974 ptlrpc_put_connection_superhack(imp->imp_connection);
/* pop entries from the head until the list is empty */
976 while (!list_empty(&imp->imp_conn_list)) {
977 struct obd_import_conn *imp_conn;
979 imp_conn = list_entry(imp->imp_conn_list.next,
980 struct obd_import_conn, oic_item);
981 list_del_init(&imp_conn->oic_item);
982 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
983 OBD_FREE(imp_conn, sizeof(*imp_conn));
986 LASSERT(imp->imp_sec == NULL);
/* pairs with class_incref(obd, "import", imp) in class_new_import() */
987 class_decref(imp->imp_obd, "import", imp);
988 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* hop_addref callback for import handles: takes an import reference. */
992 static void import_handle_addref(void *import)
994 class_import_get(import);
/* Handle operations passed to class_handle_hash() for every new import. */
997 static struct portals_handle_ops import_handle_ops = {
998 .hop_addref = import_handle_addref,
/* Take one reference on \a import (atomic increment of imp_refcount) and
 * log the new count together with the owning device name. */
1002 struct obd_import *class_import_get(struct obd_import *import)
1004 atomic_inc(&import->imp_refcount);
1005 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1006 atomic_read(&import->imp_refcount),
1007 import->imp_obd->obd_name);
1010 EXPORT_SYMBOL(class_import_get);
/* Drop one reference on \a imp. The import must not be on a zombie chain
 * yet; when the count reaches zero it is handed to
 * obd_zombie_import_add(). */
1012 void class_import_put(struct obd_import *imp)
1016 LASSERT(list_empty(&imp->imp_zombie_chain));
1017 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* the count is logged before the decrement, hence the "- 1" */
1019 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1020 atomic_read(&imp->imp_refcount) - 1,
1021 imp->imp_obd->obd_name);
1023 if (atomic_dec_and_test(&imp->imp_refcount)) {
1024 CDEBUG(D_INFO, "final put import %p\n", imp);
1025 obd_zombie_import_add(imp);
1028 /* catch possible import put race */
1029 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1032 EXPORT_SYMBOL(class_import_put);
/* Initialize the adaptive-timeout state of an import: the network latency
 * estimate starts at 0 and each of the IMP_AT_MAX_PORTALS service
 * estimates starts at INITIAL_CONNECT_TIMEOUT. */
1034 static void init_imp_at(struct imp_at *at) {
1036 at_init(&at->iat_net_latency, 0, 0);
1037 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1038 /* max service estimates are tracked on the server side, so
1039 don't use the AT history here, just use the last reported
1040 val. (But keep hist for proc histogram, worst_ever) */
1041 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Allocate and initialize an import for \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with refcount 2, holds an
 * "import" reference on the obd device, has its handle hashed with
 * import_handle_ops and uses LUSTRE_MSG_MAGIC_V2 as the initial message
 * magic. */
1046 struct obd_import *class_new_import(struct obd_device *obd)
1048 struct obd_import *imp;
1050 OBD_ALLOC(imp, sizeof(*imp));
1054 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1055 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1056 INIT_LIST_HEAD(&imp->imp_replay_list);
1057 INIT_LIST_HEAD(&imp->imp_sending_list);
1058 INIT_LIST_HEAD(&imp->imp_delayed_list);
1059 INIT_LIST_HEAD(&imp->imp_committed_list);
1060 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1061 imp->imp_known_replied_xid = 0;
/* the replay cursor initially points at the (empty) committed list head */
1062 imp->imp_replay_cursor = &imp->imp_committed_list;
1063 spin_lock_init(&imp->imp_lock);
1064 imp->imp_last_success_conn = 0;
1065 imp->imp_state = LUSTRE_IMP_NEW;
/* released by class_decref(imp->imp_obd, "import", imp) in
 * class_import_destroy() */
1066 imp->imp_obd = class_incref(obd, "import", imp);
1067 mutex_init(&imp->imp_sec_mutex);
1068 init_waitqueue_head(&imp->imp_recovery_waitq);
1070 atomic_set(&imp->imp_refcount, 2);
1071 atomic_set(&imp->imp_unregistering, 0);
1072 atomic_set(&imp->imp_inflight, 0);
1073 atomic_set(&imp->imp_replay_inflight, 0);
1074 atomic_set(&imp->imp_inval_count, 0);
1075 INIT_LIST_HEAD(&imp->imp_conn_list);
1076 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1077 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1078 init_imp_at(&imp->imp_at);
1080 /* the default magic is V2, will be used in connect RPC, and
1081 * then adjusted according to the flags in request/reply. */
1082 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1086 EXPORT_SYMBOL(class_new_import);
/* Start tearing down \a import: unhash its handle, bump imp_generation
 * under imp_lock and drop one reference with class_import_put(). */
1088 void class_destroy_import(struct obd_import *import)
1090 LASSERT(import != NULL);
1091 LASSERT(import != LP_POISON);
1093 class_handle_unhash(&import->imp_handle);
1095 spin_lock(&import->imp_lock);
1096 import->imp_generation++;
1097 spin_unlock(&import->imp_lock);
1098 class_import_put(import);
1100 EXPORT_SYMBOL(class_destroy_import);
1102 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Record that \a lock holds a reference on \a exp (only built when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is set). Under exp_locks_list_guard the
 * per-lock counter is incremented; on the 0 -> 1 transition the lock is
 * linked onto exp_locks_list and its target export is set. A warning is
 * printed if the lock already targets a different export. */
1104 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1106 spin_lock(&exp->exp_locks_list_guard);
1108 LASSERT(lock->l_exp_refs_nr >= 0);
1110 if (lock->l_exp_refs_target != NULL &&
1111 lock->l_exp_refs_target != exp) {
1112 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1113 exp, lock, lock->l_exp_refs_target);
/* post-increment: the test sees the count before this reference */
1115 if ((lock->l_exp_refs_nr ++) == 0) {
1116 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1117 lock->l_exp_refs_target = exp;
1119 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1120 lock, exp, lock->l_exp_refs_nr);
1121 spin_unlock(&exp->exp_locks_list_guard);
/* Undo one __class_export_add_lock_ref(). Under exp_locks_list_guard the
 * per-lock counter is decremented; when it reaches zero the lock is
 * unlinked from the export's list and its target export is cleared. A
 * warning is printed if the lock's recorded export differs from \a exp. */
1124 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1126 spin_lock(&exp->exp_locks_list_guard);
1127 LASSERT(lock->l_exp_refs_nr > 0);
1128 if (lock->l_exp_refs_target != exp) {
1129 LCONSOLE_WARN("lock %p, "
1130 "mismatching export pointers: %p, %p\n",
1131 lock, lock->l_exp_refs_target, exp);
1133 if (-- lock->l_exp_refs_nr == 0) {
1134 list_del_init(&lock->l_exp_refs_link);
1135 lock->l_exp_refs_target = NULL;
1137 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1138 lock, exp, lock->l_exp_refs_nr);
1139 spin_unlock(&exp->exp_locks_list_guard);
1143 /* A connection defines an export context in which preallocation can
1144 be managed. This releases the export pointer reference, and returns
1145 the export handle, so the export refcount is 1 when this function
/* All three arguments must be non-NULL. A failure of class_new_export() is
 * passed back as its PTR_ERR() value; on success the export's handle
 * cookie is stored in \a conn. */
1147 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1148 struct obd_uuid *cluuid)
1150 struct obd_export *export;
1151 LASSERT(conn != NULL);
1152 LASSERT(obd != NULL);
1153 LASSERT(cluuid != NULL);
1156 export = class_new_export(obd, cluuid);
1158 RETURN(PTR_ERR(export));
1160 conn->cookie = export->exp_handle.h_cookie;
/* drops one of the two references class_new_export() starts with */
1161 class_export_put(export);
1163 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1164 cluuid->uuid, conn->cookie);
1167 EXPORT_SYMBOL(class_connect);
1169 /* if export is involved in recovery then clean up related things */
/* Runs in two phases. Under obd_recovery_task_lock, while the device is
 * recovering: an export marked exp_in_recovery has the flag cleared and
 * obd_connected_clients decremented, and obd_stale_clients is incremented
 * for exports without OBD_CONNECT_LIGHTWEIGHT. Then, under exp_lock, any
 * pending request-replay and lock-replay flags are cleared and the
 * matching per-device client counters decremented. */
1170 static void class_export_recovery_cleanup(struct obd_export *exp)
1172 struct obd_device *obd = exp->exp_obd;
1174 spin_lock(&obd->obd_recovery_task_lock);
1175 if (obd->obd_recovering) {
1176 if (exp->exp_in_recovery) {
/* exp_lock is nested inside obd_recovery_task_lock here */
1177 spin_lock(&exp->exp_lock);
1178 exp->exp_in_recovery = 0;
1179 spin_unlock(&exp->exp_lock);
1180 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1181 atomic_dec(&obd->obd_connected_clients);
1184 /* if called during recovery then should update
1185 * obd_stale_clients counter,
1186 * lightweight exports are not counted */
1187 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1188 exp->exp_obd->obd_stale_clients++;
1190 spin_unlock(&obd->obd_recovery_task_lock);
1192 spin_lock(&exp->exp_lock);
1193 /** Cleanup req replay fields */
1194 if (exp->exp_req_replay_needed) {
1195 exp->exp_req_replay_needed = 0;
1197 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1198 atomic_dec(&obd->obd_req_replay_clients);
1201 /** Cleanup lock replay data */
1202 if (exp->exp_lock_replay_needed) {
1203 exp->exp_lock_replay_needed = 0;
1205 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1206 atomic_dec(&obd->obd_lock_replay_clients);
1208 spin_unlock(&exp->exp_lock);
1211 /* This function removes 1-3 references from the export:
1212 * 1 - for the export pointer passed in
1213 * and, if a disconnect is really needed:
1214 * 2 - for removal from the hash
1215 * 3 - in class_unlink_export
1216 * The export pointer passed to this function may be destroyed. */
/* A NULL \a export is only warned about. exp_disconnected is read and set
 * under exp_lock so that racing callers can tell who got there first; a
 * caller that finds the flag already set skips the teardown and jumps to
 * no_disconn. The first caller removes the export from the nid hash (when
 * hashed), runs the recovery cleanup and unlinks the export. */
1217 int class_disconnect(struct obd_export *export)
1219 int already_disconnected;
1222 if (export == NULL) {
1223 CWARN("attempting to free NULL export %p\n", export);
1227 spin_lock(&export->exp_lock);
1228 already_disconnected = export->exp_disconnected;
1229 export->exp_disconnected = 1;
1230 spin_unlock(&export->exp_lock);
1232 /* class_cleanup(), abort_recovery(), and class_fail_export()
1233 * all end up in here, and if any of them race we shouldn't
1234 * call extra class_export_puts(). */
1235 if (already_disconnected) {
1236 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1237 GOTO(no_disconn, already_disconnected);
1240 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1241 export->exp_handle.h_cookie);
1243 if (!hlist_unhashed(&export->exp_nid_hash))
1244 cfs_hash_del(export->exp_obd->obd_nid_hash,
1245 &export->exp_connection->c_peer.nid,
1246 &export->exp_nid_hash);
1248 class_export_recovery_cleanup(export);
1249 class_unlink_export(export);
1251 class_export_put(export);
1254 EXPORT_SYMBOL(class_disconnect);
1256 /* Return non-zero for a fully connected export */
/* "Fully connected" means exp_conn_cnt > 0 and exp_failed not set; both
 * fields are sampled together under exp_lock. */
1257 int class_connected_export(struct obd_export *exp)
1262 spin_lock(&exp->exp_lock);
1263 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1264 spin_unlock(&exp->exp_lock);
1268 EXPORT_SYMBOL(class_connected_export);
/* Disconnect every export on \a list, always working on the current head
 * until the list is empty. Each export gets \a flags stored in exp_flags
 * under exp_lock. An export whose client uuid equals its device's uuid is
 * not disconnected, only unlinked from the list. All others go through
 * obd_disconnect(). */
1270 static void class_disconnect_export_list(struct list_head *list,
1271 enum obd_option flags)
1274 struct obd_export *exp;
1277 /* It's possible that an export may disconnect itself, but
1278 * nothing else will be added to this list. */
1279 while (!list_empty(list)) {
1280 exp = list_entry(list->next, struct obd_export,
1282 /* need for safe call CDEBUG after obd_disconnect */
1283 class_export_get(exp);
1285 spin_lock(&exp->exp_lock);
1286 exp->exp_flags = flags;
1287 spin_unlock(&exp->exp_lock);
1289 if (obd_uuid_equals(&exp->exp_client_uuid,
1290 &exp->exp_obd->obd_uuid)) {
1292 "exp %p export uuid == obd uuid, don't discon\n",
1294 /* Need to delete this now so we don't end up pointing
1295 * to work_list later when this export is cleaned up. */
1296 list_del_init(&exp->exp_obd_chain);
1297 class_export_put(exp);
/* second reference: obd_disconnect() below consumes one (see the
 * "release one export reference anyway" note) */
1301 class_export_get(exp);
1302 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1303 "last request at "CFS_TIME_T"\n",
1304 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1305 exp, exp->exp_last_request_time);
1306 /* release one export reference anyway */
1307 rc = obd_disconnect(exp);
1309 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1310 obd_export_nid2str(exp), exp, rc);
1311 class_export_put(exp);
/*
 * Disconnect all exports of @obd.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a local work
 * list while obd_dev_lock is held, which leaves the two device lists
 * empty; the work list is then processed without the lock by
 * class_disconnect_export_list() using the flags derived from
 * exp_flags_from_obd(obd).
 */
1316 void class_disconnect_exports(struct obd_device *obd)
1318 struct list_head work_list;
1321 /* Move all of the exports from obd_exports to a work list, en masse. */
1322 INIT_LIST_HEAD(&work_list);
1323 spin_lock(&obd->obd_dev_lock);
1324 list_splice_init(&obd->obd_exports, &work_list);
1325 list_splice_init(&obd->obd_delayed_exports, &work_list);
1326 spin_unlock(&obd->obd_dev_lock);
1328 if (!list_empty(&work_list)) {
1329 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1330 "disconnecting them\n", obd->obd_minor, obd);
1331 class_disconnect_export_list(&work_list,
1332 exp_flags_from_obd(obd));
1334 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1335 obd->obd_minor, obd);
1338 EXPORT_SYMBOL(class_disconnect_exports);
/*
 * Evict the exports of @obd that @test_export does not vouch for.
 *
 * obd_exports is walked under obd_dev_lock.  The self-export and exports
 * whose ted_lr_idx is -1 are special-cased first.  For the rest,
 * exp_lock is taken: when exp_failed is already set or test_export()
 * returns non-zero the lock is simply released (the export appears to be
 * skipped); otherwise exp_failed is set to 1 and the export is moved to
 * a local work list.  That list is finally handed to
 * class_disconnect_export_list() with OBD_OPT_ABORT_RECOV or'ed into the
 * flags from exp_flags_from_obd(obd).
 */
1340 /* Remove exports that have not completed recovery.
1342 void class_disconnect_stale_exports(struct obd_device *obd,
1343 int (*test_export)(struct obd_export *))
1345 struct list_head work_list;
1346 struct obd_export *exp, *n;
1350 INIT_LIST_HEAD(&work_list);
1351 spin_lock(&obd->obd_dev_lock);
1352 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1354 /* don't count self-export as client */
1355 if (obd_uuid_equals(&exp->exp_client_uuid,
1356 &exp->exp_obd->obd_uuid))
1359 /* don't evict clients which have no slot in last_rcvd
1360 * (e.g. lightweight connection) */
1361 if (exp->exp_target_data.ted_lr_idx == -1)
1364 spin_lock(&exp->exp_lock);
1365 if (exp->exp_failed || test_export(exp)) {
1366 spin_unlock(&exp->exp_lock);
1369 exp->exp_failed = 1;
1370 spin_unlock(&exp->exp_lock);
1372 list_move(&exp->exp_obd_chain, &work_list);
1374 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1375 obd->obd_name, exp->exp_client_uuid.uuid,
1376 exp->exp_connection == NULL ? "<unknown>" :
1377 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1378 print_export_data(exp, "EVICTING", 0);
1380 spin_unlock(&obd->obd_dev_lock);
1383 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1384 obd->obd_name, evicted);
1386 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1387 OBD_OPT_ABORT_RECOV);
1390 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp_failed is sampled and set to 1 under exp_lock, so only the first
 * caller proceeds; a caller that finds the flag already set just logs
 * "skipping".  When obd_dump_on_timeout is set the debug log is dumped
 * before the disconnect.  Two references are taken: one consumed by
 * obd_disconnect(), one kept so that @exp can still be logged afterwards
 * and dropped by the final class_export_put().
 */
1392 void class_fail_export(struct obd_export *exp)
1394 int rc, already_failed;
1396 spin_lock(&exp->exp_lock);
1397 already_failed = exp->exp_failed;
1398 exp->exp_failed = 1;
1399 spin_unlock(&exp->exp_lock);
1401 if (already_failed) {
1402 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1403 exp, exp->exp_client_uuid.uuid);
1407 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1408 exp, exp->exp_client_uuid.uuid);
1410 if (obd_dump_on_timeout)
1411 libcfs_debug_dumplog();
1413 /* need for safe call CDEBUG after obd_disconnect */
1414 class_export_get(exp);
1416 /* Most callers into obd_disconnect are removing their own reference
1417 * (request, for example) in addition to the one from the hash table.
1418 * We don't have such a reference here, so make one. */
1419 class_export_get(exp);
1420 rc = obd_disconnect(exp);
1422 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1424 CDEBUG(D_HA, "disconnected export %p/%s\n",
1425 exp, exp->exp_client_uuid.uuid);
1426 class_export_put(exp);
1428 EXPORT_SYMBOL(class_fail_export);
/*
 * Return a printable NID for @exp: the libcfs_nid2str() form of the peer
 * NID when the export has a connection.  The value returned for an
 * export without a connection is not visible here.
 */
1430 char *obd_export_nid2str(struct obd_export *exp)
1432 if (exp->exp_connection != NULL)
1433 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1437 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict the export(s) of @obd connected from the NID given as string
 * @nid, on administrative request.
 *
 * Returns exports_evicted, which is 0 when @obd is already stopping (the
 * umount thread then owns the cleanup).  A reference on obd_nid_hash is
 * taken under obd_dev_lock and dropped with cfs_hash_putref() once the
 * lookups are done.  Every export found is failed through
 * class_fail_export() and then released with class_export_put().
 * NOTE(review): the const qualifier of @nid is cast away for
 * libcfs_str2nid(); presumably that helper does not modify the string.
 */
1439 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1441 struct cfs_hash *nid_hash;
1442 struct obd_export *doomed_exp = NULL;
1443 int exports_evicted = 0;
1445 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1447 spin_lock(&obd->obd_dev_lock);
1448 /* umount has run already, so evict thread should leave
1449 * its task to umount thread now */
1450 if (obd->obd_stopping) {
1451 spin_unlock(&obd->obd_dev_lock);
1452 return exports_evicted;
1454 nid_hash = obd->obd_nid_hash;
1455 cfs_hash_getref(nid_hash);
1456 spin_unlock(&obd->obd_dev_lock);
1459 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1460 if (doomed_exp == NULL)
1463 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1464 "nid %s found, wanted nid %s, requested nid %s\n",
1465 obd_export_nid2str(doomed_exp),
1466 libcfs_nid2str(nid_key), nid);
1467 LASSERTF(doomed_exp != obd->obd_self_export,
1468 "self-export is hashed by NID?\n");
1470 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1471 "request\n", obd->obd_name,
1472 obd_uuid2str(&doomed_exp->exp_client_uuid),
1473 obd_export_nid2str(doomed_exp));
1474 class_fail_export(doomed_exp);
1475 class_export_put(doomed_exp);
1478 cfs_hash_putref(nid_hash);
1480 if (!exports_evicted)
1481 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1482 obd->obd_name, nid);
1483 return exports_evicted;
1485 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd whose client UUID equals the string @uuid.
 *
 * Returns exports_evicted, which is 0 when @obd is stopping, when @uuid
 * names the obd itself ("can't evict myself") or when no export is
 * found.  A reference on obd_uuid_hash is held for the duration of the
 * lookup.  A matching export is failed with class_fail_export() and then
 * released with class_export_put().
 * NOTE(review): "adminstrative" in the CWARN text below is misspelled;
 * it is left untouched here because log consumers may match on it.
 */
1487 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1489 struct cfs_hash *uuid_hash;
1490 struct obd_export *doomed_exp = NULL;
1491 struct obd_uuid doomed_uuid;
1492 int exports_evicted = 0;
1494 spin_lock(&obd->obd_dev_lock);
1495 if (obd->obd_stopping) {
1496 spin_unlock(&obd->obd_dev_lock);
1497 return exports_evicted;
1499 uuid_hash = obd->obd_uuid_hash;
1500 cfs_hash_getref(uuid_hash);
1501 spin_unlock(&obd->obd_dev_lock);
1503 obd_str2uuid(&doomed_uuid, uuid);
1504 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1505 CERROR("%s: can't evict myself\n", obd->obd_name);
1506 cfs_hash_putref(uuid_hash);
1507 return exports_evicted;
1510 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1512 if (doomed_exp == NULL) {
1513 CERROR("%s: can't disconnect %s: no exports found\n",
1514 obd->obd_name, uuid);
1516 CWARN("%s: evicting %s at adminstrative request\n",
1517 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1518 class_fail_export(doomed_exp);
1519 class_export_put(doomed_exp);
1522 cfs_hash_putref(uuid_hash);
1524 return exports_evicted;
/*
 * Optional per-export dump callback, only present when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is enabled.  It starts out NULL and is
 * invoked by print_export_data() when its locks argument is non-zero.
 */
1527 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1528 void (*class_export_dump_hook)(struct obd_export*) = NULL;
/*
 * Log one D_HA debug line describing @exp, tagged with @status.
 *
 * The line carries the obd name, export pointer, client UUID, NID, the
 * refcount, the rpc/callback/lock counters, the disconnected/delayed/
 * failed flags, the number of outstanding replies with the first reply
 * pointer ("..." is appended when there are more than three) and
 * exp_last_committed.  exp_outstanding_replies is walked under exp_lock.
 * With LUSTRE_TRACKS_LOCK_EXP_REFS, a non-zero locks argument also calls
 * class_export_dump_hook when one is installed.
 */
1531 static void print_export_data(struct obd_export *exp, const char *status,
1534 struct ptlrpc_reply_state *rs;
1535 struct ptlrpc_reply_state *first_reply = NULL;
1538 spin_lock(&exp->exp_lock);
1539 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1545 spin_unlock(&exp->exp_lock);
1547 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1548 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1549 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1550 atomic_read(&exp->exp_rpc_count),
1551 atomic_read(&exp->exp_cb_count),
1552 atomic_read(&exp->exp_locks_count),
1553 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1554 nreplies, first_reply, nreplies > 3 ? "..." : "",
1555 exp->exp_last_committed);
1556 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1557 if (locks && class_export_dump_hook != NULL)
1558 class_export_dump_hook(exp);
/*
 * Log every export known for @obd through print_export_data().
 *
 * The obd_exports, obd_unlinked_exports and obd_delayed_exports lists
 * are printed as "ACTIVE", "UNLINKED" and "DELAYED" under obd_dev_lock.
 * The global obd_zombie_exports list is then printed as "ZOMBIE" under
 * obd_zombie_impexp_lock; that list is not filtered by @obd here.
 * @locks is passed through unchanged.
 */
1562 void dump_exports(struct obd_device *obd, int locks)
1564 struct obd_export *exp;
1566 spin_lock(&obd->obd_dev_lock);
1567 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1568 print_export_data(exp, "ACTIVE", locks);
1569 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1570 print_export_data(exp, "UNLINKED", locks);
1571 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1572 print_export_data(exp, "DELAYED", locks);
1573 spin_unlock(&obd->obd_dev_lock);
1574 spin_lock(&obd_zombie_impexp_lock);
1575 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1576 print_export_data(exp, "ZOMBIE", locks);
1577 spin_unlock(&obd_zombie_impexp_lock);
/*
 * Block until obd_unlinked_exports of @obd is empty.
 *
 * obd_exports must already be empty (asserted).  While unlinked exports
 * remain, obd_dev_lock is dropped and the caller sleeps uninterruptibly
 * for cfs_time_seconds(waited).  Once waited exceeds 5 and is a power of
 * two, a console warning is printed and all exports are dumped, so the
 * warnings become progressively rarer.  The list is re-checked with
 * obd_dev_lock held.
 */
1580 void obd_exports_barrier(struct obd_device *obd)
1583 LASSERT(list_empty(&obd->obd_exports));
1584 spin_lock(&obd->obd_dev_lock);
1585 while (!list_empty(&obd->obd_unlinked_exports)) {
1586 spin_unlock(&obd->obd_dev_lock);
1587 set_current_state(TASK_UNINTERRUPTIBLE);
1588 schedule_timeout(cfs_time_seconds(waited));
1589 if (waited > 5 && IS_PO2(waited)) {
1590 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1591 "more than %d seconds. "
1592 "The obd refcount = %d. Is it stuck?\n",
1593 obd->obd_name, waited,
1594 atomic_read(&obd->obd_refcount));
1595 dump_exports(obd, 1);
1598 spin_lock(&obd->obd_dev_lock);
1600 spin_unlock(&obd->obd_dev_lock);
1602 EXPORT_SYMBOL(obd_exports_barrier);
1604 /* Total amount of zombies to be destroyed.  It is read under
 * obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(). */
1605 static int zombies_count = 0;
/*
 * Drain the zombie import and export lists.
 *
 * Each pass detaches at most one import and one export from the head of
 * obd_zombie_imports / obd_zombie_exports under obd_zombie_impexp_lock,
 * then destroys them with class_import_destroy() and
 * class_export_destroy() after the lock has been dropped.  The lock is
 * retaken briefly after each destroy.  The loop repeats for as long as a
 * pass found an import or an export.
 */
1608 * kill zombie imports and exports
1610 void obd_zombie_impexp_cull(void)
1612 struct obd_import *import;
1613 struct obd_export *export;
1617 spin_lock(&obd_zombie_impexp_lock);
1620 if (!list_empty(&obd_zombie_imports)) {
1621 import = list_entry(obd_zombie_imports.next,
1624 list_del_init(&import->imp_zombie_chain);
1628 if (!list_empty(&obd_zombie_exports)) {
1629 export = list_entry(obd_zombie_exports.next,
1632 list_del_init(&export->exp_obd_chain);
1635 spin_unlock(&obd_zombie_impexp_lock);
1637 if (import != NULL) {
1638 class_import_destroy(import);
1639 spin_lock(&obd_zombie_impexp_lock);
1641 spin_unlock(&obd_zombie_impexp_lock);
1644 if (export != NULL) {
1645 class_export_destroy(export);
1646 spin_lock(&obd_zombie_impexp_lock);
1648 spin_unlock(&obd_zombie_impexp_lock);
1652 } while (import != NULL || export != NULL);
/*
 * State shared between the zombie destroy thread and its users:
 * obd_zombie_start / obd_zombie_stop are completed by the thread when it
 * has started and when it is about to exit; obd_zombie_flags holds the
 * stop request; obd_zombie_waitq is the wait queue used both by the
 * thread and by obd_zombie_barrier(); obd_zombie_pid records the pid of
 * the thread.
 * NOTE(review): OBD_ZOMBIE_STOP is passed to set_bit()/test_bit() as a
 * bit number, so the value 0x0001 selects bit 1 of obd_zombie_flags
 * rather than acting as a mask.  All visible users are consistent.
 */
1656 static struct completion obd_zombie_start;
1657 static struct completion obd_zombie_stop;
1658 static unsigned long obd_zombie_flags;
1659 static wait_queue_head_t obd_zombie_waitq;
1660 static pid_t obd_zombie_pid;
1663 OBD_ZOMBIE_STOP = 0x0001,
/*
 * Wait condition helper for the zombie thread.  Under
 * obd_zombie_impexp_lock, rc is set to non-zero only when there is
 * nothing to do: zombies_count is 0 and no stop has been requested.  The
 * thread therefore sleeps on the negation of this function.  @arg is not
 * used in the visible code.
 */
1667 * check for work for kill zombie import/export thread.
1669 static int obd_zombie_impexp_check(void *arg)
1673 spin_lock(&obd_zombie_impexp_lock);
1674 rc = (zombies_count == 0) &&
1675 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1676 spin_unlock(&obd_zombie_impexp_lock);
1682 * Add export to the obd_zombie thread and notify it.
/*
 * Queue @exp for destruction by the zombie thread: decrement
 * obd_stale_export_num, unlink the export from its obd list under
 * obd_dev_lock (it must currently be on one), add it to
 * obd_zombie_exports under obd_zombie_impexp_lock and wake the waiters.
 */
1684 static void obd_zombie_export_add(struct obd_export *exp) {
1685 atomic_dec(&obd_stale_export_num);
1686 spin_lock(&exp->exp_obd->obd_dev_lock);
1687 LASSERT(!list_empty(&exp->exp_obd_chain));
1688 list_del_init(&exp->exp_obd_chain);
1689 spin_unlock(&exp->exp_obd->obd_dev_lock);
1690 spin_lock(&obd_zombie_impexp_lock);
1692 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1693 spin_unlock(&obd_zombie_impexp_lock);
1695 obd_zombie_impexp_notify();
1699 * Add import to the obd_zombie thread and notify it.
/*
 * Queue @imp for destruction by the zombie thread.  The import must have
 * no security context left (imp_sec == NULL) and must not already be on
 * a zombie list; it is added to obd_zombie_imports under
 * obd_zombie_impexp_lock and the waiters are woken.
 */
1701 static void obd_zombie_import_add(struct obd_import *imp) {
1702 LASSERT(imp->imp_sec == NULL);
1703 spin_lock(&obd_zombie_impexp_lock);
1704 LASSERT(list_empty(&imp->imp_zombie_chain));
1706 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1707 spin_unlock(&obd_zombie_impexp_lock);
1709 obd_zombie_impexp_notify();
/*
 * Wake every sleeper on obd_zombie_waitq.  wake_up_all() is used because
 * the queue is shared by the destroy thread and by obd_zombie_barrier()
 * callers, and waking only one of them could leave the thread asleep.
 */
1713 * notify import/export destroy thread about new zombie.
1715 static void obd_zombie_impexp_notify(void)
1718 * Make sure obd_zomebie_impexp_thread get this notification.
1719 * It is possible this signal only get by obd_zombie_barrier, and
1720 * barrier gulps this notification and sleeps away and hangs ensues
1722 wake_up_all(&obd_zombie_waitq);
/*
 * Wait condition for obd_zombie_barrier(): rc is non-zero when
 * zombies_count, read under obd_zombie_impexp_lock, is 0.  Must not be
 * called once a stop has been requested (asserted).
 */
1726 * check whether obd_zombie is idle
1728 static int obd_zombie_is_idle(void)
1732 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1733 spin_lock(&obd_zombie_impexp_lock);
1734 rc = (zombies_count == 0);
1735 spin_unlock(&obd_zombie_impexp_lock);
/*
 * Sleep on obd_zombie_waitq until obd_zombie_is_idle() holds.  The
 * zombie thread itself (pid equal to obd_zombie_pid) is special-cased so
 * that it does not wait for its own work.
 */
1740 * wait when obd_zombie import/export queues become empty
1742 void obd_zombie_barrier(void)
1744 struct l_wait_info lwi = { 0 };
1746 if (obd_zombie_pid == current_pid())
1747 /* don't wait for myself */
1749 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1751 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Detach the first export from the global obd_stale_exports list, under
 * obd_stale_export_lock.  exp stays NULL when the list is empty.  The
 * export taken and the current obd_stale_export_num are logged at
 * D_DLMTRACE.
 */
1754 struct obd_export *obd_stale_export_get(void)
1756 struct obd_export *exp = NULL;
1759 spin_lock(&obd_stale_export_lock);
1760 if (!list_empty(&obd_stale_exports)) {
1761 exp = list_entry(obd_stale_exports.next,
1762 struct obd_export, exp_stale_list);
1763 list_del_init(&exp->exp_stale_list);
1765 spin_unlock(&obd_stale_export_lock);
1768 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1769 atomic_read(&obd_stale_export_num));
1773 EXPORT_SYMBOL(obd_stale_export_get);
/*
 * Hand back an export obtained from obd_stale_export_get().
 *
 * @exp must not be on the stale list (asserted).  When its lock hash
 * exists and still has entries (hs_count non-zero) the export is put
 * back on obd_stale_exports: at the tail when exp_bl_list is empty, at
 * the head otherwise.  exp_bl_list_lock is taken first, with bottom
 * halves disabled, and obd_stale_export_lock nests inside it.
 * class_export_put() covers the remaining case; the branch structure
 * around it is not fully visible here.
 */
1775 void obd_stale_export_put(struct obd_export *exp)
1779 LASSERT(list_empty(&exp->exp_stale_list));
1780 if (exp->exp_lock_hash &&
1781 atomic_read(&exp->exp_lock_hash->hs_count)) {
1782 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1783 atomic_read(&obd_stale_export_num));
1785 spin_lock_bh(&exp->exp_bl_list_lock);
1786 spin_lock(&obd_stale_export_lock);
1787 /* Add to the tail if there is no blocked locks,
1788 * to the head otherwise. */
1789 if (list_empty(&exp->exp_bl_list))
1790 list_add_tail(&exp->exp_stale_list,
1791 &obd_stale_exports);
1793 list_add(&exp->exp_stale_list,
1794 &obd_stale_exports);
1796 spin_unlock(&obd_stale_export_lock);
1797 spin_unlock_bh(&exp->exp_bl_list_lock);
1799 class_export_put(exp);
1803 EXPORT_SYMBOL(obd_stale_export_put);
/*
 * Move @exp to the head of obd_stale_exports when it is currently on
 * that list and its exp_bl_list is not empty.  Uses the same lock order
 * as obd_stale_export_put(): exp_bl_list_lock (bottom halves disabled),
 * then obd_stale_export_lock.
 */
1806 * Adjust the position of the export in the stale list,
1807 * i.e. move to the head of the list if is needed.
1809 void obd_stale_export_adjust(struct obd_export *exp)
1811 LASSERT(exp != NULL);
1812 spin_lock_bh(&exp->exp_bl_list_lock);
1813 spin_lock(&obd_stale_export_lock);
1815 if (!list_empty(&exp->exp_stale_list) &&
1816 !list_empty(&exp->exp_bl_list))
1817 list_move(&exp->exp_stale_list, &obd_stale_exports);
1819 spin_unlock(&obd_stale_export_lock);
1820 spin_unlock_bh(&exp->exp_bl_list_lock);
1822 EXPORT_SYMBOL(obd_stale_export_adjust);
/*
 * Body of the zombie destroy thread.
 *
 * Signals obd_zombie_start, records its pid in obd_zombie_pid and then
 * loops until OBD_ZOMBIE_STOP is set: sleep on obd_zombie_waitq until
 * obd_zombie_impexp_check() reports work or a stop request, cull the
 * zombie lists, and wake the queue for obd_zombie_barrier() callers.
 * obd_zombie_stop is completed on the way out.  @unused is ignored.
 */
1825 * destroy zombie export/import thread.
1827 static int obd_zombie_impexp_thread(void *unused)
1829 unshare_fs_struct();
1830 complete(&obd_zombie_start);
1832 obd_zombie_pid = current_pid();
1834 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1835 struct l_wait_info lwi = { 0 };
1837 l_wait_event(obd_zombie_waitq,
1838 !obd_zombie_impexp_check(NULL), &lwi);
1839 obd_zombie_impexp_cull();
1842 * Notify obd_zombie_barrier callers that queues
1845 wake_up(&obd_zombie_waitq);
1848 complete(&obd_zombie_stop);
/*
 * Initialise the zombie lists, lock, completions and wait queue, then
 * start the "obd_zombid" kernel thread.  PTR_ERR(task) is returned on
 * the error path of kthread_run(); otherwise the function waits for the
 * thread to signal obd_zombie_start before continuing.
 */
1855 * start destroy zombie import/export thread
1857 int obd_zombie_impexp_init(void)
1859 struct task_struct *task;
1861 INIT_LIST_HEAD(&obd_zombie_imports);
1863 INIT_LIST_HEAD(&obd_zombie_exports);
1864 spin_lock_init(&obd_zombie_impexp_lock);
1865 init_completion(&obd_zombie_start);
1866 init_completion(&obd_zombie_stop);
1867 init_waitqueue_head(&obd_zombie_waitq);
1870 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1872 RETURN(PTR_ERR(task));
1874 wait_for_completion(&obd_zombie_start);
/*
 * Request the zombie thread to stop: set OBD_ZOMBIE_STOP, wake the wait
 * queue so the thread notices the flag, and block until the thread
 * completes obd_zombie_stop.
 */
1878 * stop destroy zombie import/export thread
1880 void obd_zombie_impexp_stop(void)
1882 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1883 obd_zombie_impexp_notify();
1884 wait_for_completion(&obd_zombie_stop);
1887 /***** Kernel-userspace comm helpers *******/
1889 /* Get length of entire message, including header:
 * sizeof(struct kuc_hdr) plus @payload_len.
 * NOTE(review): the sum is returned as int with no range check, so
 * @payload_len is presumably validated by the callers - confirm. */
1890 int kuc_len(int payload_len)
1892 return sizeof(struct kuc_hdr) + payload_len;
1894 EXPORT_SYMBOL(kuc_len);
/*
 * The header is taken to sit immediately in front of the payload: @p is
 * cast to a struct kuc_hdr pointer and stepped back by one element.  The
 * result is asserted to carry KUC_MAGIC, so @p must really be a kuc
 * payload pointer.
 */
1896 /* Get a pointer to kuc header, given a ptr to the payload
1897 * @param p Pointer to payload area
1898 * @returns Pointer to kuc header
1900 struct kuc_hdr * kuc_ptr(void *p)
1902 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1903 LASSERT(lh->kuc_magic == KUC_MAGIC);
1906 EXPORT_SYMBOL(kuc_ptr);
/*
 * Non-asserting counterpart of kuc_ptr(): reads the struct kuc_hdr that
 * would precede @p and compares its kuc_magic with KUC_MAGIC.
 * NOTE(review): this dereferences memory in front of @p even when @p is
 * not a kuc payload; callers presumably guarantee that memory is
 * readable - confirm.
 */
1908 /* Test if payload is part of kuc message
1909 * @param p Pointer to payload area
1912 int kuc_ispayload(void *p)
1914 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1916 if (kh->kuc_magic == KUC_MAGIC)
1921 EXPORT_SYMBOL(kuc_ispayload);
/*
 * The buffer is sized with kuc_len(@payload_len), i.e. header plus
 * payload.  The header is filled with KUC_MAGIC, @transport, @type and
 * the total length, and the address just past the header is returned.
 * ERR_PTR(-ENOMEM) is the error return, so callers must test the result
 * with IS_ERR() rather than against NULL.
 */
1923 /* Alloc space for a message, and fill in header
1924 * @return Pointer to payload area
1926 void *kuc_alloc(int payload_len, int transport, int type)
1929 int len = kuc_len(payload_len);
1933 return ERR_PTR(-ENOMEM);
1935 lh->kuc_magic = KUC_MAGIC;
1936 lh->kuc_transport = transport;
1937 lh->kuc_msgtype = type;
1938 lh->kuc_msglen = len;
1940 return (void *)(lh + 1);
1942 EXPORT_SYMBOL(kuc_alloc);
1944 /* Takes pointer to payload area.  The header is located with
 * kuc_ptr(), which asserts the magic, and kuc_len(@payload_len) bytes
 * are freed starting at the header.
 * NOTE(review): @payload_len presumably has to equal the value given
 * to kuc_alloc() so the freed size matches the allocation - confirm. */
1945 inline void kuc_free(void *p, int payload_len)
1947 struct kuc_hdr *lh = kuc_ptr(p);
1948 OBD_FREE(lh, kuc_len(payload_len));
1950 EXPORT_SYMBOL(kuc_free);
/*
 * One sleeper waiting for an RPC slot.  orsw_entry links the waiter into
 * cl_loi_read_list of the client and orsw_waitq is the queue it sleeps
 * on.  Instances are created on the stack of obd_get_request_slot().
 */
1952 struct obd_request_slot_waiter {
1953 struct list_head orsw_entry;
1954 wait_queue_head_t orsw_waitq;
/*
 * Wait condition for obd_get_request_slot(): true once @orsw is no
 * longer linked into a list, which is how obd_put_request_slot() and
 * obd_set_max_rpcs_in_flight() hand a slot to a waiter.  The list state
 * is sampled under cl_loi_list_lock.
 */
1958 static bool obd_request_slot_avail(struct client_obd *cli,
1959 struct obd_request_slot_waiter *orsw)
1963 spin_lock(&cli->cl_loi_list_lock);
1964 avail = !!list_empty(&orsw->orsw_entry);
1965 spin_unlock(&cli->cl_loi_list_lock);
1971 * For network flow control, the RPC sponsor needs to acquire a credit
1972 * before sending the RPC. The credits count for a connection is defined
1973 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
1974 * the subsequent RPC sponsors need to wait until others release their
1975 * credits, or the administrator increases the "cl_max_rpcs_in_flight".
/*
 * Acquire one RPC slot on @cli, sleeping if none is free.
 *
 * Fast path: when cl_r_in_flight is below cl_max_rpcs_in_flight the
 * counter is incremented under cl_loi_list_lock.  Otherwise an on-stack
 * waiter is queued at the tail of cl_loi_read_list and the caller sleeps
 * interruptibly until obd_request_slot_avail() holds or the second wait
 * condition, not visible here, becomes true.  After the wait the lock is
 * retaken: for a waiter that was not signalled, an empty orsw_entry
 * means a slot had already been handed over, so cl_r_in_flight is
 * decremented to give it back; a signalled waiter must already be
 * unlinked (asserted).
 */
1977 int obd_get_request_slot(struct client_obd *cli)
1979 struct obd_request_slot_waiter orsw;
1980 struct l_wait_info lwi;
1983 spin_lock(&cli->cl_loi_list_lock);
1984 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1985 cli->cl_r_in_flight++;
1986 spin_unlock(&cli->cl_loi_list_lock);
1990 init_waitqueue_head(&orsw.orsw_waitq);
1991 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1992 orsw.orsw_signaled = false;
1993 spin_unlock(&cli->cl_loi_list_lock);
1995 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1996 rc = l_wait_event(orsw.orsw_waitq,
1997 obd_request_slot_avail(cli, &orsw) ||
2001 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
2002 * freed but other (such as obd_put_request_slot) is using it. */
2003 spin_lock(&cli->cl_loi_list_lock);
2005 if (!orsw.orsw_signaled) {
2006 if (list_empty(&orsw.orsw_entry))
2007 cli->cl_r_in_flight--;
2009 list_del(&orsw.orsw_entry);
2013 if (orsw.orsw_signaled) {
2014 LASSERT(list_empty(&orsw.orsw_entry));
2018 spin_unlock(&cli->cl_loi_list_lock);
2022 EXPORT_SYMBOL(obd_get_request_slot);
/*
 * Release one RPC slot on @cli.  If a waiter is queued and the in-flight
 * count is now below the maximum, the first waiter is unlinked, the slot
 * is immediately accounted to it (cl_r_in_flight++) and it is woken.
 * Everything happens under cl_loi_list_lock.
 */
2024 void obd_put_request_slot(struct client_obd *cli)
2026 struct obd_request_slot_waiter *orsw;
2028 spin_lock(&cli->cl_loi_list_lock);
2029 cli->cl_r_in_flight--;
2031 /* If there is free slot, wakeup the first waiter. */
2032 if (!list_empty(&cli->cl_loi_read_list) &&
2033 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2034 orsw = list_entry(cli->cl_loi_read_list.next,
2035 struct obd_request_slot_waiter, orsw_entry);
2036 list_del_init(&orsw->orsw_entry);
2037 cli->cl_r_in_flight++;
2038 wake_up(&orsw->orsw_waitq);
2040 spin_unlock(&cli->cl_loi_list_lock);
2042 EXPORT_SYMBOL(obd_put_request_slot);
/* Return the current cl_max_rpcs_in_flight limit of @cli.  The field is
 * read without taking cl_loi_list_lock. */
2044 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2046 return cli->cl_max_rpcs_in_flight;
2048 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * Set the maximum number of RPCs in flight for @cli to @max.
 *
 * @max must lie in [1, OBD_MAX_RIF_MAX].  For an MDC device the modify
 * RPC limit has to stay strictly below the new value: when @max is not
 * above cl_max_mod_rpcs_in_flight, that limit is first lowered to
 * @max - 1 through obd_set_max_mod_rpcs_in_flight(), and a value for
 * which this is impossible is rejected with an error message.  The new
 * limit is stored under cl_loi_list_lock; when it grew, up to "diff"
 * queued waiters are unlinked from cl_loi_read_list, given a slot
 * (cl_r_in_flight++) and woken.
 */
2050 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2052 struct obd_request_slot_waiter *orsw;
2059 if (max > OBD_MAX_RIF_MAX || max < 1)
2062 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2063 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2064 /* adjust max_mod_rpcs_in_flight to ensure it is always
2065 * strictly lower than max_rpcs_in_flight */
/* The message ends with a newline like every other CERROR in this file,
 * so that it does not run into the next log record. */
2067 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2068 "because it must be higher than "
2069 "max_mod_rpcs_in_flight value\n",
2070 cli->cl_import->imp_obd->obd_name);
2073 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2074 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2080 spin_lock(&cli->cl_loi_list_lock);
2081 old = cli->cl_max_rpcs_in_flight;
2082 cli->cl_max_rpcs_in_flight = max;
2085 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2086 for (i = 0; i < diff; i++) {
2087 if (list_empty(&cli->cl_loi_read_list))
2090 orsw = list_entry(cli->cl_loi_read_list.next,
2091 struct obd_request_slot_waiter, orsw_entry);
2092 list_del_init(&orsw->orsw_entry);
2093 cli->cl_r_in_flight++;
2094 wake_up(&orsw->orsw_waitq);
2096 spin_unlock(&cli->cl_loi_list_lock);
2100 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Return the current cl_max_mod_rpcs_in_flight limit of @cli.  The field
 * is read without taking cl_mod_rpcs_lock. */
2102 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2104 return cli->cl_max_mod_rpcs_in_flight;
2106 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/*
 * Set the maximum number of modify RPCs in flight for @cli to @max.
 *
 * @max must lie in [1, OBD_MAX_RIF_MAX], must be strictly below
 * cl_max_rpcs_in_flight, and must not exceed maxmodrpcs.  maxmodrpcs is
 * ocd_maxmodrpcs from the import connect data when the server
 * advertised OBD_CONNECT_MULTIMODRPCS; the value used otherwise is not
 * visible here.  The new limit is stored under cl_mod_rpcs_lock and
 * cl_mod_rpcs_waitq is woken when the limit was raised.
 */
2108 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2110 struct obd_connect_data *ocd;
2114 if (max > OBD_MAX_RIF_MAX || max < 1)
2117 /* cannot exceed or equal max_rpcs_in_flight */
2118 if (max >= cli->cl_max_rpcs_in_flight) {
2119 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2120 "higher or equal to max_rpcs_in_flight value (%u)\n",
2121 cli->cl_import->imp_obd->obd_name,
2122 max, cli->cl_max_rpcs_in_flight);
2126 /* cannot exceed max modify RPCs in flight supported by the server */
2127 ocd = &cli->cl_import->imp_connect_data;
2128 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2129 maxmodrpcs = ocd->ocd_maxmodrpcs;
2132 if (max > maxmodrpcs) {
2133 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2134 "higher than max_mod_rpcs_per_client value (%hu) "
2135 "returned by the server at connection\n",
2136 cli->cl_import->imp_obd->obd_name,
2141 spin_lock(&cli->cl_mod_rpcs_lock);
2143 prev = cli->cl_max_mod_rpcs_in_flight;
2144 cli->cl_max_mod_rpcs_in_flight = max;
2146 /* wakeup waiters if limit has been increased */
2147 if (cli->cl_max_mod_rpcs_in_flight > prev)
2148 wake_up(&cli->cl_mod_rpcs_waitq);
2150 spin_unlock(&cli->cl_mod_rpcs_lock);
2154 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/* Integer percentage of a relative to b; yields 0 when b is 0.  Every
 * argument use is parenthesized so expression arguments expand safely. */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
/*
 * Print the modify-RPC statistics of @cli into @seq.
 *
 * The output holds a snapshot time taken with do_gettimeofday(), the
 * current cl_mod_rpcs_in_flight and one row per cl_mod_rpcs_hist bucket
 * giving the bucket count, its percentage of the histogram total and the
 * cumulative percentage.  The cumulative count is compared with the
 * total after each row.  The counters are read and printed with
 * cl_mod_rpcs_lock held.
 */
2158 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2159 struct seq_file *seq)
2162 unsigned long mod_tot = 0, mod_cum;
2165 do_gettimeofday(&now);
2167 spin_lock(&cli->cl_mod_rpcs_lock);
2169 seq_printf(seq, "snapshot_time: %lu.%lu (secs.usecs)\n",
2170 now.tv_sec, now.tv_usec);
2171 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2172 cli->cl_mod_rpcs_in_flight);
2174 seq_printf(seq, "\n\t\t\tmodify\n");
2175 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2177 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2180 for (i = 0; i < OBD_HIST_MAX; i++) {
2181 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2183 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2184 i, mod, pct(mod, mod_tot),
2185 pct(mod_cum, mod_tot));
2186 if (mod_cum == mod_tot)
2190 spin_unlock(&cli->cl_mod_rpcs_lock);
2194 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
/*
 * Slot test used with cl_mod_rpcs_lock already held by the caller (the
 * function itself takes no lock).  avail is true when
 * cl_mod_rpcs_in_flight is below cl_max_mod_rpcs_in_flight, or when the
 * request is a close and cl_close_rpcs_in_flight is 0.
 */
2198 /* The number of modify RPCs sent in parallel is limited
2199 * because the server has a finite number of slots per client to
2200 * store request result and ensure reply reconstruction when needed.
2201 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2202 * that takes into account server limit and cl_max_rpcs_in_flight
2204 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2205 * one close request is allowed above the maximum.
2207 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2212 /* A slot is available if
2213 * - number of modify RPCs in flight is less than the max
2214 * - it's a close RPC and no other close request is in flight
2216 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2217 (close_req && cli->cl_close_rpcs_in_flight == 0);
/* Locking wrapper around obd_mod_rpc_slot_avail_locked(): evaluates the
 * slot test with cl_mod_rpcs_lock held.  It serves as the wait condition
 * in obd_get_mod_rpc_slot(). */
2222 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2227 spin_lock(&cli->cl_mod_rpcs_lock);
2228 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2229 spin_unlock(&cli->cl_mod_rpcs_lock);
/*
 * Intents IT_GETATTR, IT_LOOKUP, IT_LAYOUT and IT_READDIR are
 * special-cased up front as read-only.  MDS_CLOSE requests are tracked
 * separately through cl_close_rpcs_in_flight.  When a slot is free, the
 * in-flight counter is bumped, the histogram is tallied and the first
 * zero bit of cl_mod_tag_bitmap is claimed, all under cl_mod_rpcs_lock.
 * Otherwise the lock is dropped and the caller sleeps on
 * cl_mod_rpcs_waitq until obd_mod_rpc_slot_avail() holds.
 */
2233 /* Get a modify RPC slot from the obd client @cli according
2234 * to the kind of operation @opc that is going to be sent
2235 * and the intent @it of the operation if it applies.
2236 * If the maximum number of modify RPCs in flight is reached
2237 * the thread is put to sleep.
2238 * Returns the tag to be set in the request message. Tag 0
2239 * is reserved for non-modifying requests.
2241 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2242 struct lookup_intent *it)
2244 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2245 bool close_req = false;
2248 /* read-only metadata RPCs don't consume a slot on MDT
2249 * for reply reconstruction
2251 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2252 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2255 if (opc == MDS_CLOSE)
2259 spin_lock(&cli->cl_mod_rpcs_lock);
2260 max = cli->cl_max_mod_rpcs_in_flight;
2261 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2262 /* there is a slot available */
2263 cli->cl_mod_rpcs_in_flight++;
2265 cli->cl_close_rpcs_in_flight++;
2266 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2267 cli->cl_mod_rpcs_in_flight);
2268 /* find a free tag */
2269 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2271 LASSERT(i < OBD_MAX_RIF_MAX);
/* NOTE(review): the tag bit is set as a side effect of the asserted
 * expression.  If LASSERT can be compiled out in some build, the bit
 * would never be set - confirm LASSERT always evaluates its argument. */
2272 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2273 spin_unlock(&cli->cl_mod_rpcs_lock);
2274 /* tag 0 is reserved for non-modify RPCs */
2277 spin_unlock(&cli->cl_mod_rpcs_lock);
2279 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2280 "opc %u, max %hu\n",
2281 cli->cl_import->imp_obd->obd_name, opc, max);
2283 l_wait_event(cli->cl_mod_rpcs_waitq,
2284 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2287 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
/*
 * Counterpart of obd_get_mod_rpc_slot().  The same read-only intents
 * (IT_GETATTR, IT_LOOKUP, IT_LAYOUT, IT_READDIR) are special-cased up
 * front and MDS_CLOSE is tracked through cl_close_rpcs_in_flight.  Under
 * cl_mod_rpcs_lock the in-flight counter is decremented and bit
 * (@tag - 1) of cl_mod_tag_bitmap is cleared; cl_mod_rpcs_waitq is then
 * woken after the lock has been released.
 */
2289 /* Put a modify RPC slot from the obd client @cli according
2290 * to the kind of operation @opc that has been sent and the
2291 * intent @it of the operation if it applies.
2293 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2294 struct lookup_intent *it, __u16 tag)
2296 bool close_req = false;
2298 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2299 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2302 if (opc == MDS_CLOSE)
2305 spin_lock(&cli->cl_mod_rpcs_lock);
2306 cli->cl_mod_rpcs_in_flight--;
2308 cli->cl_close_rpcs_in_flight--;
2309 /* release the tag in the bitmap */
/* NOTE(review): with tag == 0, "tag - 1" is -1 after integer promotion
 * and would pass the range check below if OBD_MAX_RIF_MAX is a signed
 * constant - confirm callers never reach this point with tag 0.  The bit
 * is also cleared as a side effect of the second asserted expression,
 * which relies on LASSERT always evaluating its argument. */
2310 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2311 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2312 spin_unlock(&cli->cl_mod_rpcs_lock);
2313 wake_up(&cli->cl_mod_rpcs_waitq);
2315 EXPORT_SYMBOL(obd_put_mod_rpc_slot);