4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2015, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/kthread.h>
41 #include <obd_class.h>
42 #include <lprocfs_status.h>
43 #include <lustre_disk.h>
44 #include <lustre_kernelcomm.h>
/* Lock held around walks and updates of the obd_types list; taken by
 * class_search_type(), class_register_type() and class_unregister_type(). */
46 spinlock_t obd_types_lock;
/* Slab caches created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(); only obdo_cachep is exported. */
48 static struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
/* "Zombie" import/export lists and their lock.  class_export_put() and
 * class_import_put() pass an object to the *_add() helpers declared below
 * once its last reference is dropped.
 * NOTE(review): how these lists are drained is not visible in this part
 * of the file. */
53 static struct list_head obd_zombie_imports;
54 static struct list_head obd_zombie_exports;
55 static spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of helpers defined later in the file. */
57 static void obd_zombie_impexp_notify(void);
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping; class_unlink_export() increments
 * obd_stale_export_num.
 * NOTE(review): users of the list and its lock are not visible here. */
63 struct list_head obd_stale_exports;
64 spinlock_t obd_stale_export_lock;
65 atomic_t obd_stale_export_num;
/* Function pointer called by class_export_destroy() and
 * class_import_destroy() to release a ptlrpc_connection; presumably
 * installed by the ptlrpc module - TODO confirm. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/*
 * Allocate one obd_device from obd_device_cachep with GFP_NOFS and stamp
 * it with OBD_DEVICE_MAGIC.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing; class_newdev() maps a failed allocation to
 * -ENOMEM, so presumably NULL is returned on failure - confirm.
 */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts the magic value, reports an error if obd_namespace is still set,
 * finalises the obd_reference lu_ref and then frees the slab object.
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a namespace that survived to this point was not cleaned up by its owner */
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd type by name.
 * Walks obd_types under obd_types_lock and compares each typ_name with
 * \a name using strcmp().
 * NOTE(review): the return statements are not visible in this listing;
 * callers treat a non-NULL result as "type found".
 */
