4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2016, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
30 * Lustre is a trademark of Sun Microsystems, Inc.
32 * lustre/obdclass/genops.c
34 * These are the only exported functions, they provide some generic
35 * infrastructure for managing object devices
38 #define DEBUG_SUBSYSTEM S_CLASS
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lprocfs_status.h>
44 #include <lustre_disk.h>
45 #include <lustre_kernelcomm.h>
/* Registered obd types; the list is searched and extended under obd_types_lock. */
47 static DEFINE_SPINLOCK(obd_types_lock);
48 static LIST_HEAD(obd_types);
/* Device table indexed by obd_minor; slots are written under obd_dev_lock. */
49 DEFINE_RWLOCK(obd_dev_lock);
50 static struct obd_device *obd_devs[MAX_OBD_DEVICES];
/* Slab caches set up by obd_init_caches() and torn down by obd_cleanup_caches(). */
52 static struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 static struct kmem_cache *import_cachep;
/*
 * Zombie bookkeeping. Exports and imports whose last reference is dropped
 * are passed to obd_zombie_export_add() / obd_zombie_import_add(); those
 * helpers are defined outside this listing.
 */
57 static LIST_HEAD(obd_zombie_imports);
58 static LIST_HEAD(obd_zombie_exports);
59 static DEFINE_SPINLOCK(obd_zombie_impexp_lock);
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks, int debug_level);
/* Stale-export bookkeeping; obd_stale_export_num is bumped in class_unlink_export(). */
67 static LIST_HEAD(obd_stale_exports);
68 static DEFINE_SPINLOCK(obd_stale_export_lock);
69 static atomic_t obd_stale_export_num = ATOMIC_INIT(0);
/*
 * Hook used by this file to release a ptlrpc connection. The call sites
 * visible here invoke it without checking the pointer for NULL.
 */
71 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
75 * support functions: we could use inter-module communication, but this
76 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep with GFP_NOFS and stamp it
 * with OBD_DEVICE_MAGIC.
 */
78 static struct obd_device *obd_device_alloc(void)
80 struct obd_device *obd;
82 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
84 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Return an obd_device to obd_device_cachep.
 *
 * Asserts the magic first, complains if obd_namespace is still set, and
 * finalizes the obd_reference lu_ref before freeing.
 */
89 static void obd_device_free(struct obd_device *obd)
92 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
93 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A namespace still attached at this point is reported as an error. */
94 if (obd->obd_namespace != NULL) {
95 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
96 obd, obd->obd_namespace, obd->obd_force);
99 lu_ref_fini(&obd->obd_reference);
100 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd type by exact name.
 *
 * Walks obd_types under obd_types_lock and compares typ_name with strcmp().
 * The lock is released both on a match and after an unsuccessful scan.
 * No reference count is taken in the lines shown here.
 */
103 struct obd_type *class_search_type(const char *name)
105 struct list_head *tmp;
106 struct obd_type *type;
108 spin_lock(&obd_types_lock);
109 list_for_each(tmp, &obd_types) {
110 type = list_entry(tmp, struct obd_type, typ_chain);
111 if (strcmp(type->typ_name, name) == 0) {
112 spin_unlock(&obd_types_lock);
116 spin_unlock(&obd_types_lock);
119 EXPORT_SYMBOL(class_search_type);
/*
 * Find an obd type by name and pin its owning module.
 *
 * With HAVE_MODULE_LOADING_SUPPORT the type name is first mapped to a
 * module name, request_module() is called, and the lookup is repeated
 * after a successful load.
 *
 * NOTE(review): the result of try_module_get() is not checked in the
 * lines shown here; confirm whether a failed module pin is acceptable.
 */
121 struct obd_type *class_get_type(const char *name)
123 struct obd_type *type = class_search_type(name);
125 #ifdef HAVE_MODULE_LOADING_SUPPORT
127 const char *modname = name;
/* Some type names are served by a module with a different name. */
129 if (strcmp(modname, "obdfilter") == 0)
132 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
133 modname = LUSTRE_OSP_NAME;
/* Prefix match: any name starting with LUSTRE_MDS_NAME maps to the MDT module. */
135 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
136 modname = LUSTRE_MDT_NAME;
138 if (!request_module("%s", modname)) {
139 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
/* Search by the original type name, not by the module name. */
140 type = class_search_type(name);
142 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
148 spin_lock(&type->obd_type_lock);
150 try_module_get(type->typ_dt_ops->o_owner);
151 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): under obd_type_lock, drop the module
 * reference held on the type's dt_ops owner.
 */
156 void class_put_type(struct obd_type *type)
159 spin_lock(&type->obd_type_lock);
161 module_put(type->typ_dt_ops->o_owner);
162 spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) on the length of a type name, enforced by LASSERT. */
165 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type under \a name.
 *
 * Allocates the obd_type plus private copies of the dt/md operation tables
 * and of the name, optionally registers a procfs directory, initializes
 * \a ldt, and finally links the type into obd_types under obd_types_lock.
 * The tail of the function releases everything that was allocated.
 */
167 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
168 bool enable_proc, struct lprocfs_vars *vars,
169 const char *name, struct lu_device_type *ldt)
171 struct obd_type *type;
176 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* Refuse to register the same name twice. */
178 if (class_search_type(name)) {
179 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
184 OBD_ALLOC(type, sizeof(*type));
188 OBD_ALLOC_PTR(type->typ_dt_ops);
189 OBD_ALLOC_PTR(type->typ_md_ops);
190 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* All three allocations are checked together. */
192 if (type->typ_dt_ops == NULL ||
193 type->typ_md_ops == NULL ||
194 type->typ_name == NULL)
/* The type keeps its own copies of the caller's operation tables. */
197 *(type->typ_dt_ops) = *dt_ops;
198 /* md_ops is optional */
200 *(type->typ_md_ops) = *md_ops;
/* typ_name was sized as strlen(name) + 1 above, so strcpy() fits. */
201 strcpy(type->typ_name, name);
202 spin_lock_init(&type->obd_type_lock);
204 #ifdef CONFIG_PROC_FS
206 type->typ_procroot = lprocfs_register(type->typ_name,
209 if (IS_ERR(type->typ_procroot)) {
210 rc = PTR_ERR(type->typ_procroot);
/* Reset to NULL so the cleanup below does not try to remove the entry. */
211 type->typ_procroot = NULL;
218 rc = lu_device_type_init(ldt);
223 spin_lock(&obd_types_lock);
224 list_add(&type->typ_chain, &obd_types);
225 spin_unlock(&obd_types_lock);
/* Cleanup: release whatever was allocated above. */
230 if (type->typ_name != NULL) {
231 #ifdef CONFIG_PROC_FS
232 if (type->typ_procroot != NULL)
233 remove_proc_subtree(type->typ_name, proc_lustre_root);
235 OBD_FREE(type->typ_name, strlen(name) + 1);
237 if (type->typ_md_ops != NULL)
238 OBD_FREE_PTR(type->typ_md_ops);
239 if (type->typ_dt_ops != NULL)
240 OBD_FREE_PTR(type->typ_dt_ops);
241 OBD_FREE(type, sizeof(*type));
244 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * A type that still has a non-zero typ_refcnt has its operation tables
 * freed but keeps its name. Otherwise the procfs entries are removed, the
 * lu device type is finalized, the type is unlinked from obd_types and all
 * of its memory is released.
 */
246 int class_unregister_type(const char *name)
248 struct obd_type *type = class_search_type(name);
252 CERROR("unknown obd type\n");
256 if (type->typ_refcnt) {
257 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
258 /* This is a bad situation, let's make the best of it */
259 /* Remove ops, but leave the name for debugging */
260 OBD_FREE_PTR(type->typ_dt_ops);
261 OBD_FREE_PTR(type->typ_md_ops);
265 /* we do not use type->typ_procroot as for compatibility purposes
266 * other modules can share names (i.e. lod can use lov entry). so
267 * we can't reference pointer as it can get invalidated when another
268 * module removes the entry */
269 #ifdef CONFIG_PROC_FS
270 if (type->typ_procroot != NULL)
271 remove_proc_subtree(type->typ_name, proc_lustre_root);
272 if (type->typ_procsym != NULL)
273 lprocfs_remove(&type->typ_procsym);
276 lu_device_type_fini(type->typ_lu);
278 spin_lock(&obd_types_lock);
279 list_del(&type->typ_chain);
280 spin_unlock(&obd_types_lock);
/* Sized from the caller's name, which matched typ_name in the lookup above. */
281 OBD_FREE(type->typ_name, strlen(name) + 1);
282 if (type->typ_dt_ops != NULL)
283 OBD_FREE_PTR(type->typ_dt_ops);
284 if (type->typ_md_ops != NULL)
285 OBD_FREE_PTR(type->typ_md_ops);
286 OBD_FREE(type, sizeof(*type));
288 } /* class_unregister_type */
289 EXPORT_SYMBOL(class_unregister_type);
292 * Create a new obd device.
294 * Find an empty slot in ::obd_devs[], create a new obd device in it.
296 * \param[in] type_name obd device type string.
297 * \param[in] name obd device name.
299 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device
/*
 * Failures visible in this body are reported as ERR_PTR values: -EINVAL
 * (name too long), -ENODEV (unknown type), -ENOMEM (allocation), -EEXIST
 * (duplicate name) and -EOVERFLOW (no free slot).
 */
302 struct obd_device *class_newdev(const char *type_name, const char *name)
304 struct obd_device *result = NULL;
305 struct obd_device *newdev;
306 struct obd_type *type = NULL;
308 int new_obd_minor = 0;
311 if (strlen(name) >= MAX_OBD_NAME) {
312 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
313 RETURN(ERR_PTR(-EINVAL));
/* Pins the type; released via class_put_type() near the end of the function. */
316 type = class_get_type(type_name);
318 CERROR("OBD: unknown type: %s\n", type_name);
319 RETURN(ERR_PTR(-ENODEV));
322 newdev = obd_device_alloc();
324 GOTO(out_type, result = ERR_PTR(-ENOMEM));
326 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* One pass under the write lock both checks for duplicates and claims a slot. */
328 write_lock(&obd_dev_lock);
329 for (i = 0; i < class_devno_max(); i++) {
330 struct obd_device *obd = class_num2obd(i);
332 if (obd && (strcmp(name, obd->obd_name) == 0)) {
333 CERROR("Device %s already exists at %d, won't add\n",
/* Duplicate name: undo a slot that may have been claimed earlier in the scan. */
336 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
337 "%p obd_magic %08x != %08x\n", result,
338 result->obd_magic, OBD_DEVICE_MAGIC);
339 LASSERTF(result->obd_minor == new_obd_minor,
340 "%p obd_minor %d != %d\n", result,
341 result->obd_minor, new_obd_minor);
343 obd_devs[result->obd_minor] = NULL;
344 result->obd_name[0]='\0';
346 result = ERR_PTR(-EEXIST);
/* First empty slot seen while no device has been placed yet. */
349 if (!result && !obd) {
351 result->obd_minor = i;
353 result->obd_type = type;
/* Copies at most sizeof(obd_name) - 1 bytes; name length was checked above. */
354 strncpy(result->obd_name, name,
355 sizeof(result->obd_name) - 1);
356 obd_devs[i] = result;
359 write_unlock(&obd_dev_lock);
/* The scan ran off the end without placing the device. */
361 if (result == NULL && i >= class_devno_max()) {
362 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
364 GOTO(out, result = ERR_PTR(-EOVERFLOW));
370 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
371 result->obd_name, result);
375 obd_device_free(newdev);
377 class_put_type(type);
/*
 * Remove \a obd from obd_devs[], free it, and drop the type reference.
 *
 * The type pointer is saved up front because it is still needed after the
 * device itself has been freed.
 */
381 void class_release_dev(struct obd_device *obd)
383 struct obd_type *obd_type = obd->obd_type;
385 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
386 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
387 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
388 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
389 LASSERT(obd_type != NULL);
391 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
392 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
/* Clear the slot under the write lock before freeing the device. */
394 write_lock(&obd_dev_lock);
395 obd_devs[obd->obd_minor] = NULL;
396 write_unlock(&obd_dev_lock);
397 obd_device_free(obd);
399 class_put_type(obd_type);
/*
 * Scan obd_devs[] under the read lock for a device whose obd_name equals
 * \a name. A match is only acted on when obd_attached is set.
 */
402 int class_name2dev(const char *name)
409 read_lock(&obd_dev_lock);
410 for (i = 0; i < class_devno_max(); i++) {
411 struct obd_device *obd = class_num2obd(i);
413 if (obd && strcmp(name, obd->obd_name) == 0) {
414 /* Make sure we finished attaching before we give
415 out any references */
416 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
417 if (obd->obd_attached) {
418 read_unlock(&obd_dev_lock);
424 read_unlock(&obd_dev_lock);
/*
 * Resolve a device name to its obd_device via class_name2dev() and
 * class_num2obd().
 *
 * The index is rejected when it is negative or not below class_devno_max().
 * The upper bound is exclusive, matching class_num2obd() and every table
 * scan in this file.
 */
429 struct obd_device *class_name2obd(const char *name)
431 int dev = class_name2dev(name);
433 if (dev < 0 || dev >= class_devno_max())
435 return class_num2obd(dev);
437 EXPORT_SYMBOL(class_name2obd);
/*
 * Scan obd_devs[] under the read lock for a device whose obd_uuid equals
 * \a uuid (compared with obd_uuid_equals()).
 */
439 int class_uuid2dev(struct obd_uuid *uuid)
443 read_lock(&obd_dev_lock);
444 for (i = 0; i < class_devno_max(); i++) {
445 struct obd_device *obd = class_num2obd(i);
447 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
448 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
449 read_unlock(&obd_dev_lock);
453 read_unlock(&obd_dev_lock);
/*
 * Resolve a uuid to its obd_device via class_uuid2dev() and
 * class_num2obd().
 */
458 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
460 int dev = class_uuid2dev(uuid);
463 return class_num2obd(dev);
465 EXPORT_SYMBOL(class_uuid2obd);
468 * Get obd device from ::obd_devs[]
470 * \param num [in] array index
472 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
473 * otherwise return the obd device there.
/*
 * Sanity-checks the device found for \a num: its magic must be
 * OBD_DEVICE_MAGIC and its obd_minor must equal \a num.
 *
 * NOTE(review): only the upper bound of \a num is tested in the lines
 * shown here; confirm that callers never pass a negative index.
 */
475 struct obd_device *class_num2obd(int num)
477 struct obd_device *obd = NULL;
479 if (num < class_devno_max()) {
484 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
485 "%p obd_magic %08x != %08x\n",
486 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
487 LASSERTF(obd->obd_minor == num,
488 "%p obd_minor %0d != %0d\n",
489 obd, obd->obd_minor, num);
496 * Get obd devices count. Device in any
498 * \retval obd device count
500 int get_devices_count(void)
502 int index, max_index = class_devno_max(), dev_count = 0;
504 read_lock(&obd_dev_lock);
505 for (index = 0; index <= max_index; index++) {
506 struct obd_device *obd = class_num2obd(index);
510 read_unlock(&obd_dev_lock);
514 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device: index, status, type name, device
 * name, uuid and reference count. The table is walked under the read lock.
 */
516 void class_obd_list(void)
521 read_lock(&obd_dev_lock);
522 for (i = 0; i < class_devno_max(); i++) {
523 struct obd_device *obd = class_num2obd(i);
/* Status is chosen by the first flag that is set, in this order. */
527 if (obd->obd_stopping)
529 else if (obd->obd_set_up)
531 else if (obd->obd_attached)
535 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
536 i, status, obd->obd_type->typ_name,
537 obd->obd_name, obd->obd_uuid.uuid,
538 atomic_read(&obd->obd_refcount));
540 read_unlock(&obd_dev_lock);
544 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
545 specified, then only the client with that uuid is returned,
546 otherwise any client connected to the tgt is returned. */
/*
 * The type test is a prefix match: only the first strlen(typ_name)
 * characters of the device's type name are compared.
 */
547 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
548 const char * typ_name,
549 struct obd_uuid *grp_uuid)
553 read_lock(&obd_dev_lock);
554 for (i = 0; i < class_devno_max(); i++) {
555 struct obd_device *obd = class_num2obd(i);
559 if ((strncmp(obd->obd_type->typ_name, typ_name,
560 strlen(typ_name)) == 0)) {
/* The target uuid must match; the device uuid is compared only when grp_uuid is given. */
561 if (obd_uuid_equals(tgt_uuid,
562 &obd->u.cli.cl_target_uuid) &&
563 ((grp_uuid)? obd_uuid_equals(grp_uuid,
564 &obd->obd_uuid) : 1)) {
565 read_unlock(&obd_dev_lock);
570 read_unlock(&obd_dev_lock);
574 EXPORT_SYMBOL(class_find_client_obd);
576 /* Iterate the obd_device list looking for devices that have grp_uuid.
577 Start searching at *next, and if a device is found, the next index to
578 look at is saved in *next. If next is NULL, then the first matching
579 device will always be returned. */
/*
 * The scan runs under the read lock and compares each device's obd_uuid
 * with \a grp_uuid. A caller-supplied *next is accepted as a start index
 * only when it lies in [0, class_devno_max()).
 */
580 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
586 else if (*next >= 0 && *next < class_devno_max())
591 read_lock(&obd_dev_lock);
592 for (; i < class_devno_max(); i++) {
593 struct obd_device *obd = class_num2obd(i);
597 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
600 read_unlock(&obd_dev_lock);
604 read_unlock(&obd_dev_lock);
608 EXPORT_SYMBOL(class_devices_in_group);
611 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
612 * adjust sptlrpc settings accordingly.
/*
 * For every device that is set up, not stopping, of one of the listed
 * types, and whose name begins with \a fsname (first \a namelen bytes),
 * send KEY_SPTLRPC_CONF through obd_set_info_async() on the device's
 * self export.
 */
614 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
616 struct obd_device *obd;
620 LASSERT(namelen > 0);
622 read_lock(&obd_dev_lock);
623 for (i = 0; i < class_devno_max(); i++) {
624 obd = class_num2obd(i);
626 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
629 /* only notify mdc, osc, osp, lwp, mdt, ost
630 * because only these have a -sptlrpc llog */
631 type = obd->obd_type->typ_name;
632 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
633 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
634 strcmp(type, LUSTRE_OSP_NAME) != 0 &&
635 strcmp(type, LUSTRE_LWP_NAME) != 0 &&
636 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
637 strcmp(type, LUSTRE_OST_NAME) != 0)
640 if (strncmp(obd->obd_name, fsname, namelen))
/*
 * Take a device reference, then drop the read lock for the duration of
 * the call; the lock is re-taken before the scan continues.
 */
643 class_incref(obd, __FUNCTION__, obd);
644 read_unlock(&obd_dev_lock);
645 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
646 sizeof(KEY_SPTLRPC_CONF),
647 KEY_SPTLRPC_CONF, 0, NULL, NULL);
649 class_decref(obd, __FUNCTION__, obd);
650 read_lock(&obd_dev_lock);
652 read_unlock(&obd_dev_lock);
655 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches() and reset the
 * cache pointers so that a later obd_init_caches() can run again.
 */
657 void obd_cleanup_caches(void)
660 if (obd_device_cachep) {
661 kmem_cache_destroy(obd_device_cachep);
662 obd_device_cachep = NULL;
665 kmem_cache_destroy(obdo_cachep);
669 kmem_cache_destroy(import_cachep);
670 import_cachep = NULL;
/*
 * Create the three slab caches used by this file: ll_obd_dev_cache,
 * ll_obdo_cache and ll_import_cache. Each pointer is asserted to be NULL
 * beforehand. A failed creation jumps to the exit path with -ENOMEM, and
 * obd_cleanup_caches() undoes any partial setup.
 */
676 int obd_init_caches(void)
681 LASSERT(obd_device_cachep == NULL);
682 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
683 sizeof(struct obd_device),
685 if (!obd_device_cachep)
686 GOTO(out, rc = -ENOMEM);
688 LASSERT(obdo_cachep == NULL);
689 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
692 GOTO(out, rc = -ENOMEM);
694 LASSERT(import_cachep == NULL);
695 import_cachep = kmem_cache_create("ll_import_cache",
696 sizeof(struct obd_import),
699 GOTO(out, rc = -ENOMEM);
703 obd_cleanup_caches();
707 /* map connection to client */
/*
 * Translate a connection handle into its export by looking the cookie up
 * with class_handle2object(). Two cases are logged separately before the
 * lookup: a null handle and a cookie of -1.
 */
708 struct obd_export *class_conn2export(struct lustre_handle *conn)
710 struct obd_export *export;
714 CDEBUG(D_CACHE, "looking for null handle\n");
718 if (conn->cookie == -1) { /* this means assign a new connection */
719 CDEBUG(D_CACHE, "want a new connection\n");
723 CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
724 export = class_handle2object(conn->cookie, NULL);
727 EXPORT_SYMBOL(class_conn2export);
/*
 * Map an export to an obd device.
 * NOTE(review): the body is not present in this listing.
 */
729 struct obd_device *class_exp2obd(struct obd_export *exp)
735 EXPORT_SYMBOL(class_exp2obd);
/*
 * Map a connection handle to the obd device of its export.
 *
 * The export obtained from class_conn2export() is released again with
 * class_export_put() once exp_obd has been read.
 */
737 struct obd_device *class_conn2obd(struct lustre_handle *conn)
739 struct obd_export *export;
740 export = class_conn2export(conn);
742 struct obd_device *obd = export->exp_obd;
743 class_export_put(export);
/* Return the client import (u.cli.cl_import) of the export's obd device. */
749 struct obd_import *class_exp2cliimp(struct obd_export *exp)
751 struct obd_device *obd = exp->exp_obd;
754 return obd->u.cli.cl_import;
756 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the obd device that
 * class_conn2obd() finds for \a conn.
 */
758 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
760 struct obd_device *obd = class_conn2obd(conn);
763 return obd->u.cli.cl_import;
766 /* Export management functions */
/*
 * Final teardown of an export whose refcount is zero: release its
 * connection, assert that its request lists are empty, let the obd destroy
 * its per-export state, drop the "export" reference on the obd, and free
 * the export with OBD_FREE_RCU.
 */
767 static void class_export_destroy(struct obd_export *exp)
769 struct obd_device *obd = exp->exp_obd;
772 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
773 LASSERT(obd != NULL);
775 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
776 exp->exp_client_uuid.uuid, obd->obd_name);
778 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
779 if (exp->exp_connection)
780 ptlrpc_put_connection_superhack(exp->exp_connection);
782 LASSERT(list_empty(&exp->exp_outstanding_replies));
783 LASSERT(list_empty(&exp->exp_uncommitted_replies));
784 LASSERT(list_empty(&exp->exp_req_replay_queue));
785 LASSERT(list_empty(&exp->exp_hp_rpcs));
786 obd_destroy_export(exp);
/* Pairs with class_incref(obd, "export", export) in class_new_export(). */
787 class_decref(obd, "export", exp);
789 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback for export handles: takes one export reference. */
793 static void export_handle_addref(void *export)
795 class_export_get(export);
/* Handle operations passed to class_handle_hash() for exports. */
798 static struct portals_handle_ops export_handle_ops = {
799 .hop_addref = export_handle_addref,
/* Take one reference on \a exp and log the new count. */
803 struct obd_export *class_export_get(struct obd_export *exp)
805 atomic_inc(&exp->exp_refcount);
806 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
807 atomic_read(&exp->exp_refcount));
810 EXPORT_SYMBOL(class_export_get);
/*
 * Drop one reference on \a exp. On the final put, the export's nid-stat
 * state is cleaned up and the export is handed to obd_zombie_export_add().
 */
812 void class_export_put(struct obd_export *exp)
814 LASSERT(exp != NULL);
815 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* The logged value is the count expected after the decrement below. */
816 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
817 atomic_read(&exp->exp_refcount) - 1);
819 if (atomic_dec_and_test(&exp->exp_refcount)) {
820 LASSERT(!list_empty(&exp->exp_obd_chain));
821 LASSERT(list_empty(&exp->exp_stale_list));
822 CDEBUG(D_IOCTL, "final put %p/%s\n",
823 exp, exp->exp_client_uuid.uuid);
825 /* release nid stat reference */
826 lprocfs_exp_cleanup(exp);
828 obd_zombie_export_add(exp);
831 EXPORT_SYMBOL(class_export_put);
833 /* Creates a new export, adds it to the hash table, and returns a
834 * pointer to it. The refcount is 2: one for the hash reference, and
835 * one for the pointer returned by this function. */
/*
 * Failure results visible in this body: ERR_PTR(-ENOMEM) on allocation
 * failure; rc is set to -ENODEV when the obd is stopping or the uuid hash
 * reference could not be used, and to -EALREADY for a duplicate client
 * uuid.
 */
836 struct obd_export *class_new_export(struct obd_device *obd,
837 struct obd_uuid *cluuid)
839 struct obd_export *export;
840 struct cfs_hash *hash = NULL;
844 OBD_ALLOC_PTR(export);
846 return ERR_PTR(-ENOMEM);
848 export->exp_conn_cnt = 0;
849 export->exp_lock_hash = NULL;
850 export->exp_flock_hash = NULL;
/* Starts at two references, as described in the comment above this function. */
851 atomic_set(&export->exp_refcount, 2);
852 atomic_set(&export->exp_rpc_count, 0);
853 atomic_set(&export->exp_cb_count, 0);
854 atomic_set(&export->exp_locks_count, 0);
855 #if LUSTRE_TRACKS_LOCK_EXP_REFS
856 INIT_LIST_HEAD(&export->exp_locks_list);
857 spin_lock_init(&export->exp_locks_list_guard);
859 atomic_set(&export->exp_replay_count, 0);
860 export->exp_obd = obd;
861 INIT_LIST_HEAD(&export->exp_outstanding_replies);
862 spin_lock_init(&export->exp_uncommitted_replies_lock);
863 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
864 INIT_LIST_HEAD(&export->exp_req_replay_queue);
865 INIT_LIST_HEAD(&export->exp_handle.h_link);
866 INIT_LIST_HEAD(&export->exp_hp_rpcs);
867 INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* Registers the handle with export_handle_ops. */
868 class_handle_hash(&export->exp_handle, &export_handle_ops);
869 export->exp_last_request_time = cfs_time_current_sec();
870 spin_lock_init(&export->exp_lock);
871 spin_lock_init(&export->exp_rpc_lock);
872 INIT_HLIST_NODE(&export->exp_uuid_hash);
873 INIT_HLIST_NODE(&export->exp_nid_hash);
874 INIT_HLIST_NODE(&export->exp_gen_hash);
875 spin_lock_init(&export->exp_bl_list_lock);
876 INIT_LIST_HEAD(&export->exp_bl_list);
877 INIT_LIST_HEAD(&export->exp_stale_list);
879 export->exp_sp_peer = LUSTRE_SP_ANY;
880 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
881 export->exp_client_uuid = *cluuid;
882 obd_init_export(export);
884 spin_lock(&obd->obd_dev_lock);
885 /* shouldn't happen, but might race */
886 if (obd->obd_stopping)
887 GOTO(exit_unlock, rc = -ENODEV);
/* Hold a reference on the uuid hash while the device lock is dropped. */
889 hash = cfs_hash_getref(obd->obd_uuid_hash);
891 GOTO(exit_unlock, rc = -ENODEV);
892 spin_unlock(&obd->obd_dev_lock);
/* An export whose client uuid equals the obd's own uuid is not hashed. */
894 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
895 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
897 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
898 obd->obd_name, cluuid->uuid, rc);
899 GOTO(exit_err, rc = -EALREADY);
903 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
/* Re-check obd_stopping: the lock was released while hashing. */
904 spin_lock(&obd->obd_dev_lock);
905 if (obd->obd_stopping) {
906 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
907 GOTO(exit_unlock, rc = -ENODEV);
/* Success: link the export into the obd's lists and count it. */
910 class_incref(obd, "export", export);
911 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
912 list_add_tail(&export->exp_obd_chain_timed,
913 &export->exp_obd->obd_exports_timed);
914 export->exp_obd->obd_num_exports++;
915 spin_unlock(&obd->obd_dev_lock);
916 cfs_hash_putref(hash);
/* Failure: unlock, drop the hash reference, unhash the handle, free the export. */
920 spin_unlock(&obd->obd_dev_lock);
923 cfs_hash_putref(hash);
924 class_handle_unhash(&export->exp_handle);
925 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
926 obd_destroy_export(export);
927 OBD_FREE_PTR(export);
930 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from its obd: unhash the handle, remove it from the
 * uuid hash (and, with HAVE_SERVER_SUPPORT, the generation hash), move it
 * to obd_unlinked_exports, and release it with obd_stale_export_put().
 */
932 void class_unlink_export(struct obd_export *exp)
934 class_handle_unhash(&exp->exp_handle);
936 spin_lock(&exp->exp_obd->obd_dev_lock);
937 /* delete an uuid-export hashitem from hashtables */
938 if (!hlist_unhashed(&exp->exp_uuid_hash))
939 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
940 &exp->exp_client_uuid,
941 &exp->exp_uuid_hash);
943 #ifdef HAVE_SERVER_SUPPORT
944 if (!hlist_unhashed(&exp->exp_gen_hash)) {
945 struct tg_export_data *ted = &exp->exp_target_data;
946 struct cfs_hash *hash;
948 /* Because obd_gen_hash will not be released until
949 * class_cleanup(), so hash should never be NULL here */
950 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
951 LASSERT(hash != NULL);
952 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
954 cfs_hash_putref(hash);
956 #endif /* HAVE_SERVER_SUPPORT */
/* Still under obd_dev_lock: move between lists and adjust the export count. */
958 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
959 list_del_init(&exp->exp_obd_chain_timed);
960 exp->exp_obd->obd_num_exports--;
961 spin_unlock(&exp->exp_obd->obd_dev_lock);
962 atomic_inc(&obd_stale_export_num);
964 /* A reference is kept by obd_stale_exports list */
965 obd_stale_export_put(exp);
967 EXPORT_SYMBOL(class_unlink_export);
969 /* Import management functions */
/*
 * Final teardown of an import whose refcount is zero: release the active
 * connection, release and free every entry on imp_conn_list, drop the
 * "import" reference on the obd, and free the import with OBD_FREE_RCU.
 */
970 static void class_import_destroy(struct obd_import *imp)
974 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
975 imp->imp_obd->obd_name);
977 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
979 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Drain the connection list one entry at a time. */
981 while (!list_empty(&imp->imp_conn_list)) {
982 struct obd_import_conn *imp_conn;
984 imp_conn = list_entry(imp->imp_conn_list.next,
985 struct obd_import_conn, oic_item);
986 list_del_init(&imp_conn->oic_item);
987 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
988 OBD_FREE(imp_conn, sizeof(*imp_conn));
991 LASSERT(imp->imp_sec == NULL);
/* Pairs with class_incref(obd, "import", imp) in class_new_import(). */
992 class_decref(imp->imp_obd, "import", imp);
993 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* hop_addref callback for import handles: takes one import reference. */
997 static void import_handle_addref(void *import)
999 class_import_get(import);
/* Handle operations passed to class_handle_hash() for imports. */
1002 static struct portals_handle_ops import_handle_ops = {
1003 .hop_addref = import_handle_addref,
/* Take one reference on \a import and log the new count. */
1007 struct obd_import *class_import_get(struct obd_import *import)
1009 atomic_inc(&import->imp_refcount);
1010 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1011 atomic_read(&import->imp_refcount),
1012 import->imp_obd->obd_name);
1015 EXPORT_SYMBOL(class_import_get);
/*
 * Drop one reference on \a imp. On the final put the import is handed to
 * obd_zombie_import_add().
 */
1017 void class_import_put(struct obd_import *imp)
1021 LASSERT(list_empty(&imp->imp_zombie_chain));
1022 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* The logged value is the count expected after the decrement below. */
1024 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1025 atomic_read(&imp->imp_refcount) - 1,
1026 imp->imp_obd->obd_name);
1028 if (atomic_dec_and_test(&imp->imp_refcount)) {
1029 CDEBUG(D_INFO, "final put import %p\n", imp);
1030 obd_zombie_import_add(imp);
1033 /* catch possible import put race */
1034 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1037 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the network latency
 * estimate and one service estimate per portal (IMP_AT_MAX_PORTALS
 * entries), each seeded with INITIAL_CONNECT_TIMEOUT.
 */
1039 static void init_imp_at(struct imp_at *at) {
1041 at_init(&at->iat_net_latency, 0, 0);
1042 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1043 /* max service estimates are tracked on the server side, so
1044 don't use the AT history here, just use the last reported
1045 val. (But keep hist for proc histogram, worst_ever) */
1046 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize an import for \a obd.
 *
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, holds an
 * "import" reference on the obd, and is registered in the handle hash with
 * import_handle_ops.
 */
1051 struct obd_import *class_new_import(struct obd_device *obd)
1053 struct obd_import *imp;
1054 struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1056 OBD_ALLOC(imp, sizeof(*imp));
1060 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1061 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1062 INIT_LIST_HEAD(&imp->imp_replay_list);
1063 INIT_LIST_HEAD(&imp->imp_sending_list);
1064 INIT_LIST_HEAD(&imp->imp_delayed_list);
1065 INIT_LIST_HEAD(&imp->imp_committed_list);
1066 INIT_LIST_HEAD(&imp->imp_unreplied_list);
1067 imp->imp_known_replied_xid = 0;
/* The replay cursor starts at the head of the committed list. */
1068 imp->imp_replay_cursor = &imp->imp_committed_list;
1069 spin_lock_init(&imp->imp_lock);
1070 imp->imp_last_success_conn = 0;
1071 imp->imp_state = LUSTRE_IMP_NEW;
/* Released by class_decref(..., "import", imp) in class_import_destroy(). */
1072 imp->imp_obd = class_incref(obd, "import", imp);
1073 mutex_init(&imp->imp_sec_mutex);
1074 init_waitqueue_head(&imp->imp_recovery_waitq);
/* Use the pid of the namespace's child reaper when there is one, otherwise 1. */
1076 if (curr_pid_ns->child_reaper)
1077 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1079 imp->imp_sec_refpid = 1;
1081 atomic_set(&imp->imp_refcount, 2);
1082 atomic_set(&imp->imp_unregistering, 0);
1083 atomic_set(&imp->imp_inflight, 0);
1084 atomic_set(&imp->imp_replay_inflight, 0);
1085 atomic_set(&imp->imp_inval_count, 0);
1086 INIT_LIST_HEAD(&imp->imp_conn_list);
1087 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1088 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1089 init_imp_at(&imp->imp_at);
1091 /* the default magic is V2, will be used in connect RPC, and
1092 * then adjusted according to the flags in request/reply. */
1093 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1097 EXPORT_SYMBOL(class_new_import);
/*
 * Begin tearing down an import: unhash its handle, bump imp_generation
 * under imp_lock, and drop one reference.
 */
1099 void class_destroy_import(struct obd_import *import)
1101 LASSERT(import != NULL);
1102 LASSERT(import != LP_POISON);
1104 class_handle_unhash(&import->imp_handle);
1106 spin_lock(&import->imp_lock);
1107 import->imp_generation++;
1108 spin_unlock(&import->imp_lock);
1109 class_import_put(import);
1111 EXPORT_SYMBOL(class_destroy_import);
1113 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Record one more reference from \a lock to \a exp (debug tracking,
 * compiled under LUSTRE_TRACKS_LOCK_EXP_REFS).
 *
 * The first reference links the lock into exp_locks_list and records the
 * export as the lock's target. A lock that already targets a different
 * export triggers a console warning. Runs under exp_locks_list_guard.
 */
1115 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1117 spin_lock(&exp->exp_locks_list_guard);
1119 LASSERT(lock->l_exp_refs_nr >= 0);
1121 if (lock->l_exp_refs_target != NULL &&
1122 lock->l_exp_refs_target != exp) {
1123 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1124 exp, lock, lock->l_exp_refs_target);
/* Post-increment: the branch is taken only for the very first reference. */
1126 if ((lock->l_exp_refs_nr ++) == 0) {
1127 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1128 lock->l_exp_refs_target = exp;
1130 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1131 lock, exp, lock->l_exp_refs_nr);
1132 spin_unlock(&exp->exp_locks_list_guard);
1134 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Drop one reference from \a lock to \a exp (debug tracking).
 *
 * When the count reaches zero the lock is unlinked from the export's list
 * and its target is cleared. A target that differs from \a exp triggers a
 * console warning. Runs under exp_locks_list_guard.
 */
1136 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1138 spin_lock(&exp->exp_locks_list_guard);
1139 LASSERT(lock->l_exp_refs_nr > 0);
1140 if (lock->l_exp_refs_target != exp) {
1141 LCONSOLE_WARN("lock %p, "
1142 "mismatching export pointers: %p, %p\n",
1143 lock, lock->l_exp_refs_target, exp);
/* Pre-decrement: the branch is taken when the last reference goes away. */
1145 if (-- lock->l_exp_refs_nr == 0) {
1146 list_del_init(&lock->l_exp_refs_link);
1147 lock->l_exp_refs_target = NULL;
1149 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1150 lock, exp, lock->l_exp_refs_nr);
1151 spin_unlock(&exp->exp_locks_list_guard);
1153 EXPORT_SYMBOL(__class_export_del_lock_ref);
1156 /* A connection defines an export context in which preallocation can
1157 be managed. This releases the export pointer reference and returns the
1158 export handle, so the export refcount is 1 when this function returns. */
/*
 * Create a new export for \a cluuid on \a obd and return its handle
 * cookie through \a conn.
 *
 * The pointer reference obtained from class_new_export() is dropped again
 * before returning; callers reach the export through the cookie.
 */
1160 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1161 struct obd_uuid *cluuid)
1163 struct obd_export *export;
1164 LASSERT(conn != NULL);
1165 LASSERT(obd != NULL);
1166 LASSERT(cluuid != NULL);
1169 export = class_new_export(obd, cluuid);
1171 RETURN(PTR_ERR(export));
1173 conn->cookie = export->exp_handle.h_cookie;
1174 class_export_put(export);
1176 CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1177 cluuid->uuid, conn->cookie);
1180 EXPORT_SYMBOL(class_connect);
1182 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery accounting of an export.
 *
 * Under obd_recovery_task_lock: while the obd is recovering, clear
 * exp_in_recovery and decrement obd_connected_clients; non-lightweight
 * exports also increment obd_stale_clients. Then, under exp_lock, clear
 * the request-replay and lock-replay flags and decrement the matching
 * per-obd client counters.
 */
1183 static void class_export_recovery_cleanup(struct obd_export *exp)
1185 struct obd_device *obd = exp->exp_obd;
1187 spin_lock(&obd->obd_recovery_task_lock);
1188 if (obd->obd_recovering) {
1189 if (exp->exp_in_recovery) {
1190 spin_lock(&exp->exp_lock);
1191 exp->exp_in_recovery = 0;
1192 spin_unlock(&exp->exp_lock);
1193 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1194 atomic_dec(&obd->obd_connected_clients);
1197 /* if called during recovery then should update
1198 * obd_stale_clients counter,
1199 * lightweight exports are not counted */
1200 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1201 exp->exp_obd->obd_stale_clients++;
1203 spin_unlock(&obd->obd_recovery_task_lock);
1205 spin_lock(&exp->exp_lock);
1206 /** Cleanup req replay fields */
1207 if (exp->exp_req_replay_needed) {
1208 exp->exp_req_replay_needed = 0;
1210 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1211 atomic_dec(&obd->obd_req_replay_clients);
1214 /** Cleanup lock replay data */
1215 if (exp->exp_lock_replay_needed) {
1216 exp->exp_lock_replay_needed = 0;
1218 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1219 atomic_dec(&obd->obd_lock_replay_clients);
1221 spin_unlock(&exp->exp_lock);
1224 /* This function removes 1-3 references from the export:
1225 * 1 - for the export pointer passed in
1226 * and, if a disconnect is really needed,
1227 * 2 - for removal from the hash
1228 * 3 - in class_unlink_export
1229 * The export pointer passed to this function may be destroyed. */
/*
 * Under exp_lock the export is marked disconnected and removed from the
 * nid hash. Only the first caller to disconnect an export goes on to the
 * recovery cleanup and class_unlink_export(); the caller's own reference
 * is then dropped.
 */
1230 int class_disconnect(struct obd_export *export)
1232 int already_disconnected;
1235 if (export == NULL) {
1236 CWARN("attempting to free NULL export %p\n", export);
1240 spin_lock(&export->exp_lock);
/* Remember the previous state so that racing callers can be told apart. */
1241 already_disconnected = export->exp_disconnected;
1242 export->exp_disconnected = 1;
1243 /* We hold references of export for uuid hash
1244 * and nid_hash and export link at least. So
1245 * it is safe to call cfs_hash_del in there. */
1246 if (!hlist_unhashed(&export->exp_nid_hash))
1247 cfs_hash_del(export->exp_obd->obd_nid_hash,
1248 &export->exp_connection->c_peer.nid,
1249 &export->exp_nid_hash);
1250 spin_unlock(&export->exp_lock);
1252 /* class_cleanup(), abort_recovery(), and class_fail_export()
1253 * all end up in here, and if any of them race we shouldn't
1254 * call extra class_export_puts(). */
1255 if (already_disconnected) {
1256 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1257 GOTO(no_disconn, already_disconnected);
1260 CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1261 export->exp_handle.h_cookie);
1263 class_export_recovery_cleanup(export);
1264 class_unlink_export(export);
/* Drops the reference for the export pointer passed in. */
1266 class_export_put(export);
1269 EXPORT_SYMBOL(class_disconnect);
1271 /* Return non-zero for a fully connected export */
1272 int class_connected_export(struct obd_export *exp)
/* "Connected" means exp_conn_cnt is positive and the export has not been
 * marked exp_failed; both fields are sampled together under exp_lock. */
1277 spin_lock(&exp->exp_lock);
1278 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1279 spin_unlock(&exp->exp_lock);
1283 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export found on @list.
 *
 * Each export has its exp_flags overwritten with @flags (under exp_lock).
 * An export whose client UUID equals the UUID of its own obd device is not
 * disconnected; it is only taken off the list.  Every other export is
 * handed to obd_disconnect().  The loop always works on the first list
 * entry and runs until the list is empty.
 */
1285 static void class_disconnect_export_list(struct list_head *list,
1286 enum obd_option flags)
1289 struct obd_export *exp;
1292 /* It's possible that an export may disconnect itself, but
1293 * nothing else will be added to this list. */
1294 while (!list_empty(list)) {
1295 exp = list_entry(list->next, struct obd_export,
1297 /* need for safe call CDEBUG after obd_disconnect */
1298 class_export_get(exp);
1300 spin_lock(&exp->exp_lock);
1301 exp->exp_flags = flags;
1302 spin_unlock(&exp->exp_lock);
/* Export whose client UUID is the device's own UUID: do not disconnect. */
1304 if (obd_uuid_equals(&exp->exp_client_uuid,
1305 &exp->exp_obd->obd_uuid)) {
1307 "exp %p export uuid == obd uuid, don't discon\n",
1309 /* Need to delete this now so we don't end up pointing
1310 * to work_list later when this export is cleaned up. */
1311 list_del_init(&exp->exp_obd_chain);
/* Balances the class_export_get() taken at the top of the loop. */
1312 class_export_put(exp);
/* Second reference, taken just before obd_disconnect(); the first one keeps
 * exp valid for the CDEBUG that follows the disconnect. */
1316 class_export_get(exp);
1317 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1318 "last request at %ld\n",
1319 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1320 exp, exp->exp_last_request_time);
1321 /* release one export reference anyway */
1322 rc = obd_disconnect(exp);
1324 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1325 obd_export_nid2str(exp), exp, rc);
1326 class_export_put(exp);
/*
 * Disconnect all exports of @obd.
 *
 * Both obd_exports and obd_delayed_exports are spliced onto a private work
 * list while holding obd_dev_lock; the disconnects themselves are then done
 * by class_disconnect_export_list() after the lock has been dropped, using
 * the flags returned by exp_flags_from_obd().
 */
1331 void class_disconnect_exports(struct obd_device *obd)
1333 struct list_head work_list;
1336 /* Move all of the exports from obd_exports to a work list, en masse. */
1337 INIT_LIST_HEAD(&work_list);
1338 spin_lock(&obd->obd_dev_lock);
1339 list_splice_init(&obd->obd_exports, &work_list);
1340 list_splice_init(&obd->obd_delayed_exports, &work_list);
1341 spin_unlock(&obd->obd_dev_lock);
1343 if (!list_empty(&work_list)) {
1344 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1345 "disconnecting them\n", obd->obd_minor, obd);
1346 class_disconnect_export_list(&work_list,
1347 exp_flags_from_obd(obd));
/* Nothing was queued: only log that the device has no exports. */
1349 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1350 obd->obd_minor, obd);
1353 EXPORT_SYMBOL(class_disconnect_exports);
1355 /* Remove exports that have not completed recovery.
/*
 * Walk obd->obd_exports and disconnect the exports selected as stale.
 *
 * @obd:          device whose export list is scanned
 * @test_export:  predicate called for each candidate export; note that it
 *                is invoked with both obd_dev_lock and the export's
 *                exp_lock held.  An export for which it returns non-zero is
 *                left alone.
 *
 * Selected exports are marked exp_failed, moved to a private work list and
 * finally disconnected with OBD_OPT_ABORT_RECOV added to the device flags.
 */
1357 void class_disconnect_stale_exports(struct obd_device *obd,
1358 int (*test_export)(struct obd_export *))
1360 struct list_head work_list;
1361 struct obd_export *exp, *n;
1365 INIT_LIST_HEAD(&work_list);
1366 spin_lock(&obd->obd_dev_lock);
/* The _safe iterator is required because list_move() below unlinks exp. */
1367 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1369 /* don't count self-export as client */
1370 if (obd_uuid_equals(&exp->exp_client_uuid,
1371 &exp->exp_obd->obd_uuid))
1374 /* don't evict clients which have no slot in last_rcvd
1375 * (e.g. lightweight connection) */
1376 if (exp->exp_target_data.ted_lr_idx == -1)
/* Already-failed exports and exports accepted by test_export() are skipped;
 * exp_failed is tested and set within the same exp_lock critical section. */
1379 spin_lock(&exp->exp_lock);
1380 if (exp->exp_failed || test_export(exp)) {
1381 spin_unlock(&exp->exp_lock);
1384 exp->exp_failed = 1;
1385 spin_unlock(&exp->exp_lock);
1387 list_move(&exp->exp_obd_chain, &work_list);
1389 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1390 obd->obd_name, exp->exp_client_uuid.uuid,
1391 exp->exp_connection == NULL ? "<unknown>" :
1392 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1393 print_export_data(exp, "EVICTING", 0, D_HA);
1395 spin_unlock(&obd->obd_dev_lock);
1398 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1399 obd->obd_name, evicted);
/* The actual disconnects happen after obd_dev_lock has been released. */
1401 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1402 OBD_OPT_ABORT_RECOV);
1405 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark @exp as failed and disconnect it.
 *
 * exp_failed is tested and set under exp_lock, so only the first caller
 * proceeds to obd_disconnect(); later callers just log that the export is
 * already dead.  When obd_dump_on_timeout is set, the debug log is dumped
 * before the disconnect.
 */
1407 void class_fail_export(struct obd_export *exp)
1409 int rc, already_failed;
1411 spin_lock(&exp->exp_lock);
1412 already_failed = exp->exp_failed;
1413 exp->exp_failed = 1;
1414 spin_unlock(&exp->exp_lock);
1416 if (already_failed) {
1417 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1418 exp, exp->exp_client_uuid.uuid);
1422 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1423 exp, exp->exp_client_uuid.uuid);
1425 if (obd_dump_on_timeout)
1426 libcfs_debug_dumplog();
1428 /* need for safe call CDEBUG after obd_disconnect */
1429 class_export_get(exp);
1431 /* Most callers into obd_disconnect are removing their own reference
1432 * (request, for example) in addition to the one from the hash table.
1433 * We don't have such a reference here, so make one. */
1434 class_export_get(exp);
1435 rc = obd_disconnect(exp);
1437 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1439 CDEBUG(D_HA, "disconnected export %p/%s\n",
1440 exp, exp->exp_client_uuid.uuid);
/* Balances the first class_export_get() above (the one kept for logging). */
1441 class_export_put(exp);
1443 EXPORT_SYMBOL(class_fail_export);
/*
 * Return a printable form of the peer NID of @exp.  When the export has a
 * connection, the string comes from libcfs_nid2str() applied to that
 * connection's peer NID.
 */
1445 char *obd_export_nid2str(struct obd_export *exp)
1447 if (exp->exp_connection != NULL)
1448 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1452 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict the export(s) of @obd that are hashed under the NID given as the
 * string @nid.
 *
 * Returns exports_evicted, which is still 0 when the device is already
 * stopping.  A reference on the NID hash is taken under obd_dev_lock and
 * dropped again before returning.
 */
1454 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1456 struct cfs_hash *nid_hash;
1457 struct obd_export *doomed_exp = NULL;
1458 int exports_evicted = 0;
/* The cast drops const for libcfs_str2nid(). */
1460 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1462 spin_lock(&obd->obd_dev_lock);
1463 /* umount has run already, so evict thread should leave
1464 * its task to umount thread now */
1465 if (obd->obd_stopping) {
1466 spin_unlock(&obd->obd_dev_lock);
1467 return exports_evicted;
/* Take the hash reference while still holding obd_dev_lock. */
1469 nid_hash = obd->obd_nid_hash;
1470 cfs_hash_getref(nid_hash);
1471 spin_unlock(&obd->obd_dev_lock);
1474 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1475 if (doomed_exp == NULL)
/* Sanity checks: the export found must really belong to the requested NID,
 * and it must not be the device's self-export. */
1478 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1479 "nid %s found, wanted nid %s, requested nid %s\n",
1480 obd_export_nid2str(doomed_exp),
1481 libcfs_nid2str(nid_key), nid);
1482 LASSERTF(doomed_exp != obd->obd_self_export,
1483 "self-export is hashed by NID?\n");
1485 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1486 "request\n", obd->obd_name,
1487 obd_uuid2str(&doomed_exp->exp_client_uuid),
1488 obd_export_nid2str(doomed_exp));
1489 class_fail_export(doomed_exp);
/* Presumably drops the reference returned by cfs_hash_lookup() - TODO confirm. */
1490 class_export_put(doomed_exp);
1493 cfs_hash_putref(nid_hash);
1495 if (!exports_evicted)
1496 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1497 obd->obd_name, nid);
1498 return exports_evicted;
1500 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd whose client UUID matches the string @uuid.
 *
 * Returns exports_evicted, which is still 0 when the device is stopping or
 * when @uuid is the device's own UUID (a device cannot evict itself).  A
 * reference on the UUID hash is held for the duration of the lookup.
 *
 * NOTE(review): the CWARN text below spells "adminstrative"; it is a runtime
 * string and is left unchanged here.
 */
1502 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1504 struct cfs_hash *uuid_hash;
1505 struct obd_export *doomed_exp = NULL;
1506 struct obd_uuid doomed_uuid;
1507 int exports_evicted = 0;
1509 spin_lock(&obd->obd_dev_lock);
1510 if (obd->obd_stopping) {
1511 spin_unlock(&obd->obd_dev_lock);
1512 return exports_evicted;
/* Take the hash reference while still holding obd_dev_lock. */
1514 uuid_hash = obd->obd_uuid_hash;
1515 cfs_hash_getref(uuid_hash);
1516 spin_unlock(&obd->obd_dev_lock);
1518 obd_str2uuid(&doomed_uuid, uuid);
1519 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1520 CERROR("%s: can't evict myself\n", obd->obd_name);
/* Early return: the hash reference taken above must be dropped here too. */
1521 cfs_hash_putref(uuid_hash);
1522 return exports_evicted;
1525 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1527 if (doomed_exp == NULL) {
1528 CERROR("%s: can't disconnect %s: no exports found\n",
1529 obd->obd_name, uuid);
1531 CWARN("%s: evicting %s at adminstrative request\n",
1532 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1533 class_fail_export(doomed_exp);
1534 class_export_put(doomed_exp);
1537 cfs_hash_putref(uuid_hash);
1539 return exports_evicted;
/* Optional callback, only built when LUSTRE_TRACKS_LOCK_EXP_REFS is set.
 * print_export_data() calls it for an export when lock dumping is requested
 * and the pointer is non-NULL. */
1542 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1543 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1544 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Log a one-line summary of @exp at @debug_level.
 *
 * @status:  caller-chosen tag printed in the line (e.g. "ACTIVE", "ZOMBIE")
 * @locks:   when non-zero and LUSTRE_TRACKS_LOCK_EXP_REFS is enabled, the
 *           class_export_dump_hook callback is invoked as well
 *
 * The line contains the reference, RPC, callback and lock counters, the
 * disconnected/delayed/failed flags, information about outstanding replies,
 * the last committed transno and whether the export is on a stale list.
 */
1547 static void print_export_data(struct obd_export *exp, const char *status,
1548 int locks, int debug_level)
1550 struct ptlrpc_reply_state *rs;
1551 struct ptlrpc_reply_state *first_reply = NULL;
/* exp_outstanding_replies is walked under exp_lock. */
1554 spin_lock(&exp->exp_lock);
1555 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1561 spin_unlock(&exp->exp_lock);
/* "..." is appended to the reply information when nreplies exceeds 3. */
1563 CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1564 "%p %s %llu stale:%d\n",
1565 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1566 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1567 atomic_read(&exp->exp_rpc_count),
1568 atomic_read(&exp->exp_cb_count),
1569 atomic_read(&exp->exp_locks_count),
1570 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1571 nreplies, first_reply, nreplies > 3 ? "..." : "",
1572 exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1573 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1574 if (locks && class_export_dump_hook != NULL)
1575 class_export_dump_hook(exp);
/*
 * Log every export known for @obd at @debug_level, tagged by the list it is
 * on: ACTIVE (obd_exports), UNLINKED (obd_unlinked_exports) and DELAYED
 * (obd_delayed_exports) are walked under obd_dev_lock.  The "ZOMBIE" pass
 * walks the global obd_zombie_exports list under obd_zombie_impexp_lock and
 * is not filtered by @obd.  @locks is passed through to print_export_data().
 */
1579 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1581 struct obd_export *exp;
1583 spin_lock(&obd->obd_dev_lock);
1584 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1585 print_export_data(exp, "ACTIVE", locks, debug_level);
1586 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1587 print_export_data(exp, "UNLINKED", locks, debug_level);
1588 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1589 print_export_data(exp, "DELAYED", locks, debug_level);
1590 spin_unlock(&obd->obd_dev_lock);
1591 spin_lock(&obd_zombie_impexp_lock);
1592 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1593 print_export_data(exp, "ZOMBIE", locks, debug_level);
1594 spin_unlock(&obd_zombie_impexp_lock);
/*
 * Block until obd->obd_unlinked_exports has drained.
 *
 * The caller must already have emptied obd_exports (asserted below).  The
 * list is polled: obd_dev_lock is dropped, the task sleeps uninterruptibly
 * for cfs_time_seconds(waited), and the list is tested again with the lock
 * held.  Once the wait exceeds 5 seconds, a console warning and an export
 * dump are produced each time "waited" is a power of two.
 */
1597 void obd_exports_barrier(struct obd_device *obd)
1600 LASSERT(list_empty(&obd->obd_exports));
1601 spin_lock(&obd->obd_dev_lock);
1602 while (!list_empty(&obd->obd_unlinked_exports)) {
/* Never sleep while holding the spinlock. */
1603 spin_unlock(&obd->obd_dev_lock);
1604 set_current_state(TASK_UNINTERRUPTIBLE);
1605 schedule_timeout(cfs_time_seconds(waited));
1606 if (waited > 5 && is_power_of_2(waited)) {
1607 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1608 "more than %d seconds. "
1609 "The obd refcount = %d. Is it stuck?\n",
1610 obd->obd_name, waited,
1611 atomic_read(&obd->obd_refcount));
1612 dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1615 spin_lock(&obd->obd_dev_lock);
1617 spin_unlock(&obd->obd_dev_lock);
1619 EXPORT_SYMBOL(obd_exports_barrier);
1621 /* Total amount of zombies to be destroyed */
/* Read under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(). */
1622 static int zombies_count = 0;
1625 * kill zombie imports and exports
/*
 * Destroy queued zombie imports and exports.
 *
 * Each pass detaches at most one import and one export from the head of the
 * global zombie lists while holding obd_zombie_impexp_lock, then destroys
 * them with the lock released.  The lock is re-taken briefly after each
 * destroy (presumably to update the zombie accounting - TODO confirm).  The
 * loop repeats until a pass finds neither an import nor an export.
 */
1627 void obd_zombie_impexp_cull(void)
1629 struct obd_import *import;
1630 struct obd_export *export;
1634 spin_lock(&obd_zombie_impexp_lock);
1637 if (!list_empty(&obd_zombie_imports)) {
1638 import = list_entry(obd_zombie_imports.next,
1641 list_del_init(&import->imp_zombie_chain);
1645 if (!list_empty(&obd_zombie_exports)) {
1646 export = list_entry(obd_zombie_exports.next,
1649 list_del_init(&export->exp_obd_chain);
1652 spin_unlock(&obd_zombie_impexp_lock);
/* The destroy calls run without the zombie lock held. */
1654 if (import != NULL) {
1655 class_import_destroy(import);
1656 spin_lock(&obd_zombie_impexp_lock);
1658 spin_unlock(&obd_zombie_impexp_lock);
1661 if (export != NULL) {
1662 class_export_destroy(export);
1663 spin_lock(&obd_zombie_impexp_lock);
1665 spin_unlock(&obd_zombie_impexp_lock);
1669 } while (import != NULL || export != NULL);
/* Completed by the zombie thread once it has started; awaited by
 * obd_zombie_impexp_init(). */
1673 static DECLARE_COMPLETION(obd_zombie_start);
/* Completed by the zombie thread when it leaves its main loop; awaited by
 * obd_zombie_impexp_stop(). */
1674 static DECLARE_COMPLETION(obd_zombie_stop);
/* Bit field manipulated with set_bit()/test_bit(). */
1675 static unsigned long obd_zombie_flags;
/* Shared by the zombie thread and by obd_zombie_barrier() callers. */
1676 static DECLARE_WAIT_QUEUE_HEAD(obd_zombie_waitq);
/* Pid of the zombie thread, so obd_zombie_barrier() can avoid waiting for
 * itself. */
1677 static pid_t obd_zombie_pid;
/* Passed to set_bit()/test_bit() as the bit number, not as a mask. */
1680 OBD_ZOMBIE_STOP = 0x0001,
1684 * check for work for kill zombie import/export thread.
/*
 * Despite the name, rc is non-zero when the zombie thread has nothing to do:
 * no zombies are counted and OBD_ZOMBIE_STOP is not set.  The thread waits
 * on the negation of this value.  @arg is not used in the visible code.
 */
1686 static int obd_zombie_impexp_check(void *arg)
1690 spin_lock(&obd_zombie_impexp_lock);
1691 rc = (zombies_count == 0) &&
1692 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1693 spin_unlock(&obd_zombie_impexp_lock);
1699 * Add export to the obd_zombie thread and notify it.
/*
 * Move @exp from its device list onto the global obd_zombie_exports list and
 * wake the zombie thread.  The export must currently be linked through
 * exp_obd_chain (asserted).  obd_stale_export_num is decremented first.
 * The two locks are taken one after the other, never nested.
 */
1701 static void obd_zombie_export_add(struct obd_export *exp) {
1702 atomic_dec(&obd_stale_export_num);
1703 spin_lock(&exp->exp_obd->obd_dev_lock);
1704 LASSERT(!list_empty(&exp->exp_obd_chain));
1705 list_del_init(&exp->exp_obd_chain);
1706 spin_unlock(&exp->exp_obd->obd_dev_lock);
1707 spin_lock(&obd_zombie_impexp_lock);
1709 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1710 spin_unlock(&obd_zombie_impexp_lock);
1712 obd_zombie_impexp_notify();
1716 * Add import to the obd_zombie thread and notify it.
/*
 * Queue @imp on the global obd_zombie_imports list and wake the zombie
 * thread.  The import must have no security context left (imp_sec == NULL)
 * and must not already be on a zombie chain; both are asserted.
 */
1718 static void obd_zombie_import_add(struct obd_import *imp) {
1719 LASSERT(imp->imp_sec == NULL);
1720 spin_lock(&obd_zombie_impexp_lock);
1721 LASSERT(list_empty(&imp->imp_zombie_chain));
1723 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1724 spin_unlock(&obd_zombie_impexp_lock);
1726 obd_zombie_impexp_notify();
1730 * notify import/export destroy thread about new zombie.
/*
 * Wake every sleeper on obd_zombie_waitq.  wake_up_all() is used because the
 * queue is shared between the zombie thread and obd_zombie_barrier()
 * callers, so waking a single waiter might miss the thread.
 */
1732 static void obd_zombie_impexp_notify(void)
1735 * Make sure obd_zomebie_impexp_thread get this notification.
1736 * It is possible this signal only get by obd_zombie_barrier, and
1737 * barrier gulps this notification and sleeps away and hangs ensues
1739 wake_up_all(&obd_zombie_waitq);
1743 * check whether obd_zombie is idle
/*
 * rc is non-zero when no zombies are counted.  Must not be called once
 * OBD_ZOMBIE_STOP has been set (asserted).  zombies_count is read under
 * obd_zombie_impexp_lock.
 */
1745 static int obd_zombie_is_idle(void)
1749 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1750 spin_lock(&obd_zombie_impexp_lock);
1751 rc = (zombies_count == 0);
1752 spin_unlock(&obd_zombie_impexp_lock);
1757 * wait when obd_zombie import/export queues become empty
/*
 * Sleep on obd_zombie_waitq until obd_zombie_is_idle() reports that no
 * zombies remain.  The zombie thread itself is recognised by its pid so that
 * it does not wait for its own work.
 */
1759 void obd_zombie_barrier(void)
1761 struct l_wait_info lwi = { 0 };
1763 if (obd_zombie_pid == current_pid())
1764 /* don't wait for myself */
1766 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1768 EXPORT_SYMBOL(obd_zombie_barrier);
/*
 * Detach the first export from the global obd_stale_exports list, under
 * obd_stale_export_lock.  exp stays NULL when the list is empty.
 */
1771 struct obd_export *obd_stale_export_get(void)
1773 struct obd_export *exp = NULL;
1776 spin_lock(&obd_stale_export_lock);
1777 if (!list_empty(&obd_stale_exports)) {
1778 exp = list_entry(obd_stale_exports.next,
1779 struct obd_export, exp_stale_list);
/* list_del_init() leaves exp_stale_list empty, which obd_stale_export_put()
 * asserts on. */
1780 list_del_init(&exp->exp_stale_list);
1782 spin_unlock(&obd_stale_export_lock);
1785 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1786 atomic_read(&obd_stale_export_num));
1790 EXPORT_SYMBOL(obd_stale_export_get);
/*
 * Give back an export obtained from obd_stale_export_get().
 *
 * @exp must not be on the stale list (asserted).  If it has a lock hash with
 * a non-zero hs_count, it is queued on obd_stale_exports again.
 * class_export_put() handles the other case; the exact branch structure is
 * not visible here - TODO confirm.
 */
1792 void obd_stale_export_put(struct obd_export *exp)
1796 LASSERT(list_empty(&exp->exp_stale_list));
1797 if (exp->exp_lock_hash &&
1798 atomic_read(&exp->exp_lock_hash->hs_count)) {
1799 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1800 atomic_read(&obd_stale_export_num));
/* Lock order: exp_bl_list_lock (BH-disabling) first, then
 * obd_stale_export_lock; obd_stale_export_adjust() uses the same order. */
1802 spin_lock_bh(&exp->exp_bl_list_lock);
1803 spin_lock(&obd_stale_export_lock);
1804 /* Add to the tail if there is no blocked locks,
1805 * to the head otherwise. */
1806 if (list_empty(&exp->exp_bl_list))
1807 list_add_tail(&exp->exp_stale_list,
1808 &obd_stale_exports);
1810 list_add(&exp->exp_stale_list,
1811 &obd_stale_exports);
1813 spin_unlock(&obd_stale_export_lock);
1814 spin_unlock_bh(&exp->exp_bl_list_lock);
1816 class_export_put(exp);
1820 EXPORT_SYMBOL(obd_stale_export_put);
1823 * Adjust the position of the export in the stale list,
1824 * i.e. move to the head of the list if is needed.
/*
 * Move @exp to the head of obd_stale_exports, but only if it is currently on
 * the stale list and its exp_bl_list is non-empty.  Locks are taken in the
 * same order as in obd_stale_export_put(): exp_bl_list_lock, then
 * obd_stale_export_lock.
 */
1826 void obd_stale_export_adjust(struct obd_export *exp)
1828 LASSERT(exp != NULL);
1829 spin_lock_bh(&exp->exp_bl_list_lock);
1830 spin_lock(&obd_stale_export_lock);
1832 if (!list_empty(&exp->exp_stale_list) &&
1833 !list_empty(&exp->exp_bl_list))
1834 list_move(&exp->exp_stale_list, &obd_stale_exports);
1836 spin_unlock(&obd_stale_export_lock);
1837 spin_unlock_bh(&exp->exp_bl_list_lock);
1839 EXPORT_SYMBOL(obd_stale_export_adjust);
1842 * destroy zombie export/import thread.
/*
 * Main function of the zombie destroy thread.
 *
 * It signals obd_zombie_start and records its pid.  Then, until
 * OBD_ZOMBIE_STOP is set, it sleeps until obd_zombie_impexp_check() turns
 * false, culls the queued zombies and wakes obd_zombie_waitq so that
 * barrier callers can re-check.  On exit it signals obd_zombie_stop.
 * @unused is ignored.
 */
1844 static int obd_zombie_impexp_thread(void *unused)
1846 unshare_fs_struct();
1847 complete(&obd_zombie_start);
1849 obd_zombie_pid = current_pid();
1851 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1852 struct l_wait_info lwi = { 0 };
/* Sleep while the check reports "no work and not stopping". */
1854 l_wait_event(obd_zombie_waitq,
1855 !obd_zombie_impexp_check(NULL), &lwi);
1856 obd_zombie_impexp_cull();
1859 * Notify obd_zombie_barrier callers that queues
1862 wake_up(&obd_zombie_waitq);
1865 complete(&obd_zombie_stop);
1872 * start destroy zombie import/export thread
/*
 * Start the zombie destroy thread (named "obd_zombid") with kthread_run().
 * On the error path PTR_ERR(task) is returned; otherwise the function waits
 * until the thread has signalled obd_zombie_start.
 */
1874 int obd_zombie_impexp_init(void)
1876 struct task_struct *task;
1878 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1880 RETURN(PTR_ERR(task));
1882 wait_for_completion(&obd_zombie_start);
1886 * stop destroy zombie import/export thread
/*
 * Stop the zombie destroy thread: set OBD_ZOMBIE_STOP, wake the thread and
 * wait for it to signal obd_zombie_stop.  The stale export list is expected
 * to be empty afterwards (asserted).
 */
1888 void obd_zombie_impexp_stop(void)
1890 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1891 obd_zombie_impexp_notify();
1892 wait_for_completion(&obd_zombie_stop);
1893 LASSERT(list_empty(&obd_stale_exports));
1896 /***** Kernel-userspace comm helpers *******/
1898 /* Get length of entire message, including header */
/* @payload_len is the payload size in bytes; sizeof(struct kuc_hdr) is added
 * to it.  The sum is converted to the int return type. */
1899 int kuc_len(int payload_len)
1901 return sizeof(struct kuc_hdr) + payload_len;
1903 EXPORT_SYMBOL(kuc_len);
1905 /* Get a pointer to kuc header, given a ptr to the payload
1906 * @param p Pointer to payload area
1907 * @returns Pointer to kuc header
/* The header sits immediately before the payload, so it is found by stepping
 * back one struct kuc_hdr from @p.  The magic value is asserted, so @p must
 * be a payload pointer of a message whose header has been filled in. */
1909 struct kuc_hdr * kuc_ptr(void *p)
1911 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1912 LASSERT(lh->kuc_magic == KUC_MAGIC);
1915 EXPORT_SYMBOL(kuc_ptr);
1917 /* Alloc space for a message, and fill in header
1918 * @return Pointer to payload area
/* @payload_len: payload size in bytes; @transport and @type are stored in
 * the header as kuc_transport and kuc_msgtype.  kuc_msglen is set to the
 * total length (header plus payload).  On allocation failure
 * ERR_PTR(-ENOMEM) is returned, so callers must test with IS_ERR() rather
 * than for NULL.  The returned pointer is the first byte after the header. */
1920 void *kuc_alloc(int payload_len, int transport, int type)
1923 int len = kuc_len(payload_len);
1927 return ERR_PTR(-ENOMEM);
1929 lh->kuc_magic = KUC_MAGIC;
1930 lh->kuc_transport = transport;
1931 lh->kuc_msgtype = type;
1932 lh->kuc_msglen = len;
1934 return (void *)(lh + 1);
1936 EXPORT_SYMBOL(kuc_alloc);
1938 /* Takes pointer to payload area */
/* @payload_len is used to recompute the total size passed to OBD_FREE, so it
 * has to be the same value that was given to kuc_alloc() for this message.
 * kuc_ptr() also asserts the header magic. */
1939 void kuc_free(void *p, int payload_len)
1941 struct kuc_hdr *lh = kuc_ptr(p);
1942 OBD_FREE(lh, kuc_len(payload_len));
1944 EXPORT_SYMBOL(kuc_free);
/* Record for one thread blocked in obd_get_request_slot(): it is queued on
 * cli->cl_loi_read_list through orsw_entry and woken through orsw_waitq. */
1946 struct obd_request_slot_waiter {
1947 struct list_head orsw_entry;
1948 wait_queue_head_t orsw_waitq;
/* Wait condition for obd_get_request_slot(): a waiter has been served once
 * its orsw_entry is no longer linked, because the waker unlinks it with
 * list_del_init() when handing over a slot.  Checked under
 * cl_loi_list_lock. */
1952 static bool obd_request_slot_avail(struct client_obd *cli,
1953 struct obd_request_slot_waiter *orsw)
1957 spin_lock(&cli->cl_loi_list_lock);
1958 avail = !!list_empty(&orsw->orsw_entry);
1959 spin_unlock(&cli->cl_loi_list_lock);
1965 * For network flow control, the RPC sponsor needs to acquire a credit
1966 * before sending the RPC. The credits count for a connection is defined
1967 * by the "cl_max_rpcs_in_flight". If all the credits are occupied, then
1968 * the subsequent RPC sponsors need to wait until others released their
1969 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
/*
 * Acquire one RPC slot (credit) on @cli.
 *
 * Fast path: while cl_r_in_flight is below cl_max_rpcs_in_flight the counter
 * is simply incremented under cl_loi_list_lock.  Otherwise an on-stack
 * waiter is appended to cl_loi_read_list and the caller sleeps
 * (interruptibly, per LWI_INTR) until a waker unlinks the waiter and
 * accounts the slot on its behalf.
 */
1971 int obd_get_request_slot(struct client_obd *cli)
1973 struct obd_request_slot_waiter orsw;
1974 struct l_wait_info lwi;
1977 spin_lock(&cli->cl_loi_list_lock);
1978 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1979 cli->cl_r_in_flight++;
1980 spin_unlock(&cli->cl_loi_list_lock);
/* Slow path: queue at the tail, so waiters are served in FIFO order. */
1984 init_waitqueue_head(&orsw.orsw_waitq);
1985 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1986 orsw.orsw_signaled = false;
1987 spin_unlock(&cli->cl_loi_list_lock);
1989 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1990 rc = l_wait_event(orsw.orsw_waitq,
1991 obd_request_slot_avail(cli, &orsw) ||
1995 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1996 * freed but other (such as obd_put_request_slot) is using it. */
1997 spin_lock(&cli->cl_loi_list_lock);
/* If the entry is already unlinked, a waker has counted a slot for this
 * waiter, so the count is given back; otherwise the waiter is still queued
 * and is removed.  NOTE(review): the condition guarding this undo (probably
 * a failed wait) is not visible here - confirm. */
1999 if (!orsw.orsw_signaled) {
2000 if (list_empty(&orsw.orsw_entry))
2001 cli->cl_r_in_flight--;
2003 list_del(&orsw.orsw_entry);
2007 if (orsw.orsw_signaled) {
2008 LASSERT(list_empty(&orsw.orsw_entry));
2012 spin_unlock(&cli->cl_loi_list_lock);
2016 EXPORT_SYMBOL(obd_get_request_slot);
/*
 * Release one RPC slot on @cli.  If a waiter is queued and the in-flight
 * count is below the limit, the slot is handed straight to the first waiter:
 * its entry is unlinked, the counter is incremented on its behalf and it is
 * woken.  All of this happens under cl_loi_list_lock.
 */
2018 void obd_put_request_slot(struct client_obd *cli)
2020 struct obd_request_slot_waiter *orsw;
2022 spin_lock(&cli->cl_loi_list_lock);
2023 cli->cl_r_in_flight--;
2025 /* If there is free slot, wakeup the first waiter. */
2026 if (!list_empty(&cli->cl_loi_read_list) &&
2027 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2028 orsw = list_entry(cli->cl_loi_read_list.next,
2029 struct obd_request_slot_waiter, orsw_entry);
/* Unlinking with list_del_init() is what makes obd_request_slot_avail()
 * return true for this waiter. */
2030 list_del_init(&orsw->orsw_entry);
2031 cli->cl_r_in_flight++;
2032 wake_up(&orsw->orsw_waitq);
2034 spin_unlock(&cli->cl_loi_list_lock);
2036 EXPORT_SYMBOL(obd_put_request_slot);
/* Return the current cl_max_rpcs_in_flight of @cli.  The value is read
 * without taking cl_loi_list_lock. */
2038 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2040 return cli->cl_max_rpcs_in_flight;
2042 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
/*
 * Set cl_max_rpcs_in_flight of @cli to @max.
 *
 * @max must lie in [1, OBD_MAX_RIF_MAX].  For an MDC device (type name
 * LUSTRE_MDC_NAME) the modify-RPC limit is kept strictly lower: if @max is
 * not above cl_max_mod_rpcs_in_flight, that limit is first lowered to
 * @max - 1.  After the new limit is stored, queued waiters are handed slots
 * for the increase (bounded by the "diff" loop).
 *
 * NOTE(review): the CERROR format below has no trailing "\n", unlike the
 * other CERROR messages in this file - confirm whether that is intended.
 */
2044 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2046 struct obd_request_slot_waiter *orsw;
2053 if (max > OBD_MAX_RIF_MAX || max < 1)
2056 typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2057 if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2058 /* adjust max_mod_rpcs_in_flight to ensure it is always
2059 * strictly lower that max_rpcs_in_flight */
2061 CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2062 "because it must be higher than "
2063 "max_mod_rpcs_in_flight value",
2064 cli->cl_import->imp_obd->obd_name);
2067 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2068 rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2074 spin_lock(&cli->cl_loi_list_lock);
2075 old = cli->cl_max_rpcs_in_flight;
2076 cli->cl_max_rpcs_in_flight = max;
2079 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2080 for (i = 0; i < diff; i++) {
2081 if (list_empty(&cli->cl_loi_read_list))
/* Same hand-over protocol as obd_put_request_slot(): unlink the first
 * waiter, count the slot for it, then wake it. */
2084 orsw = list_entry(cli->cl_loi_read_list.next,
2085 struct obd_request_slot_waiter, orsw_entry);
2086 list_del_init(&orsw->orsw_entry);
2087 cli->cl_r_in_flight++;
2088 wake_up(&orsw->orsw_waitq);
2090 spin_unlock(&cli->cl_loi_list_lock);
2094 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
/* Return the current cl_max_mod_rpcs_in_flight of @cli.  The value is read
 * without taking cl_mod_rpcs_lock. */
2096 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2098 return cli->cl_max_mod_rpcs_in_flight;
2100 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
/*
 * Set cl_max_mod_rpcs_in_flight of @cli to @max.
 *
 * @max must lie in [1, OBD_MAX_RIF_MAX], must be strictly lower than
 * cl_max_rpcs_in_flight and must not exceed the server-side limit
 * (ocd_maxmodrpcs is used when the server advertised
 * OBD_CONNECT_MULTIMODRPCS).  The new limit is stored under
 * cl_mod_rpcs_lock, and waiters on cl_mod_rpcs_waitq are woken when the
 * limit has grown.
 */
2102 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2104 struct obd_connect_data *ocd;
2108 if (max > OBD_MAX_RIF_MAX || max < 1)
2111 /* cannot exceed or equal max_rpcs_in_flight */
2112 if (max >= cli->cl_max_rpcs_in_flight) {
2113 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2114 "higher or equal to max_rpcs_in_flight value (%u)\n",
2115 cli->cl_import->imp_obd->obd_name,
2116 max, cli->cl_max_rpcs_in_flight);
2120 /* cannot exceed max modify RPCs in flight supported by the server */
2121 ocd = &cli->cl_import->imp_connect_data;
2122 if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2123 maxmodrpcs = ocd->ocd_maxmodrpcs;
2126 if (max > maxmodrpcs) {
2127 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2128 "higher than max_mod_rpcs_per_client value (%hu) "
2129 "returned by the server at connection\n",
2130 cli->cl_import->imp_obd->obd_name,
2135 spin_lock(&cli->cl_mod_rpcs_lock);
2137 prev = cli->cl_max_mod_rpcs_in_flight;
2138 cli->cl_max_mod_rpcs_in_flight = max;
2140 /* wakeup waiters if limit has been increased */
2141 if (cli->cl_max_mod_rpcs_in_flight > prev)
2142 wake_up(&cli->cl_mod_rpcs_waitq);
2144 spin_unlock(&cli->cl_mod_rpcs_lock);
2148 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
/*
 * pct(a, b): integer percentage of @a relative to @b, i.e. a * 100 / b, or
 * 0 when @b is zero so that an empty total does not cause a division by
 * zero.  Every argument is parenthesised so that expression arguments such
 * as pct(x + y, total) expand with the intended precedence.  @b is
 * evaluated twice; do not pass expressions with side effects.
 */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
/*
 * Print the modify-RPC statistics of @cli to @seq: a snapshot timestamp,
 * the current number of modify RPCs in flight, and one histogram line per
 * bucket of cl_mod_rpcs_hist with its share and cumulative share of the
 * total.  The output stops early once the cumulative count reaches the
 * total.  Everything is printed under cl_mod_rpcs_lock.
 *
 * NOTE(review): tv_sec is cast to s64 but printed with %llu, and "%9lu"
 * space-pads the nanoseconds instead of zero-padding them - confirm whether
 * "%09lu" was intended.
 */
2152 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2153 struct seq_file *seq)
2155 unsigned long mod_tot = 0, mod_cum;
2156 struct timespec64 now;
2159 ktime_get_real_ts64(&now);
2161 spin_lock(&cli->cl_mod_rpcs_lock);
2163 seq_printf(seq, "snapshot_time: %llu.%9lu (secs.nsecs)\n",
2164 (s64)now.tv_sec, now.tv_nsec);
2165 seq_printf(seq, "modify_RPCs_in_flight: %hu\n",
2166 cli->cl_mod_rpcs_in_flight);
2168 seq_printf(seq, "\n\t\t\tmodify\n");
2169 seq_printf(seq, "rpcs in flight rpcs %% cum %%\n");
2171 mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2174 for (i = 0; i < OBD_HIST_MAX; i++) {
2175 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
/* pct() yields 0 when mod_tot is 0, so an empty histogram is safe. */
2177 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2178 i, mod, pct(mod, mod_tot),
2179 pct(mod_cum, mod_tot));
2180 if (mod_cum == mod_tot)
2184 spin_unlock(&cli->cl_mod_rpcs_lock);
2188 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2192 /* The number of modify RPCs sent in parallel is limited
2193 * because the server has a finite number of slots per client to
2194 * store request result and ensure reply reconstruction when needed.
2195 * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2196 * that takes into account server limit and cl_max_rpcs_in_flight
2198 * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2199 * one close request is allowed above the maximum.
/* Slot test used with cl_mod_rpcs_lock already held by the caller (the
 * "_locked" variant; obd_mod_rpc_slot_avail() is the wrapper that takes the
 * lock).  A slot is available when fewer modify RPCs are in flight than the
 * configured maximum, or when the request is a close and no other close RPC
 * is currently in flight. */
2201 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2206 /* A slot is available if
2207 * - number of modify RPCs in flight is less than the max
2208 * - it's a close RPC and no other close request is in flight
2210 avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2211 (close_req && cli->cl_close_rpcs_in_flight == 0);
/* Locking wrapper around obd_mod_rpc_slot_avail_locked(): takes
 * cl_mod_rpcs_lock around the test.  Used as the wait condition in
 * obd_get_mod_rpc_slot(). */
2216 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2221 spin_lock(&cli->cl_mod_rpcs_lock);
2222 avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2223 spin_unlock(&cli->cl_mod_rpcs_lock);
/* Tell whether the operation described by intent @it needs no modify RPC
 * slot.  The visible condition matches the intent operations IT_GETATTR,
 * IT_LOOKUP and IT_READDIR, plus IT_LAYOUT when FMODE_WRITE is not set in
 * it_flags.  NOTE(review): the leading part of the condition is not visible
 * here; it presumably checks @it for NULL - confirm. */
2227 static inline bool obd_skip_mod_rpc_slot(const struct lookup_intent *it)
2230 (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2231 it->it_op == IT_READDIR ||
2232 (it->it_op == IT_LAYOUT && !(it->it_flags & FMODE_WRITE))))
2237 /* Get a modify RPC slot from the obd client @cli according
2238 * to the kind of operation @opc that is going to be sent
2239 * and the intent @it of the operation if it applies.
2240 * If the maximum number of modify RPCs in flight is reached
2241 * the thread is put to sleep.
2242 * Returns the tag to be set in the request message. Tag 0
2243 * is reserved for non-modifying requests.
/* Parameters: @cli is the client obd, @opc the RPC opcode (MDS_CLOSE is
 * treated as a close request) and @it the optional lookup intent, which is
 * passed to obd_skip_mod_rpc_slot() to decide whether a slot is needed at
 * all.  When no slot is free the caller sleeps on cl_mod_rpcs_waitq until
 * obd_mod_rpc_slot_avail() succeeds. */
2245 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2246 struct lookup_intent *it)
2248 struct l_wait_info lwi = LWI_INTR(NULL, NULL);
2249 bool close_req = false;
2252 /* read-only metadata RPCs don't consume a slot on MDT
2253 * for reply reconstruction
2255 if (obd_skip_mod_rpc_slot(it))
2258 if (opc == MDS_CLOSE)
2262 spin_lock(&cli->cl_mod_rpcs_lock);
2263 max = cli->cl_max_mod_rpcs_in_flight;
2264 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2265 /* there is a slot available */
2266 cli->cl_mod_rpcs_in_flight++;
2268 cli->cl_close_rpcs_in_flight++;
/* Record the new in-flight count in the histogram shown by
 * obd_mod_rpc_stats_seq_show(). */
2269 lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2270 cli->cl_mod_rpcs_in_flight);
2271 /* find a free tag */
2272 i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2274 LASSERT(i < OBD_MAX_RIF_MAX);
/* NOTE(review): the bit is set as a side effect of the asserted expression,
 * so this relies on LASSERT always evaluating its argument - confirm. */
2275 LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2276 spin_unlock(&cli->cl_mod_rpcs_lock);
2277 /* tag 0 is reserved for non-modify RPCs */
/* No slot was available: drop the lock before sleeping. */
2280 spin_unlock(&cli->cl_mod_rpcs_lock);
2282 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2283 "opc %u, max %hu\n",
2284 cli->cl_import->imp_obd->obd_name, opc, max);
2286 l_wait_event(cli->cl_mod_rpcs_waitq,
2287 obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2290 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2292 /* Put a modify RPC slot from the obd client @cli according
2293 * to the kind of operation @opc that has been sent and the
2294 * intent @it of the operation if it applies.
/* Parameters: @cli, @opc and @it must describe the same request that was
 * passed to obd_get_mod_rpc_slot(), and @tag is the tag that call returned.
 * The bitmap bit released is tag - 1.  After the counters and the bitmap
 * are updated under cl_mod_rpcs_lock, one waiter on cl_mod_rpcs_waitq is
 * woken with the lock already released. */
2296 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2297 struct lookup_intent *it, __u16 tag)
2299 bool close_req = false;
2301 if (obd_skip_mod_rpc_slot(it))
2304 if (opc == MDS_CLOSE)
2307 spin_lock(&cli->cl_mod_rpcs_lock);
2308 cli->cl_mod_rpcs_in_flight--;
2310 cli->cl_close_rpcs_in_flight--;
2311 /* release the tag in the bitmap */
2312 LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
/* NOTE(review): the bit is cleared as a side effect of the asserted
 * expression, so this relies on LASSERT always evaluating its argument -
 * confirm. */
2313 LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2314 spin_unlock(&cli->cl_mod_rpcs_lock);
2315 wake_up(&cli->cl_mod_rpcs_waitq);
2317 EXPORT_SYMBOL(obd_put_mod_rpc_slot);