99 struct obd_type *class_search_type(const char *name)
101 struct list_head *tmp;
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 list_for_each(tmp, &obd_types) {
106 type = list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
/* match found: the list lock is released on this path */
108 spin_unlock(&obd_types_lock);
/* no match: release the list lock after the full walk */
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
/*
 * Find the obd type \a name, loading its kernel module on demand.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the type name is first mapped to a
 * module name (the LWP name maps to the OSP module, any name starting with
 * the MDS name maps to the MDT module), request_module() is called and the
 * lookup is retried when loading succeeds.  The module owning typ_dt_ops
 * is then pinned with try_module_get() under the type's obd_type_lock.
 *
 * NOTE(review): the replacement module name for "obdfilter", the condition
 * guarding the module load, any reference-count update and the return
 * statement are not visible in this listing.
 * NOTE(review): the result of try_module_get() is not checked here.
 */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
/* prefix match: every type whose name begins with the MDS name */
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
/* request_module() returns 0 on success */
134 if (!request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144 spin_lock(&type->obd_type_lock);
146 try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): under the type's obd_type_lock, drop
 * the module reference on the owner of typ_dt_ops with module_put().
 * NOTE(review): any reference-count decrement between the lock and
 * module_put() is not visible in this listing.
 */
152 void class_put_type(struct obd_type *type)
155 spin_lock(&type->obd_type_lock);
157 module_put(type->typ_dt_ops->o_owner);
158 spin_unlock(&type->obd_type_lock);
/* Upper bound on the type-name length asserted by class_register_type(). */
161 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 *
 * Allocates the obd_type, stores private copies of *dt_ops and *md_ops
 * (md_ops is optional), duplicates the name, registers a procfs directory
 * when CONFIG_PROC_FS is set, initialises \a ldt with
 * lu_device_type_init() and links the type onto obd_types under
 * obd_types_lock.  The tail of the function releases whatever had been
 * allocated.
 *
 * NOTE(review): several conditionals, the failure label and the return
 * statements are not visible in this listing - confirm the exact error
 * codes and which steps depend on enable_proc / ldt.
 */
163 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
164 bool enable_proc, struct lprocfs_vars *vars,
165 const char *name, struct lu_device_type *ldt)
167 struct obd_type *type;
/* the name must be NUL-terminated within CLASS_MAX_NAME bytes */
172 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
174 if (class_search_type(name)) {
175 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
180 OBD_ALLOC(type, sizeof(*type));
184 OBD_ALLOC_PTR(type->typ_dt_ops);
185 OBD_ALLOC_PTR(type->typ_md_ops);
186 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* all three allocations are checked together */
188 if (type->typ_dt_ops == NULL ||
189 type->typ_md_ops == NULL ||
190 type->typ_name == NULL)
193 *(type->typ_dt_ops) = *dt_ops;
194 /* md_ops is optional */
196 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized as strlen(name) + 1 above, so this copy fits */
197 strcpy(type->typ_name, name);
198 spin_lock_init(&type->obd_type_lock);
200 #ifdef CONFIG_PROC_FS
202 type->typ_procroot = lprocfs_register(type->typ_name,
205 if (IS_ERR(type->typ_procroot)) {
/* keep the error code and clear the pointer so cleanup skips procfs */
206 rc = PTR_ERR(type->typ_procroot);
207 type->typ_procroot = NULL;
214 rc = lu_device_type_init(ldt);
/* publish the fully initialised type */
219 spin_lock(&obd_types_lock);
220 list_add(&type->typ_chain, &obd_types);
221 spin_unlock(&obd_types_lock);
/* cleanup: free only the pieces that were actually allocated */
226 if (type->typ_name != NULL) {
227 #ifdef CONFIG_PROC_FS
228 if (type->typ_procroot != NULL)
229 remove_proc_subtree(type->typ_name, proc_lustre_root);
/* strlen(name) equals the length used for the allocation above */
231 OBD_FREE(type->typ_name, strlen(name) + 1);
233 if (type->typ_md_ops != NULL)
234 OBD_FREE_PTR(type->typ_md_ops);
235 if (type->typ_dt_ops != NULL)
236 OBD_FREE_PTR(type->typ_dt_ops);
237 OBD_FREE(type, sizeof(*type));
240 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * Logs an error for an unknown type.  If the type still has references,
 * only its ops tables are freed and the name is kept for debugging.
 * Otherwise the procfs entries are removed, the lu_device_type is
 * finalised, the type is unlinked from obd_types under obd_types_lock and
 * all of its memory is freed.
 *
 * NOTE(review): the return statements are not visible in this listing.
 * NOTE(review): in the busy branch the ops pointers are freed while the
 * type stays on obd_types; whether they are reset to NULL afterwards is
 * not visible - confirm, since class_get_type() dereferences typ_dt_ops.
 */
242 int class_unregister_type(const char *name)
244 struct obd_type *type = class_search_type(name);
248 CERROR("unknown obd type\n");
252 if (type->typ_refcnt) {
253 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
254 /* This is a bad situation, let's make the best of it */
255 /* Remove ops, but leave the name for debugging */
256 OBD_FREE_PTR(type->typ_dt_ops);
257 OBD_FREE_PTR(type->typ_md_ops);
261 /* we do not use type->typ_procroot as for compatibility purposes
262 * other modules can share names (i.e. lod can use lov entry). so
263 * we can't reference pointer as it can get invalidated when another
264 * module removes the entry */
265 #ifdef CONFIG_PROC_FS
266 if (type->typ_procroot != NULL)
267 remove_proc_subtree(type->typ_name, proc_lustre_root);
268 if (type->typ_procsym != NULL)
269 lprocfs_remove(&type->typ_procsym);
272 lu_device_type_fini(type->typ_lu);
/* unlink first, then free: no lookup can find the type after this */
274 spin_lock(&obd_types_lock);
275 list_del(&type->typ_chain);
276 spin_unlock(&obd_types_lock);
/* name matched typ_name in class_search_type(), so the lengths agree */
277 OBD_FREE(type->typ_name, strlen(name) + 1);
278 if (type->typ_dt_ops != NULL)
279 OBD_FREE_PTR(type->typ_dt_ops);
280 if (type->typ_md_ops != NULL)
281 OBD_FREE_PTR(type->typ_md_ops);
282 OBD_FREE(type, sizeof(*type));
284 } /* class_unregister_type */
285 EXPORT_SYMBOL(class_unregister_type);
288 * Create a new obd device.
290 * Find an empty slot in ::obd_devs[], create a new obd device in it.
292 * \param[in] type_name obd device type string.
293 * \param[in] name obd device name.
295 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device
/*
 * Create an obd device of type \a type_name named \a name and install it
 * in a free slot of obd_devs[].
 *
 * Failure paths visible here return ERR_PTR(): -EINVAL for an over-long
 * name, -ENODEV for an unknown type, -ENOMEM when allocation fails,
 * -EEXIST when a device with the same name already exists and -EOVERFLOW
 * when every slot is in use.  On failure the new device is freed and the
 * type reference taken by class_get_type() is dropped.
 *
 * NOTE(review): several conditionals, labels and the final return are not
 * visible in this listing.
 */
298 struct obd_device *class_newdev(const char *type_name, const char *name)
300 struct obd_device *result = NULL;
301 struct obd_device *newdev;
302 struct obd_type *type = NULL;
304 int new_obd_minor = 0;
307 if (strlen(name) >= MAX_OBD_NAME) {
308 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
309 RETURN(ERR_PTR(-EINVAL));
312 type = class_get_type(type_name);
314 CERROR("OBD: unknown type: %s\n", type_name);
315 RETURN(ERR_PTR(-ENODEV));
318 newdev = obd_device_alloc();
320 GOTO(out_type, result = ERR_PTR(-ENOMEM));
322 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* scan all slots under the write lock: detect duplicate names and claim
 * the first free slot */
324 write_lock(&obd_dev_lock);
325 for (i = 0; i < class_devno_max(); i++) {
326 struct obd_device *obd = class_num2obd(i);
328 if (obd && (strcmp(name, obd->obd_name) == 0)) {
329 CERROR("Device %s already exists at %d, won't add\n",
/* a slot may already have been claimed earlier in this scan; undo it */
332 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
333 "%p obd_magic %08x != %08x\n", result,
334 result->obd_magic, OBD_DEVICE_MAGIC);
335 LASSERTF(result->obd_minor == new_obd_minor,
336 "%p obd_minor %d != %d\n", result,
337 result->obd_minor, new_obd_minor);
339 obd_devs[result->obd_minor] = NULL;
340 result->obd_name[0]='\0';
342 result = ERR_PTR(-EEXIST);
/* first empty slot while nothing has been claimed yet */
345 if (!result && !obd) {
347 result->obd_minor = i;
349 result->obd_type = type;
/* copies at most sizeof - 1 bytes; the length check at the top presumably
 * guarantees the name fits - confirm sizeof(obd_name) vs MAX_OBD_NAME */
350 strncpy(result->obd_name, name,
351 sizeof(result->obd_name) - 1);
352 obd_devs[i] = result;
355 write_unlock(&obd_dev_lock);
/* ran off the end of the table without claiming a slot */
357 if (result == NULL && i >= class_devno_max()) {
358 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
360 GOTO(out, result = ERR_PTR(-EOVERFLOW));
366 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
367 result->obd_name, result);
/* failure cleanup: release the device and the type reference */
371 obd_device_free(newdev);
373 class_put_type(type);
/*
 * Remove \a obd from obd_devs[] and free it.
 * Asserts that the device is valid and is the one stored at its own
 * obd_minor slot, clears the slot under obd_dev_lock (write side), frees
 * the device and drops the reference on its obd_type.
 */
377 void class_release_dev(struct obd_device *obd)
/* saved up front because obd is freed before the type is put */
379 struct obd_type *obd_type = obd->obd_type;
381 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
382 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
383 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
384 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
385 LASSERT(obd_type != NULL);
387 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
388 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
390 write_lock(&obd_dev_lock);
391 obd_devs[obd->obd_minor] = NULL;
392 write_unlock(&obd_dev_lock);
393 obd_device_free(obd);
395 class_put_type(obd_type);
/*
 * Find the obd_devs[] index of the device called \a name.
 * Scans every slot under obd_dev_lock (read side); a name match only
 * counts once the device has obd_attached set.
 * NOTE(review): the return statements are not visible in this listing;
 * class_name2obd() treats a negative result as "not found".
 */
398 int class_name2dev(const char *name)
405 read_lock(&obd_dev_lock);
406 for (i = 0; i < class_devno_max(); i++) {
407 struct obd_device *obd = class_num2obd(i);
409 if (obd && strcmp(name, obd->obd_name) == 0) {
410 /* Make sure we finished attaching before we give
411 out any references */
412 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
413 if (obd->obd_attached) {
414 read_unlock(&obd_dev_lock);
420 read_unlock(&obd_dev_lock);
/*
 * Look up an obd device by name via class_name2dev() and class_num2obd().
 * NOTE(review): the upper-bound test uses '>' where the valid indices are
 * 0 .. class_devno_max() - 1; an index equal to class_devno_max() passes
 * this test but is rejected inside class_num2obd() - consider '>='.
 */
425 struct obd_device *class_name2obd(const char *name)
427 int dev = class_name2dev(name);
429 if (dev < 0 || dev > class_devno_max())
431 return class_num2obd(dev);
433 EXPORT_SYMBOL(class_name2obd);
/*
 * Find the obd_devs[] index of the device whose obd_uuid equals \a uuid.
 * Scans every slot under obd_dev_lock (read side).
 * NOTE(review): the return statements are not visible in this listing.
 */
435 int class_uuid2dev(struct obd_uuid *uuid)
439 read_lock(&obd_dev_lock);
440 for (i = 0; i < class_devno_max(); i++) {
441 struct obd_device *obd = class_num2obd(i);
443 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
444 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
/* match found: the lock is released on this path */
445 read_unlock(&obd_dev_lock);
449 read_unlock(&obd_dev_lock);
/*
 * Look up an obd device by uuid via class_uuid2dev() and class_num2obd().
 * NOTE(review): any handling of a "not found" index between the two calls
 * is not visible in this listing; class_num2obd() shows no lower-bound
 * check of its own, so confirm negative values are filtered here.
 */
454 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
456 int dev = class_uuid2dev(uuid);
459 return class_num2obd(dev);
461 EXPORT_SYMBOL(class_uuid2obd);
464 * Get obd device from ::obd_devs[]
466 * \param num [in] array index
468 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
469 * otherwise return the obd device there.
/*
 * Return the device stored at index \a num, asserting that a returned
 * device has the right magic and that its obd_minor matches the index.
 * NOTE(review): the array read, the NULL-slot handling and the return are
 * not visible in this listing.  Only an upper bound on num is checked
 * here; a negative num is not visibly rejected.
 */
471 struct obd_device *class_num2obd(int num)
473 struct obd_device *obd = NULL;
475 if (num < class_devno_max()) {
480 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
481 "%p obd_magic %08x != %08x\n",
482 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
483 LASSERTF(obd->obd_minor == num,
484 "%p obd_minor %0d != %0d\n",
485 obd, obd->obd_minor, num);
492 * Get obd devices count. Device in any
494 * \retval obd device count
/*
 * Count obd devices by scanning obd_devs[] under obd_dev_lock (read side).
 * NOTE(review): the counting condition and the return are not visible in
 * this listing.
 * NOTE(review): the loop bound is '<=' while every other scan in this
 * file uses '< class_devno_max()'; the extra index is rejected by the
 * bound check in class_num2obd(), so it looks harmless but inconsistent.
 */
496 int get_devices_count(void)
498 int index, max_index = class_devno_max(), dev_count = 0;
500 read_lock(&obd_dev_lock);
501 for (index = 0; index <= max_index; index++) {
502 struct obd_device *obd = class_num2obd(index);
506 read_unlock(&obd_dev_lock);
510 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per obd device: index, status, type name, device
 * name, uuid and reference count.  The scan runs under obd_dev_lock (read
 * side).  The status is chosen by testing obd_stopping, then obd_set_up,
 * then obd_attached, in that order.
 * NOTE(review): the status strings and the empty-slot check are not
 * visible in this listing.
 */
512 void class_obd_list(void)
517 read_lock(&obd_dev_lock);
518 for (i = 0; i < class_devno_max(); i++) {
519 struct obd_device *obd = class_num2obd(i);
523 if (obd->obd_stopping)
525 else if (obd->obd_set_up)
527 else if (obd->obd_attached)
531 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
532 i, status, obd->obd_type->typ_name,
533 obd->obd_name, obd->obd_uuid.uuid,
534 atomic_read(&obd->obd_refcount));
536 read_unlock(&obd_dev_lock);
540 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
541 specified, then only the client with that uuid is returned,
542 otherwise any client connected to the tgt is returned. */
/*
 * Scan obd_devs[] under obd_dev_lock (read side) for a device whose type
 * name starts with \a typ_name, whose cl_target_uuid equals \a tgt_uuid
 * and, when \a grp_uuid is non-NULL, whose obd_uuid equals \a grp_uuid.
 * NOTE(review): the type comparison is a prefix match limited to
 * strlen(typ_name), so a longer type name sharing that prefix also
 * matches - confirm this is intended.
 * NOTE(review): the empty-slot check and return statements are not
 * visible in this listing.
 */
543 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
544 const char * typ_name,
545 struct obd_uuid *grp_uuid)
549 read_lock(&obd_dev_lock);
550 for (i = 0; i < class_devno_max(); i++) {
551 struct obd_device *obd = class_num2obd(i);
555 if ((strncmp(obd->obd_type->typ_name, typ_name,
556 strlen(typ_name)) == 0)) {
557 if (obd_uuid_equals(tgt_uuid,
558 &obd->u.cli.cl_target_uuid) &&
/* a NULL grp_uuid matches any client */
559 ((grp_uuid)? obd_uuid_equals(grp_uuid,
560 &obd->obd_uuid) : 1)) {
561 read_unlock(&obd_dev_lock);
566 read_unlock(&obd_dev_lock);
570 EXPORT_SYMBOL(class_find_client_obd);
572 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
573 searching at *next, and if a device is found, the next index to look
574 at is saved in *next. If next is NULL, then the first matching device
575 will always be returned. */
/*
 * Find the next device whose obd_uuid equals \a grp_uuid, scanning
 * obd_devs[] under obd_dev_lock (read side).  A *next value in the range
 * 0 .. class_devno_max() - 1 is accepted as the starting index.
 * NOTE(review): the handling of a NULL or out-of-range \a next, the
 * update of *next and the return statements are not visible in this
 * listing.
 */
576 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
582 else if (*next >= 0 && *next < class_devno_max())
587 read_lock(&obd_dev_lock);
588 for (; i < class_devno_max(); i++) {
589 struct obd_device *obd = class_num2obd(i);
593 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
596 read_unlock(&obd_dev_lock);
600 read_unlock(&obd_dev_lock);
604 EXPORT_SYMBOL(class_devices_in_group);
607 * Notify that the sptlrpc log for \a fsname has changed, and let every relevant OBD
608 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to every device that
 * is set up, not stopping, of type mdc/osc/osp/lwp/mdt/ost and whose name
 * begins with the first \a namelen bytes of \a fsname.
 *
 * obd_dev_lock is dropped around each obd_set_info_async() call; the
 * device is pinned with class_incref() beforehand and released with
 * class_decref() afterwards.
 *
 * NOTE(review): how rc2 is folded into the return value is not visible in
 * this listing.
 */
610 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
612 struct obd_device *obd;
616 LASSERT(namelen > 0);
618 read_lock(&obd_dev_lock);
619 for (i = 0; i < class_devno_max(); i++) {
620 obd = class_num2obd(i);
622 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
625 /* only notify mdc, osc, osp, lwp, mdt, ost
626 * because only these have a -sptlrpc llog */
627 type = obd->obd_type->typ_name;
628 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
629 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
630 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
631 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
632 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
633 strcmp(type, LUSTRE_OST_NAME) != 0)
/* non-zero strncmp: the device name does not start with fsname */
636 if (strncmp(obd->obd_name, fsname, namelen))
/* pin the device, then drop the lock for the duration of the call */
639 class_incref(obd, __FUNCTION__, obd);
640 read_unlock(&obd_dev_lock);
641 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
642 sizeof(KEY_SPTLRPC_CONF),
643 KEY_SPTLRPC_CONF, 0, NULL, NULL);
645 class_decref(obd, __FUNCTION__, obd);
646 read_lock(&obd_dev_lock);
648 read_unlock(&obd_dev_lock);
651 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches() and reset the
 * cache pointers so a later obd_init_caches() passes its NULL asserts.
 * NOTE(review): the guards for obdo_cachep / import_cachep and the reset
 * of obdo_cachep are not visible in this listing.
 */
653 void obd_cleanup_caches(void)
656 if (obd_device_cachep) {
657 kmem_cache_destroy(obd_device_cachep);
658 obd_device_cachep = NULL;
661 kmem_cache_destroy(obdo_cachep);
665 kmem_cache_destroy(import_cachep);
666 import_cachep = NULL;
/*
 * Create the obd_device, obdo and import slab caches.
 * Each cache pointer is asserted to be NULL before creation.  A failed
 * creation sets rc to -ENOMEM and jumps to the exit path, where
 * obd_cleanup_caches() tears down whatever was created.
 * NOTE(review): the success return and the label layout are not visible
 * in this listing.
 */
672 int obd_init_caches(void)
677 LASSERT(obd_device_cachep == NULL);
678 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
679 sizeof(struct obd_device),
681 if (!obd_device_cachep)
682 GOTO(out, rc = -ENOMEM);
684 LASSERT(obdo_cachep == NULL);
685 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
688 GOTO(out, rc = -ENOMEM);
690 LASSERT(import_cachep == NULL);
691 import_cachep = kmem_cache_create("ll_import_cache",
692 sizeof(struct obd_import),
695 GOTO(out, rc = -ENOMEM);
/* failure path: release any cache that was created */
699 obd_cleanup_caches();
703 /* map connection to client */
/*
 * Translate a connection handle into its export by looking up
 * conn->cookie with class_handle2object().  A null handle and a cookie of
 * -1 ("assign a new connection") are handled separately with a debug
 * message each.
 * NOTE(review): the conditions and return values of the special cases are
 * not visible in this listing.  class_conn2obd() calls class_export_put()
 * on the result, which suggests the lookup returns a referenced export -
 * confirm.
 */
704 struct obd_export *class_conn2export(struct lustre_handle *conn)
706 struct obd_export *export;
710 CDEBUG(D_CACHE, "looking for null handle\n");
714 if (conn->cookie == -1) { /* this means assign a new connection */
715 CDEBUG(D_CACHE, "want a new connection\n");
719 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
720 export = class_handle2object(conn->cookie, NULL);
723 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): the body is not visible in this listing; presumably it
 * returns exp->exp_obd - TODO confirm, including NULL handling.
 */
725 struct obd_device *class_exp2obd(struct obd_export *exp)
731 EXPORT_SYMBOL(class_exp2obd);
/*
 * Map a connection handle to the obd device of its export.
 * The export obtained from class_conn2export() is released with
 * class_export_put() as soon as exp_obd has been read, so this function
 * itself takes no reference on the device.
 * NOTE(review): the NULL-export check and the return statements are not
 * visible in this listing.
 */
733 struct obd_device *class_conn2obd(struct lustre_handle *conn)
735 struct obd_export *export;
736 export = class_conn2export(conn);
738 struct obd_device *obd = export->exp_obd;
739 class_export_put(export);
/*
 * Return the client import (u.cli.cl_import) of the device behind \a exp.
 * NOTE(review): any NULL check on obd is not visible in this listing.
 */
745 struct obd_import *class_exp2cliimp(struct obd_export *exp)
747 struct obd_device *obd = exp->exp_obd;
750 return obd->u.cli.cl_import;
752 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device reached
 * through connection handle \a conn.
 * NOTE(review): any NULL check on obd is not visible in this listing.
 */
754 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
756 struct obd_device *obd = class_conn2obd(conn);
759 return obd->u.cli.cl_import;
762 /* Export management functions */
/*
 * Final teardown of an export whose reference count has reached zero.
 * Releases the ptlrpc connection if there is one, asserts that the reply,
 * replay and high-priority RPC lists are empty, calls
 * obd_destroy_export(), drops the "export" reference on the device and
 * frees the export through OBD_FREE_RCU().
 */
763 static void class_export_destroy(struct obd_export *exp)
765 struct obd_device *obd = exp->exp_obd;
768 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
769 LASSERT(obd != NULL);
771 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
772 exp->exp_client_uuid.uuid, obd->obd_name);
774 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
775 if (exp->exp_connection)
776 ptlrpc_put_connection_superhack(exp->exp_connection);
778 LASSERT(list_empty(&exp->exp_outstanding_replies));
779 LASSERT(list_empty(&exp->exp_uncommitted_replies));
780 LASSERT(list_empty(&exp->exp_req_replay_queue));
781 LASSERT(list_empty(&exp->exp_hp_rpcs));
782 obd_destroy_export(exp);
/* pairs with class_incref(obd, "export", ...) in class_new_export() */
783 class_decref(obd, "export", exp);
785 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle-table callback: take a reference on the export behind a handle. */
789 static void export_handle_addref(void *export)
791 class_export_get(export);
/* Handle operations passed to class_handle_hash() for exports.
 * NOTE(review): any further initialisers are not visible in this listing. */
794 static struct portals_handle_ops export_handle_ops = {
795 .hop_addref = export_handle_addref,
/*
 * Take a reference on \a exp and log the new count.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably \a exp is returned - confirm.
 */
799 struct obd_export *class_export_get(struct obd_export *exp)
801 atomic_inc(&exp->exp_refcount);
802 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
803 atomic_read(&exp->exp_refcount));
806 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp.
 * On the final put the export must still be linked on an obd chain and
 * must not be on a stale list; its nid statistics reference is released
 * and the export is passed to obd_zombie_export_add() rather than being
 * destroyed inline.
 */
808 void class_export_put(struct obd_export *exp)
810 LASSERT(exp != NULL);
811 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* the decrement has not happened yet, hence the "- 1" in the message */
812 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
813 atomic_read(&exp->exp_refcount) - 1);
815 if (atomic_dec_and_test(&exp->exp_refcount)) {
816 LASSERT(!list_empty(&exp->exp_obd_chain));
817 LASSERT(list_empty(&exp->exp_stale_list));
818 CDEBUG(D_IOCTL, "final put %p/%s\n",
819 exp, exp->exp_client_uuid.uuid);
821 /* release nid stat reference */
822 lprocfs_exp_cleanup(exp);
824 obd_zombie_export_add(exp);
827 EXPORT_SYMBOL(class_export_put);
829 /* Creates a new export, adds it to the hash table, and returns a
830 * pointer to it. The refcount is 2: one for the hash reference, and
831 * one for the pointer returned by this function. */
/*
 * Allocate and initialise an export of \a obd for client \a cluuid.
 *
 * The export starts with a reference count of 2, is registered in the
 * handle table and, unless \a cluuid equals the device's own uuid, is
 * added to the device's uuid hash.  It is then linked onto obd_exports
 * and obd_exports_timed under obd->obd_dev_lock, taking an "export"
 * reference on the device.
 *
 * Visible failure paths: ERR_PTR(-ENOMEM) when allocation fails, -ENODEV
 * when the device is stopping or has no uuid hash, -EALREADY when an
 * export for the same uuid already exists.  The failure tail unhashes the
 * handle, calls obd_destroy_export() and frees the export.
 *
 * NOTE(review): some conditionals, the labels and the final returns are
 * not visible in this listing.
 */
832 struct obd_export *class_new_export(struct obd_device *obd,
833 struct obd_uuid *cluuid)
835 struct obd_export *export;
836 struct cfs_hash *hash = NULL;
840 OBD_ALLOC_PTR(export);
842 return ERR_PTR(-ENOMEM);
844 export->exp_conn_cnt = 0;
845 export->exp_lock_hash = NULL;
846 export->exp_flock_hash = NULL;
/* two references, as described in the comment preceding this function */
847 atomic_set(&export->exp_refcount, 2);
848 atomic_set(&export->exp_rpc_count, 0);
849 atomic_set(&export->exp_cb_count, 0);
850 atomic_set(&export->exp_locks_count, 0);
851 #if LUSTRE_TRACKS_LOCK_EXP_REFS
852 INIT_LIST_HEAD(&export->exp_locks_list);
853 spin_lock_init(&export->exp_locks_list_guard);
855 atomic_set(&export->exp_replay_count, 0);
856 export->exp_obd = obd;
857 INIT_LIST_HEAD(&export->exp_outstanding_replies);
858 spin_lock_init(&export->exp_uncommitted_replies_lock);
859 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
860 INIT_LIST_HEAD(&export->exp_req_replay_queue);
861 INIT_LIST_HEAD(&export->exp_handle.h_link);
862 INIT_LIST_HEAD(&export->exp_hp_rpcs);
863 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* register in the handle table; class_connect() reads h_cookie afterwards */
864 class_handle_hash(&export->exp_handle, &export_handle_ops);
865 export->exp_last_request_time = cfs_time_current_sec();
866 spin_lock_init(&export->exp_lock);
867 spin_lock_init(&export->exp_rpc_lock);
868 INIT_HLIST_NODE(&export->exp_uuid_hash);
869 INIT_HLIST_NODE(&export->exp_nid_hash);
870 INIT_HLIST_NODE(&export->exp_gen_hash);
871 spin_lock_init(&export->exp_bl_list_lock);
872 INIT_LIST_HEAD(&export->exp_bl_list);
873 INIT_LIST_HEAD(&export->exp_stale_list);
875 export->exp_sp_peer = LUSTRE_SP_ANY;
876 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
877 export->exp_client_uuid = *cluuid;
878 obd_init_export(export);
/* first locked section: bail out if stopping, pin the uuid hash */
880 spin_lock(&obd->obd_dev_lock);
881 /* shouldn't happen, but might race */
882 if (obd->obd_stopping)
883 GOTO(exit_unlock, rc = -ENODEV);
885 hash = cfs_hash_getref(obd->obd_uuid_hash);
887 GOTO(exit_unlock, rc = -ENODEV);
888 spin_unlock(&obd->obd_dev_lock);
/* the device's own uuid is not entered into the uuid hash */
890 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
891 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
893 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
894 obd->obd_name, cluuid->uuid, rc);
895 GOTO(exit_err, rc = -EALREADY);
899 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* second locked section: re-check stopping, since the lock was dropped */
900 spin_lock(&obd->obd_dev_lock);
901 if (obd->obd_stopping) {
902 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
903 GOTO(exit_unlock, rc = -ENODEV);
906 class_incref(obd, "export", export);
907 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
908 list_add_tail(&export->exp_obd_chain_timed,
909 &export->exp_obd->obd_exports_timed);
910 export->exp_obd->obd_num_exports++;
911 spin_unlock(&obd->obd_dev_lock);
912 cfs_hash_putref(hash);
/* failure tail: unlock, drop the hash reference, undo the handle, free */
916 spin_unlock(&obd->obd_dev_lock);
919 cfs_hash_putref(hash);
920 class_handle_unhash(&export->exp_handle);
921 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
922 obd_destroy_export(export);
923 OBD_FREE_PTR(export);
926 EXPORT_SYMBOL(class_new_export);
/*
 * Detach \a exp from its device.
 * Removes the export from the handle table and, under obd_dev_lock, from
 * the uuid and generation hashes if it is hashed there, moves it from its
 * current obd chain to obd_unlinked_exports, takes it off the timed chain
 * and decrements obd_num_exports.  Afterwards the stale-export counter is
 * incremented and obd_stale_export_put() is called.
 */
928 void class_unlink_export(struct obd_export *exp)
930 class_handle_unhash(&exp->exp_handle);
932 spin_lock(&exp->exp_obd->obd_dev_lock);
933 /* delete an uuid-export hashitem from hashtables */
934 if (!hlist_unhashed(&exp->exp_uuid_hash))
935 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
936 &exp->exp_client_uuid,
937 &exp->exp_uuid_hash);
939 if (!hlist_unhashed(&exp->exp_gen_hash)) {
940 struct tg_export_data *ted = &exp->exp_target_data;
941 struct cfs_hash *hash;
943 /* Because obd_gen_hash will not be released until
944 * class_cleanup(), so hash should never be NULL here */
945 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
946 LASSERT(hash != NULL);
947 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
949 cfs_hash_putref(hash);
952 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
953 list_del_init(&exp->exp_obd_chain_timed);
954 exp->exp_obd->obd_num_exports--;
955 spin_unlock(&exp->exp_obd->obd_dev_lock);
956 atomic_inc(&obd_stale_export_num);
958 /* A reference is kept by obd_stale_exports list */
959 obd_stale_export_put(exp);
961 EXPORT_SYMBOL(class_unlink_export);
963 /* Import management functions */
/*
 * Final teardown of an import whose reference count has reached zero.
 * Releases the import's connection and every obd_import_conn on
 * imp_conn_list, asserts that the security context is already gone, drops
 * the "import" reference on the device and frees the import through
 * OBD_FREE_RCU().
 */
964 static void class_import_destroy(struct obd_import *imp)
968 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
969 imp->imp_obd->obd_name);
971 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
973 ptlrpc_put_connection_superhack(imp->imp_connection);
/* pop entries off the head of the list until it is empty */
975 while (!list_empty(&imp->imp_conn_list)) {
976 struct obd_import_conn *imp_conn;
978 imp_conn = list_entry(imp->imp_conn_list.next,
979 struct obd_import_conn, oic_item);
980 list_del_init(&imp_conn->oic_item);
981 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
982 OBD_FREE(imp_conn, sizeof(*imp_conn));
985 LASSERT(imp->imp_sec == NULL);
/* pairs with class_incref(obd, "import", ...) in class_new_import() */
986 class_decref(imp->imp_obd, "import", imp);
987 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle-table callback: take a reference on the import behind a handle. */
991 static void import_handle_addref(void *import)
993 class_import_get(import);
/* Handle operations passed to class_handle_hash() for imports.
 * NOTE(review): any further initialisers are not visible in this listing. */
996 static struct portals_handle_ops import_handle_ops = {
997 .hop_addref = import_handle_addref,
/*
 * Take a reference on \a import and log the new count.
 * NOTE(review): the return statement is not visible in this listing;
 * presumably \a import is returned - confirm.
 */
1001 struct obd_import *class_import_get(struct obd_import *import)
1003 atomic_inc(&import->imp_refcount);
1004 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1005 atomic_read(&import->imp_refcount),
1006 import->imp_obd->obd_name);
1009 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a imp.
 * The import must not already be on a zombie chain.  On the final put it
 * is passed to obd_zombie_import_add() rather than being destroyed inline.
 */
1011 void class_import_put(struct obd_import *imp)
1015 LASSERT(list_empty(&imp->imp_zombie_chain));
1016 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* the decrement has not happened yet, hence the "- 1" in the message */
1018 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1019 atomic_read(&imp->imp_refcount) - 1,
1020 imp->imp_obd->obd_name);
1022 if (atomic_dec_and_test(&imp->imp_refcount)) {
1023 CDEBUG(D_INFO, "final put import %p\n", imp);
1024 obd_zombie_import_add(imp);
1027 /* catch possible import put race */
1028 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1031 EXPORT_SYMBOL(class_import_put);
/*
 * Initialise the adaptive-timeout state of an import: the network latency
 * estimate with at_init(..., 0, 0) and one service estimate per portal
 * seeded with INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the per-portal at_init() call is not
 * visible in this listing.
 */
1033 static void init_imp_at(struct imp_at *at) {
1035 at_init(&at->iat_net_latency, 0, 0);
1036 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1037 /* max service estimates are tracked on the server side, so
1038 don't use the AT history here, just use the last reported
1039 val. (But keep hist for proc histogram, worst_ever) */
1040 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise an import for \a obd.
 * The import starts in state LUSTRE_IMP_NEW with a reference count of 2,
 * holds an "import" reference on the device, is registered in the handle
 * table and uses LUSTRE_MSG_MAGIC_V2 as its initial message magic.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing.
 */
1045 struct obd_import *class_new_import(struct obd_device *obd)
1047 struct obd_import *imp;
1049 OBD_ALLOC(imp, sizeof(*imp));
1053 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1054 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1055 INIT_LIST_HEAD(&imp->imp_replay_list);
1056 INIT_LIST_HEAD(&imp->imp_sending_list);
1057 INIT_LIST_HEAD(&imp->imp_delayed_list);
1058 INIT_LIST_HEAD(&imp->imp_committed_list);
1059 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1060 imp->imp_known_replied_xid = 0;
/* the replay cursor starts at the head of the (empty) committed list */
1061 imp->imp_replay_cursor = &imp->imp_committed_list;
1062 spin_lock_init(&imp->imp_lock);
1063 imp->imp_last_success_conn = 0;
1064 imp->imp_state = LUSTRE_IMP_NEW;
/* released by class_decref(..., "import", ...) in class_import_destroy() */
1065 imp->imp_obd = class_incref(obd, "import", imp);
1066 mutex_init(&imp->imp_sec_mutex);
1067 init_waitqueue_head(&imp->imp_recovery_waitq);
1069 atomic_set(&imp->imp_refcount, 2);
1070 atomic_set(&imp->imp_unregistering, 0);
1071 atomic_set(&imp->imp_inflight, 0);
1072 atomic_set(&imp->imp_replay_inflight, 0);
1073 atomic_set(&imp->imp_inval_count, 0);
1074 INIT_LIST_HEAD(&imp->imp_conn_list);
1075 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1076 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1077 init_imp_at(&imp->imp_at);
1079 /* the default magic is V2, will be used in connect RPC, and
1080 * then adjusted according to the flags in request/reply. */
1081 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1085 EXPORT_SYMBOL(class_new_import);
/*
 * Start destruction of an import: remove it from the handle table, bump
 * imp_generation under imp_lock and drop one reference.  The import is
 * only freed once the last reference goes away (see class_import_put()).
 */
1087 void class_destroy_import(struct obd_import *import)
1089 LASSERT(import != NULL);
1090 LASSERT(import != LP_POISON);
1092 class_handle_unhash(&import->imp_handle);
1094 spin_lock(&import->imp_lock);
1095 import->imp_generation++;
1096 spin_unlock(&import->imp_lock);
1097 class_import_put(import);
1099 EXPORT_SYMBOL(class_destroy_import);
1101 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): record that \a lock holds
 * a reference on \a exp.  Under exp_locks_list_guard, warns if the lock is
 * already tracked against a different export; on the first reference the
 * lock is linked onto exp_locks_list and its target export is recorded.
 */
1103 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1105 spin_lock(&exp->exp_locks_list_guard);
1107 LASSERT(lock->l_exp_refs_nr >= 0);
1109 if (lock->l_exp_refs_target != NULL &&
1110 lock->l_exp_refs_target != exp) {
1111 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1112 exp, lock, lock->l_exp_refs_target);
/* post-increment: the test sees the count before this reference */
1114 if ((lock->l_exp_refs_nr ++) == 0) {
1115 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1116 lock->l_exp_refs_target = exp;
1118 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1119 lock, exp, lock->l_exp_refs_nr);
1120 spin_unlock(&exp->exp_locks_list_guard);
1122 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): drop one tracked
 * reference of \a lock on \a exp.  Under exp_locks_list_guard, warns if
 * the lock is tracked against a different export; when the count reaches
 * zero the lock is unlinked and its target export is cleared.
 */
1124 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1126 spin_lock(&exp->exp_locks_list_guard);
1127 LASSERT(lock->l_exp_refs_nr > 0);
1128 if (lock->l_exp_refs_target != exp) {
1129 LCONSOLE_WARN("lock %p, "
1130 "mismatching export pointers: %p, %p\n",
1131 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: the test sees the count after this reference is dropped */
1133 if (-- lock->l_exp_refs_nr == 0) {
1134 list_del_init(&lock->l_exp_refs_link);
1135 lock->l_exp_refs_target = NULL;
1137 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1138 lock, exp, lock->l_exp_refs_nr);
1139 spin_unlock(&exp->exp_locks_list_guard);
1141 EXPORT_SYMBOL(__class_export_del_lock_ref);
1144 /* A connection defines an export context in which preallocation can
1145 be managed. This releases the export pointer reference, and returns
1146 the export handle, so the export refcount is 1 when this function
/*
 * Create an export of \a obd for \a cluuid and return its handle cookie
 * in \a conn.  The pointer reference returned by class_new_export() is
 * dropped again before returning, so the caller holds only the cookie.
 * A failure of class_new_export() is returned as its PTR_ERR() value.
 * NOTE(review): the IS_ERR check and the success return are not visible
 * in this listing.
 */
1148 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1149 struct obd_uuid *cluuid)
1151 struct obd_export *export;
1152 LASSERT(conn != NULL);
1153 LASSERT(obd != NULL);
1154 LASSERT(cluuid != NULL);
1157 export = class_new_export(obd, cluuid);
1159 RETURN(PTR_ERR(export));
/* read the cookie before dropping our pointer reference */
1161 conn->cookie = export->exp_handle.h_cookie;
1162 class_export_put(export);
1164 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1165 cluuid->uuid, conn->cookie);
1168 EXPORT_SYMBOL(class_connect);
1170 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery accounting of an export that is going away.
 * Under obd_recovery_task_lock, while the device is recovering: clears
 * exp_in_recovery and decrements obd_connected_clients, and counts
 * non-lightweight exports in obd_stale_clients.  Then, under exp_lock,
 * clears the request-replay and lock-replay flags and decrements the
 * matching per-device client counters.
 * NOTE(review): the exact nesting of the stale-client update relative to
 * the exp_in_recovery branch is not visible in this listing.
 */
1171 static void class_export_recovery_cleanup(struct obd_export *exp)
1173 struct obd_device *obd = exp->exp_obd;
1175 spin_lock(&obd->obd_recovery_task_lock);
1176 if (obd->obd_recovering) {
1177 if (exp->exp_in_recovery) {
/* exp_lock is taken while obd_recovery_task_lock is held */
1178 spin_lock(&exp->exp_lock);
1179 exp->exp_in_recovery = 0;
1180 spin_unlock(&exp->exp_lock);
1181 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1182 atomic_dec(&obd->obd_connected_clients);
1185 /* if called during recovery then should update
1186 * obd_stale_clients counter,
1187 * lightweight exports are not counted */
1188 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1189 exp->exp_obd->obd_stale_clients++;
1191 spin_unlock(&obd->obd_recovery_task_lock);
1193 spin_lock(&exp->exp_lock);
1194 /** Cleanup req replay fields */
1195 if (exp->exp_req_replay_needed) {
1196 exp->exp_req_replay_needed = 0;
1198 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1199 atomic_dec(&obd->obd_req_replay_clients);
1202 /** Cleanup lock replay data */
1203 if (exp->exp_lock_replay_needed) {
1204 exp->exp_lock_replay_needed = 0;
1206 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1207 atomic_dec(&obd->obd_lock_replay_clients);
1209 spin_unlock(&exp->exp_lock);
1212 /* This function removes 1-3 references from the export:
1213 * 1 - for export pointer passed
1214 * and if disconnect really need
1215 * 2 - removing from hash
1216 * 3 - in client_unlink_export
1217 * The export pointer passed to this function can be destroyed */
/*
 * Disconnect \a export from its device.
 * exp_disconnected is tested and set under exp_lock, so only the first
 * caller performs the teardown: removal from the nid hash, recovery
 * cleanup and class_unlink_export().  The visible tail drops the caller's
 * reference with class_export_put().
 * NOTE(review): the label placement and the return values are not visible
 * in this listing.
 */
1218 int class_disconnect(struct obd_export *export)
1220 int already_disconnected;
1223 if (export == NULL) {
1224 CWARN("attempting to free NULL export %p\n", export);
/* atomically fetch the old flag value and mark the export disconnected */
1228 spin_lock(&export->exp_lock);
1229 already_disconnected = export->exp_disconnected;
1230 export->exp_disconnected = 1;
1231 spin_unlock(&export->exp_lock);
1233 /* class_cleanup(), abort_recovery(), and class_fail_export()
1234 * all end up in here, and if any of them race we shouldn't
1235 * call extra class_export_puts(). */
1236 if (already_disconnected) {
1237 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1238 GOTO(no_disconn, already_disconnected);
1241 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1242 export->exp_handle.h_cookie);
1244 if (!hlist_unhashed(&export->exp_nid_hash))
1245 cfs_hash_del(export->exp_obd->obd_nid_hash,
1246 &export->exp_connection->c_peer.nid,
1247 &export->exp_nid_hash);
1249 class_export_recovery_cleanup(export);
1250 class_unlink_export(export);
1252 class_export_put(export);
1255 EXPORT_SYMBOL(class_disconnect);
1257 /* Return non-zero for a fully connected export */
/*
 * Report whether \a exp is fully connected: under exp_lock, the export
 * has a positive exp_conn_cnt and exp_failed is not set.
 * NOTE(review): any NULL check on exp and the return statement are not
 * visible in this listing.
 */
1258 int class_connected_export(struct obd_export *exp)
1263 spin_lock(&exp->exp_lock);
1264 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1265 spin_unlock(&exp->exp_lock);
1269 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, stamping each with \a flags first.
 * An export whose client uuid equals the device's own uuid is not
 * disconnected; it is only unlinked from the list.  All other exports are
 * passed to obd_disconnect(), with extra references held so the export
 * can still be logged after the call.
 * NOTE(review): the list_entry() member argument and the loop-control
 * statements are not visible in this listing.
 */
1271 static void class_disconnect_export_list(struct list_head *list,
1272 enum obd_option flags)
1275 struct obd_export *exp;
1278 /* It's possible that an export may disconnect itself, but
1279 * nothing else will be added to this list. */
1280 while (!list_empty(list)) {
1281 exp = list_entry(list->next, struct obd_export,
1283 /* needed so CDEBUG can be called safely after obd_disconnect */
1284 class_export_get(exp);
1286 spin_lock(&exp->exp_lock);
1287 exp->exp_flags = flags;
1288 spin_unlock(&exp->exp_lock);
1290 if (obd_uuid_equals(&exp->exp_client_uuid,
1291 &exp->exp_obd->obd_uuid)) {
1293 "exp %p export uuid == obd uuid, don't discon\n",
1295 /* Need to delete this now so we don't end up pointing
1296 * to work_list later when this export is cleaned up. */
1297 list_del_init(&exp->exp_obd_chain);
1298 class_export_put(exp);
/* second reference: obd_disconnect() below releases one */
1302 class_export_get(exp);
1303 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1304 "last request at "CFS_TIME_T"\n",
1305 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1306 exp, exp->exp_last_request_time);
1307 /* release one export reference anyway */
1308 rc = obd_disconnect(exp);
1310 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1311 obd_export_nid2str(exp), exp, rc);
1312 class_export_put(exp);
1317 void class_disconnect_exports(struct obd_device *obd)
1319 struct list_head work_list;
1322 /* Move all of the exports from obd_exports to a work list, en masse. */
1323 INIT_LIST_HEAD(&work_list);
1324 spin_lock(&obd->obd_dev_lock);
1325 list_splice_init(&obd->obd_exports, &work_list);
1326 list_splice_init(&obd->obd_delayed_exports, &work_list);
1327 spin_unlock(&obd->obd_dev_lock);
1329 if (!list_empty(&work_list)) {
1330 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1331 "disconnecting them\n", obd->obd_minor, obd);
1332 class_disconnect_export_list(&work_list,
1333 exp_flags_from_obd(obd));
1335 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1336 obd->obd_minor, obd);
1339 EXPORT_SYMBOL(class_disconnect_exports);
1341 /* Remove exports that have not completed recovery.
1343 void class_disconnect_stale_exports(struct obd_device *obd,
1344 int (*test_export)(struct obd_export *))
1346 struct list_head work_list;
1347 struct obd_export *exp, *n;
1351 INIT_LIST_HEAD(&work_list);
1352 spin_lock(&obd->obd_dev_lock);
1353 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1355 /* don't count self-export as client */
1356 if (obd_uuid_equals(&exp->exp_client_uuid,
1357 &exp->exp_obd->obd_uuid))
1360 /* don't evict clients which have no slot in last_rcvd
1361 * (e.g. lightweight connection) */
1362 if (exp->exp_target_data.ted_lr_idx == -1)
1365 spin_lock(&exp->exp_lock);
1366 if (exp->exp_failed || test_export(exp)) {
1367 spin_unlock(&exp->exp_lock);
1370 exp->exp_failed = 1;
1371 spin_unlock(&exp->exp_lock);
1373 list_move(&exp->exp_obd_chain, &work_list);
1375 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1376 obd->obd_name, exp->exp_client_uuid.uuid,
1377 exp->exp_connection == NULL ? "<unknown>" :
1378 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1379 print_export_data(exp, "EVICTING", 0, D_HA);
1381 spin_unlock(&obd->obd_dev_lock);
1384 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1385 obd->obd_name, evicted);
1387 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1388 OBD_OPT_ABORT_RECOV);
1391 EXPORT_SYMBOL(class_disconnect_stale_exports);
1393 void class_fail_export(struct obd_export *exp)
1395 int rc, already_failed;
1397 spin_lock(&exp->exp_lock);
1398 already_failed = exp->exp_failed;
1399 exp->exp_failed = 1;
1400 spin_unlock(&exp->exp_lock);
1402 if (already_failed) {
1403 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1404 exp, exp->exp_client_uuid.uuid);
1408 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1409 exp, exp->exp_client_uuid.uuid);
1411 if (obd_dump_on_timeout)
1412 libcfs_debug_dumplog();
1414 /* need for safe call CDEBUG after obd_disconnect */
1415 class_export_get(exp);
1417 /* Most callers into obd_disconnect are removing their own reference
1418 * (request, for example) in addition to the one from the hash table.
1419 * We don't have such a reference here, so make one. */
1420 class_export_get(exp);
1421 rc = obd_disconnect(exp);
1423 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1425 CDEBUG(D_HA, "disconnected export %p/%s\n",
1426 exp, exp->exp_client_uuid.uuid);
1427 class_export_put(exp);
1429 EXPORT_SYMBOL(class_fail_export);
1431 char *obd_export_nid2str(struct obd_export *exp)
1433 if (exp->exp_connection != NULL)
1434 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1438 EXPORT_SYMBOL(obd_export_nid2str);
1440 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1442 struct cfs_hash *nid_hash;
1443 struct obd_export *doomed_exp = NULL;
1444 int exports_evicted = 0;
1446 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1448 spin_lock(&obd->obd_dev_lock);
1449 /* umount has run already, so evict thread should leave
1450 * its task to umount thread now */
1451 if (obd->obd_stopping) {
1452 spin_unlock(&obd->obd_dev_lock);
1453 return exports_evicted;
1455 nid_hash = obd->obd_nid_hash;
1456 cfs_hash_getref(nid_hash);
1457 spin_unlock(&obd->obd_dev_lock);
1460 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1461 if (doomed_exp == NULL)
1464 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1465 "nid %s found, wanted nid %s, requested nid %s\n",
1466 obd_export_nid2str(doomed_exp),
1467 libcfs_nid2str(nid_key), nid);
1468 LASSERTF(doomed_exp != obd->obd_self_export,
1469 "self-export is hashed by NID?\n");
1471 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1472 "request\n", obd->obd_name,
1473 obd_uuid2str(&doomed_exp->exp_client_uuid),
1474 obd_export_nid2str(doomed_exp));
1475 class_fail_export(doomed_exp);
1476 class_export_put(doomed_exp);
1479 cfs_hash_putref(nid_hash);
1481 if (!exports_evicted)
1482 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1483 obd->obd_name, nid);
1484 return exports_evicted;
1486 EXPORT_SYMBOL(obd_export_evict_by_nid);
1488 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1490 struct cfs_hash *uuid_hash;
1491 struct obd_export *doomed_exp = NULL;
1492 struct obd_uuid doomed_uuid;
1493 int exports_evicted = 0;
1495 spin_lock(&obd->obd_dev_lock);
1496 if (obd->obd_stopping) {
1497 spin_unlock(&obd->obd_dev_lock);
1498 return exports_evicted;
1500 uuid_hash = obd->obd_uuid_hash;
1501 cfs_hash_getref(uuid_hash);
1502 spin_unlock(&obd->obd_dev_lock);
1504 obd_str2uuid(&doomed_uuid, uuid);
1505 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1506 CERROR("%s: can't evict myself\n", obd->obd_name);
1507 cfs_hash_putref(uuid_hash);
1508 return exports_evicted;
1511 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1513 if (doomed_exp == NULL) {
1514 CERROR("%s: can't disconnect %s: no exports found\n",
1515 obd->obd_name, uuid);
1517 CWARN("%s: evicting %s at adminstrative request\n",
1518 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1519 class_fail_export(doomed_exp);
1520 class_export_put(doomed_exp);
1523 cfs_hash_putref(uuid_hash);
1525 return exports_evicted;
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() when lock dumping is
 * requested; unset (NULL) by default. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1533 static void print_export_data(struct obd_export *exp, const char *status,
1534 int locks, int debug_level)
1536 struct ptlrpc_reply_state *rs;
1537 struct ptlrpc_reply_state *first_reply = NULL;
1540 spin_lock(&exp->exp_lock);
1541 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1547 spin_unlock(&exp->exp_lock);
1549 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1550 "%p %s %llu stale:%d\n",
1551 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1552 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1553 atomic_read(&exp->exp_rpc_count),
1554 atomic_read(&exp->exp_cb_count),
1555 atomic_read(&exp->exp_locks_count),
1556 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1557 nreplies, first_reply, nreplies > 3 ? "..." : "",
1558 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1559 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1560 if (locks && class_export_dump_hook != NULL)
1561 class_export_dump_hook(exp);
1565 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1567 struct obd_export *exp;
1569 spin_lock(&obd->obd_dev_lock);
1570 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1571 print_export_data(exp, "ACTIVE", locks, debug_level);
1572 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1573 print_export_data(exp, "UNLINKED", locks, debug_level);
1574 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1575 print_export_data(exp, "DELAYED", locks, debug_level);
1576 spin_unlock(&obd->obd_dev_lock);
1577 spin_lock(&obd_zombie_impexp_lock);
1578 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1579 print_export_data(exp, "ZOMBIE", locks, debug_level);
1580 spin_unlock(&obd_zombie_impexp_lock);
1583 void obd_exports_barrier(struct obd_device *obd)
1586 LASSERT(list_empty(&obd->obd_exports));
1587 spin_lock(&obd->obd_dev_lock);
1588 while (!list_empty(&obd->obd_unlinked_exports)) {
1589 spin_unlock(&obd->obd_dev_lock);
1590 set_current_state(TASK_UNINTERRUPTIBLE);
1591 schedule_timeout(cfs_time_seconds(waited));
1592 if (waited > 5 && IS_PO2(waited)) {
1593 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1594 "more than %d seconds. "
1595 "The obd refcount = %d. Is it stuck?\n",
1596 obd->obd_name, waited,
1597 atomic_read(&obd->obd_refcount));
1598 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1601 spin_lock(&obd->obd_dev_lock);
1603 spin_unlock(&obd->obd_dev_lock);
1605 EXPORT_SYMBOL(obd_exports_barrier);
/* Number of zombie imports and exports still waiting to be destroyed;
 * accessed under obd_zombie_impexp_lock. */
static int zombies_count;
1611 * kill zombie imports and exports
1613 void obd_zombie_impexp_cull(void)
1615 struct obd_import *import;
1616 struct obd_export *export;
1620 spin_lock(&obd_zombie_impexp_lock);
1623 if (!list_empty(&obd_zombie_imports)) {
1624 import = list_entry(obd_zombie_imports.next,
1627 list_del_init(&import->imp_zombie_chain);
1631 if (!list_empty(&obd_zombie_exports)) {
1632 export = list_entry(obd_zombie_exports.next,
1635 list_del_init(&export->exp_obd_chain);
1638 spin_unlock(&obd_zombie_impexp_lock);
1640 if (import != NULL) {
1641 class_import_destroy(import);
1642 spin_lock(&obd_zombie_impexp_lock);
1644 spin_unlock(&obd_zombie_impexp_lock);
1647 if (export != NULL) {
1648 class_export_destroy(export);
1649 spin_lock(&obd_zombie_impexp_lock);
1651 spin_unlock(&obd_zombie_impexp_lock);
1655 } while (import != NULL || export != NULL);
1659 static struct completion obd_zombie_start;
1660 static struct completion obd_zombie_stop;
1661 static unsigned long obd_zombie_flags;
1662 static wait_queue_head_t obd_zombie_waitq;
1663 static pid_t obd_zombie_pid;
1666 OBD_ZOMBIE_STOP = 0x0001,
1670 * check for work for kill zombie import/export thread.
1672 static int obd_zombie_impexp_check(void *arg)
1676 spin_lock(&obd_zombie_impexp_lock);
1677 rc = (zombies_count == 0) &&
1678 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1679 spin_unlock(&obd_zombie_impexp_lock);
1685 * Add export to the obd_zombe thread and notify it.
1687 static void obd_zombie_export_add(struct obd_export *exp) {
1688 atomic_dec(&obd_stale_export_num);
1689 spin_lock(&exp->exp_obd->obd_dev_lock);
1690 LASSERT(!list_empty(&exp->exp_obd_chain));
1691 list_del_init(&exp->exp_obd_chain);
1692 spin_unlock(&exp->exp_obd->obd_dev_lock);
1693 spin_lock(&obd_zombie_impexp_lock);
1695 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1696 spin_unlock(&obd_zombie_impexp_lock);
1698 obd_zombie_impexp_notify();
1702 * Add import to the obd_zombe thread and notify it.
1704 static void obd_zombie_import_add(struct obd_import *imp) {
1705 LASSERT(imp->imp_sec == NULL);
1706 spin_lock(&obd_zombie_impexp_lock);
1707 LASSERT(list_empty(&imp->imp_zombie_chain));
1709 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1710 spin_unlock(&obd_zombie_impexp_lock);
1712 obd_zombie_impexp_notify();
1716 * notify import/export destroy thread about new zombie.
1718 static void obd_zombie_impexp_notify(void)
1721 * Make sure obd_zomebie_impexp_thread get this notification.
1722 * It is possible this signal only get by obd_zombie_barrier, and
1723 * barrier gulps this notification and sleeps away and hangs ensues
1725 wake_up_all(&obd_zombie_waitq);
1729 * check whether obd_zombie is idle
1731 static int obd_zombie_is_idle(void)
1735 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1736 spin_lock(&obd_zombie_impexp_lock);
1737 rc = (zombies_count == 0);
1738 spin_unlock(&obd_zombie_impexp_lock);
1743 * wait when obd_zombie import/export queues become empty
1745 void obd_zombie_barrier(void)
1747 struct l_wait_info lwi = { 0 };
1749 if (obd_zombie_pid == current_pid())
1750 /* don't wait for myself */
1752 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1754 EXPORT_SYMBOL(obd_zombie_barrier);
1757 struct obd_export *obd_stale_export_get(void)
1759 struct obd_export *exp = NULL;
1762 spin_lock(&obd_stale_export_lock);
1763 if (!list_empty(&obd_stale_exports)) {
1764 exp = list_entry(obd_stale_exports.next,
1765 struct obd_export, exp_stale_list);
1766 list_del_init(&exp->exp_stale_list);
1768 spin_unlock(&obd_stale_export_lock);
1771 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1772 atomic_read(&obd_stale_export_num));
1776 EXPORT_SYMBOL(obd_stale_export_get);
1778 void obd_stale_export_put(struct obd_export *exp)
1782 LASSERT(list_empty(&exp->exp_stale_list));
1783 if (exp->exp_lock_hash &&
1784 atomic_read(&exp->exp_lock_hash->hs_count)) {
1785 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1786 atomic_read(&obd_stale_export_num));
1788 spin_lock_bh(&exp->exp_bl_list_lock);
1789 spin_lock(&obd_stale_export_lock);
1790 /* Add to the tail if there is no blocked locks,
1791 * to the head otherwise. */
1792 if (list_empty(&exp->exp_bl_list))
1793 list_add_tail(&exp->exp_stale_list,
1794 &obd_stale_exports);
1796 list_add(&exp->exp_stale_list,
1797 &obd_stale_exports);
1799 spin_unlock(&obd_stale_export_lock);
1800 spin_unlock_bh(&exp->exp_bl_list_lock);
1802 class_export_put(exp);
1806 EXPORT_SYMBOL(obd_stale_export_put);
1809 * Adjust the position of the export in the stale list,
1810 * i.e. move to the head of the list if is needed.
1812 void obd_stale_export_adjust(struct obd_export *exp)
1814 LASSERT(exp != NULL);
1815 spin_lock_bh(&exp->exp_bl_list_lock);
1816 spin_lock(&obd_stale_export_lock);
1818 if (!list_empty(&exp->exp_stale_list) &&
1819 !list_empty(&exp->exp_bl_list))
1820 list_move(&exp->exp_stale_list, &obd_stale_exports);
1822 spin_unlock(&obd_stale_export_lock);
1823 spin_unlock_bh(&exp->exp_bl_list_lock);
1825 EXPORT_SYMBOL(obd_stale_export_adjust);
1828 * destroy zombie export/import thread.
1830 static int obd_zombie_impexp_thread(void *unused)
1832 unshare_fs_struct();
1833 complete(&obd_zombie_start);
1835 obd_zombie_pid = current_pid();
1837 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1838 struct l_wait_info lwi = { 0 };
1840 l_wait_event(obd_zombie_waitq,
1841 !obd_zombie_impexp_check(NULL), &lwi);
1842 obd_zombie_impexp_cull();
1845 * Notify obd_zombie_barrier callers that queues
1848 wake_up(&obd_zombie_waitq);
1851 complete(&obd_zombie_stop);
1858 * start destroy zombie import/export thread
1860 int obd_zombie_impexp_init(void)
1862 struct task_struct *task;
1864 INIT_LIST_HEAD(&obd_zombie_imports);
1866 INIT_LIST_HEAD(&obd_zombie_exports);
1867 spin_lock_init(&obd_zombie_impexp_lock);
1868 init_completion(&obd_zombie_start);
1869 init_completion(&obd_zombie_stop);
1870 init_waitqueue_head(&obd_zombie_waitq);
1873 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1875 RETURN(PTR_ERR(task));
1877 wait_for_completion(&obd_zombie_start);
1881 * stop destroy zombie import/export thread
1883 void obd_zombie_impexp_stop(void)
1885 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1886 obd_zombie_impexp_notify();
1887 wait_for_completion(&obd_zombie_stop);
1890 /***** Kernel-userspace comm helpers *******/
1892 /* Get length of entire message, including header */
1893 int kuc_len(int payload_len)
1895 return sizeof(struct kuc_hdr) + payload_len;
1897 EXPORT_SYMBOL(kuc_len);
1899 /* Get a pointer to kuc header, given a ptr to the payload
1900 * @param p Pointer to payload area
1901 * @returns Pointer to kuc header
1903 struct kuc_hdr * kuc_ptr(void *p)
1905 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1906 LASSERT(lh->kuc_magic == KUC_MAGIC);
1909 EXPORT_SYMBOL(kuc_ptr);
1911 /* Alloc space for a message, and fill in header
1912 * @return Pointer to payload area
1914 void *kuc_alloc(int payload_len, int transport, int type)
1917 int len = kuc_len(payload_len);
1921 return ERR_PTR(-ENOMEM);
1923 lh->kuc_magic = KUC_MAGIC;
1924 lh->kuc_transport = transport;
1925 lh->kuc_msgtype = type;
1926 lh->kuc_msglen = len;
1928 return (void *)(lh + 1);
1930 EXPORT_SYMBOL(kuc_alloc);
/* Free a message allocated by kuc_alloc().
 * Takes the pointer to the payload area and the same @payload_len that was
 * used for the allocation.
 *
 * Not declared inline: the function is exported, and whether a plain
 * "inline" definition emits an out-of-line symbol depends on the inline
 * semantics (gnu89 vs. C99) the compiler is using.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1940 struct obd_request_slot_waiter {
1941 struct list_head orsw_entry;
1942 wait_queue_head_t orsw_waitq;
1946 static bool obd_request_slot_avail(struct client_obd *cli,
1947 struct obd_request_slot_waiter *orsw)
1951 spin_lock(&cli->cl_loi_list_lock);
1952 avail = !!list_empty(&orsw->orsw_entry);
1953 spin_unlock(&cli->cl_loi_list_lock);
1959 * For network flow control, the RPC sponsor needs to acquire a credit
1960 * before sending the RPC. The credits count for a connection is defined
1961 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1962 * the subsequent RPC sponsors need to wait until others released their
1963 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1965 int obd_get_request_slot(struct client_obd *cli)
1967 struct obd_request_slot_waiter orsw;
1968 struct l_wait_info lwi;
1971 spin_lock(&cli->cl_loi_list_lock);
1972 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1973 cli->cl_r_in_flight++;
1974 spin_unlock(&cli->cl_loi_list_lock);
1978 init_waitqueue_head(&orsw.orsw_waitq);
1979 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1980 orsw.orsw_signaled = false;
1981 spin_unlock(&cli->cl_loi_list_lock);
1983 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1984 rc = l_wait_event(orsw.orsw_waitq,
1985 obd_request_slot_avail(cli, &orsw) ||
1989 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1990 * freed but other (such as obd_put_request_slot) is using it. */
1991 spin_lock(&cli->cl_loi_list_lock);
1993 if (!orsw.orsw_signaled) {
1994 if (list_empty(&orsw.orsw_entry))
1995 cli->cl_r_in_flight--;
1997 list_del(&orsw.orsw_entry);
2001 if (orsw.orsw_signaled) {
2002 LASSERT(list_empty(&orsw.orsw_entry));
2006 spin_unlock(&cli->cl_loi_list_lock);
2010 EXPORT_SYMBOL(obd_get_request_slot);
2012 void obd_put_request_slot(struct client_obd *cli)
2014 struct obd_request_slot_waiter *orsw;
2016 spin_lock(&cli->cl_loi_list_lock);
2017 cli->cl_r_in_flight--;
2019 /* If there is free slot, wakeup the first waiter. */
2020 if (!list_empty(&cli->cl_loi_read_list) &&
2021 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2022 orsw = list_entry(cli->cl_loi_read_list.next,
2023 struct obd_request_slot_waiter, orsw_entry);
2024 list_del_init(&orsw->orsw_entry);
2025 cli->cl_r_in_flight++;
2026 wake_up(&orsw->orsw_waitq);
2028 spin_unlock(&cli->cl_loi_list_lock);
2030 EXPORT_SYMBOL(obd_put_request_slot);
2032 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2034 return cli->cl_max_rpcs_in_flight;
2036 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2038 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2040 struct obd_request_slot_waiter *orsw;
2047 if (max > OBD_MAX_RIF_MAX || max < 1)
2050 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2051 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2052 /* adjust max_mod_rpcs_in_flight to ensure it is always
2053 * strictly lower that max_rpcs_in_flight */
2055 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2056 "because it must be higher than "
2057 "max_mod_rpcs_in_flight value",
2058 cli->cl_import->imp_obd->obd_name);
2061 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2062 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2068 spin_lock(&cli->cl_loi_list_lock);
2069 old = cli->cl_max_rpcs_in_flight;
2070 cli->cl_max_rpcs_in_flight = max;
2073 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2074 for (i = 0; i < diff; i++) {
2075 if (list_empty(&cli->cl_loi_read_list))
2078 orsw = list_entry(cli->cl_loi_read_list.next,
2079 struct obd_request_slot_waiter, orsw_entry);
2080 list_del_init(&orsw->orsw_entry);
2081 cli->cl_r_in_flight++;
2082 wake_up(&orsw->orsw_waitq);
2084 spin_unlock(&cli->cl_loi_list_lock);
2088 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2090 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2092 return cli->cl_max_mod_rpcs_in_flight;
2094 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2096 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2098 struct obd_connect_data *ocd;
2102 if (max > OBD_MAX_RIF_MAX || max < 1)
2105 /* cannot exceed or equal max_rpcs_in_flight */
2106 if (max >= cli->cl_max_rpcs_in_flight) {
2107 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2108 "higher or equal to max_rpcs_in_flight value (%u)\n",
2109 cli->cl_import->imp_obd->obd_name,
2110 max, cli->cl_max_rpcs_in_flight);
2114 /* cannot exceed max modify RPCs in flight supported by the server */
2115 ocd = &cli->cl_import->imp_connect_data;
2116 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2117 maxmodrpcs = ocd->ocd_maxmodrpcs;
2120 if (max > maxmodrpcs) {
2121 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2122 "higher than max_mod_rpcs_per_client value (%hu) "
2123 "returned by the server at connection\n",
2124 cli->cl_import->imp_obd->obd_name,
2129 spin_lock(&cli->cl_mod_rpcs_lock);
2131 prev = cli->cl_max_mod_rpcs_in_flight;
2132 cli->cl_max_mod_rpcs_in_flight = max;
2134 /* wakeup waiters if limit has been increased */
2135 if (cli->cl_max_mod_rpcs_in_flight > prev)
2136 wake_up(&cli->cl_mod_rpcs_waitq);
2138 spin_unlock(&cli->cl_mod_rpcs_lock);
2142 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/* Integer percentage of a in b; 0 when b is 0.  Arguments are parenthesized
 * so that expressions such as pct(x + y, n) evaluate as intended. */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
2146 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2147 struct seq_file *seq)
2150 unsigned long mod_tot = 0, mod_cum;
2153 do_gettimeofday(&now);
2155 spin_lock(&cli->cl_mod_rpcs_lock);
2157 seq_printf(seq, "snapshot_time: %lu.%lu (secs.usecs)\n",
2158 now.tv_sec, now.tv_usec);
2159 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2160 cli->cl_mod_rpcs_in_flight);
2162 seq_printf(seq, "\n\t\t\tmodify\n");
2163 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2165 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2168 for (i = 0; i < OBD_HIST_MAX; i++) {
2169 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2171 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2172 i, mod, pct(mod, mod_tot),
2173 pct(mod_cum, mod_tot));
2174 if (mod_cum == mod_tot)
2178 spin_unlock(&cli->cl_mod_rpcs_lock);
2182 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2186 /* The number of modify RPCs sent in parallel is limited
2187 * because the server has a finite number of slots per client to
2188 * store request result and ensure reply reconstruction when needed.
2189 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2190 * that takes into account server limit and cl_max_rpcs_in_flight
2192 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2193 * one close request is allowed above the maximum.
2195 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2200 /* A slot is available if
2201 * - number of modify RPCs in flight is less than the max
2202 * - it's a close RPC and no other close request is in flight
2204 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2205 (close_req && cli->cl_close_rpcs_in_flight == 0);
2210 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2215 spin_lock(&cli->cl_mod_rpcs_lock);
2216 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2217 spin_unlock(&cli->cl_mod_rpcs_lock);
2221 /* Get a modify RPC slot from the obd client @cli according
2222 * to the kind of operation @opc that is going to be sent
2223 * and the intent @it of the operation if it applies.
2224 * If the maximum number of modify RPCs in flight is reached
2225 * the thread is put to sleep.
2226 * Returns the tag to be set in the request message. Tag 0
2227 * is reserved for non-modifying requests.
2229 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2230 struct lookup_intent *it)
2232 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2233 bool close_req = false;
2236 /* read-only metadata RPCs don't consume a slot on MDT
2237 * for reply reconstruction
2239 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2240 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2243 if (opc == MDS_CLOSE)
2247 spin_lock(&cli->cl_mod_rpcs_lock);
2248 max = cli->cl_max_mod_rpcs_in_flight;
2249 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2250 /* there is a slot available */
2251 cli->cl_mod_rpcs_in_flight++;
2253 cli->cl_close_rpcs_in_flight++;
2254 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2255 cli->cl_mod_rpcs_in_flight);
2256 /* find a free tag */
2257 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2259 LASSERT(i < OBD_MAX_RIF_MAX);
2260 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2261 spin_unlock(&cli->cl_mod_rpcs_lock);
2262 /* tag 0 is reserved for non-modify RPCs */
2265 spin_unlock(&cli->cl_mod_rpcs_lock);
2267 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2268 "opc %u, max %hu\n",
2269 cli->cl_import->imp_obd->obd_name, opc, max);
2271 l_wait_event(cli->cl_mod_rpcs_waitq,
2272 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2275 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2277 /* Put a modify RPC slot from the obd client @cli according
2278 * to the kind of operation @opc that has been sent and the
2279 * intent @it of the operation if it applies.
2281 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2282 struct lookup_intent *it, __u16 tag)
2284 bool close_req = false;
2286 if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2287 it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2290 if (opc == MDS_CLOSE)
2293 spin_lock(&cli->cl_mod_rpcs_lock);
2294 cli->cl_mod_rpcs_in_flight--;
2296 cli->cl_close_rpcs_in_flight--;
2297 /* release the tag in the bitmap */
2298 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2299 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2300 spin_unlock(&cli->cl_mod_rpcs_lock);
2301 wake_up(&cli->cl_mod_rpcs_waitq);
2303 EXPORT_SYMBOL(obd_put_mod_rpc_slot);