4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2014, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #include <obd_class.h>
44 #include <lprocfs_status.h>
46 spinlock_t obd_types_lock;
48 static struct kmem_cache *obd_device_cachep;
49 struct kmem_cache *obdo_cachep;
50 EXPORT_SYMBOL(obdo_cachep);
51 static struct kmem_cache *import_cachep;
53 static struct list_head obd_zombie_imports;
54 static struct list_head obd_zombie_exports;
55 static spinlock_t obd_zombie_impexp_lock;
57 static void obd_zombie_impexp_notify(void);
58 static void obd_zombie_export_add(struct obd_export *exp);
59 static void obd_zombie_import_add(struct obd_import *imp);
60 static void print_export_data(struct obd_export *exp,
61 const char *status, int locks);
63 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
64 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
81 static void obd_device_free(struct obd_device *obd)
84 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
85 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
86 if (obd->obd_namespace != NULL) {
87 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
88 obd, obd->obd_namespace, obd->obd_force);
91 lu_ref_fini(&obd->obd_reference);
92 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
95 struct obd_type *class_search_type(const char *name)
97 struct list_head *tmp;
98 struct obd_type *type;
100 spin_lock(&obd_types_lock);
101 list_for_each(tmp, &obd_types) {
102 type = list_entry(tmp, struct obd_type, typ_chain);
103 if (strcmp(type->typ_name, name) == 0) {
104 spin_unlock(&obd_types_lock);
108 spin_unlock(&obd_types_lock);
111 EXPORT_SYMBOL(class_search_type);
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
117 #ifdef HAVE_MODULE_LOADING_SUPPORT
119 const char *modname = name;
121 if (strcmp(modname, "obdfilter") == 0)
124 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
125 modname = LUSTRE_OSP_NAME;
127 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
128 modname = LUSTRE_MDT_NAME;
130 if (!request_module("%s", modname)) {
131 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
132 type = class_search_type(name);
134 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
140 spin_lock(&type->obd_type_lock);
142 try_module_get(type->typ_dt_ops->o_owner);
143 spin_unlock(&type->obd_type_lock);
147 EXPORT_SYMBOL(class_get_type);
149 void class_put_type(struct obd_type *type)
152 spin_lock(&type->obd_type_lock);
154 module_put(type->typ_dt_ops->o_owner);
155 spin_unlock(&type->obd_type_lock);
157 EXPORT_SYMBOL(class_put_type);
159 #define CLASS_MAX_NAME 1024
161 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
162 bool enable_proc, struct lprocfs_vars *vars,
163 const char *name, struct lu_device_type *ldt)
165 struct obd_type *type;
170 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
172 if (class_search_type(name)) {
173 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
178 OBD_ALLOC(type, sizeof(*type));
182 OBD_ALLOC_PTR(type->typ_dt_ops);
183 OBD_ALLOC_PTR(type->typ_md_ops);
184 OBD_ALLOC(type->typ_name, strlen(name) + 1);
186 if (type->typ_dt_ops == NULL ||
187 type->typ_md_ops == NULL ||
188 type->typ_name == NULL)
191 *(type->typ_dt_ops) = *dt_ops;
192 /* md_ops is optional */
194 *(type->typ_md_ops) = *md_ops;
195 strcpy(type->typ_name, name);
196 spin_lock_init(&type->obd_type_lock);
198 #ifdef CONFIG_PROC_FS
200 type->typ_procroot = lprocfs_register(type->typ_name,
203 if (IS_ERR(type->typ_procroot)) {
204 rc = PTR_ERR(type->typ_procroot);
205 type->typ_procroot = NULL;
212 rc = lu_device_type_init(ldt);
217 spin_lock(&obd_types_lock);
218 list_add(&type->typ_chain, &obd_types);
219 spin_unlock(&obd_types_lock);
224 if (type->typ_name != NULL) {
225 #ifdef CONFIG_PROC_FS
226 if (type->typ_procroot != NULL)
227 remove_proc_subtree(type->typ_name, proc_lustre_root);
229 OBD_FREE(type->typ_name, strlen(name) + 1);
231 if (type->typ_md_ops != NULL)
232 OBD_FREE_PTR(type->typ_md_ops);
233 if (type->typ_dt_ops != NULL)
234 OBD_FREE_PTR(type->typ_dt_ops);
235 OBD_FREE(type, sizeof(*type));
238 EXPORT_SYMBOL(class_register_type);
240 int class_unregister_type(const char *name)
242 struct obd_type *type = class_search_type(name);
246 CERROR("unknown obd type\n");
250 if (type->typ_refcnt) {
251 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
252 /* This is a bad situation, let's make the best of it */
253 /* Remove ops, but leave the name for debugging */
254 OBD_FREE_PTR(type->typ_dt_ops);
255 OBD_FREE_PTR(type->typ_md_ops);
259 /* we do not use type->typ_procroot as for compatibility purposes
260 * other modules can share names (i.e. lod can use lov entry). so
261 * we can't reference pointer as it can get invalided when another
262 * module removes the entry */
263 #ifdef CONFIG_PROC_FS
264 if (type->typ_procroot != NULL)
265 remove_proc_subtree(type->typ_name, proc_lustre_root);
266 if (type->typ_procsym != NULL)
267 lprocfs_remove(&type->typ_procsym);
270 lu_device_type_fini(type->typ_lu);
272 spin_lock(&obd_types_lock);
273 list_del(&type->typ_chain);
274 spin_unlock(&obd_types_lock);
275 OBD_FREE(type->typ_name, strlen(name) + 1);
276 if (type->typ_dt_ops != NULL)
277 OBD_FREE_PTR(type->typ_dt_ops);
278 if (type->typ_md_ops != NULL)
279 OBD_FREE_PTR(type->typ_md_ops);
280 OBD_FREE(type, sizeof(*type));
282 } /* class_unregister_type */
283 EXPORT_SYMBOL(class_unregister_type);
286 * Create a new obd device.
288 * Find an empty slot in ::obd_devs[], create a new obd device in it.
290 * \param[in] type_name obd device type string.
291 * \param[in] name obd device name.
293 * \retval NULL if create fails, otherwise return the obd device
296 struct obd_device *class_newdev(const char *type_name, const char *name)
298 struct obd_device *result = NULL;
299 struct obd_device *newdev;
300 struct obd_type *type = NULL;
302 int new_obd_minor = 0;
305 if (strlen(name) >= MAX_OBD_NAME) {
306 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
307 RETURN(ERR_PTR(-EINVAL));
310 type = class_get_type(type_name);
312 CERROR("OBD: unknown type: %s\n", type_name);
313 RETURN(ERR_PTR(-ENODEV));
316 newdev = obd_device_alloc();
318 GOTO(out_type, result = ERR_PTR(-ENOMEM));
320 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
322 write_lock(&obd_dev_lock);
323 for (i = 0; i < class_devno_max(); i++) {
324 struct obd_device *obd = class_num2obd(i);
326 if (obd && (strcmp(name, obd->obd_name) == 0)) {
327 CERROR("Device %s already exists at %d, won't add\n",
330 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
331 "%p obd_magic %08x != %08x\n", result,
332 result->obd_magic, OBD_DEVICE_MAGIC);
333 LASSERTF(result->obd_minor == new_obd_minor,
334 "%p obd_minor %d != %d\n", result,
335 result->obd_minor, new_obd_minor);
337 obd_devs[result->obd_minor] = NULL;
338 result->obd_name[0]='\0';
340 result = ERR_PTR(-EEXIST);
343 if (!result && !obd) {
345 result->obd_minor = i;
347 result->obd_type = type;
348 strncpy(result->obd_name, name,
349 sizeof(result->obd_name) - 1);
350 obd_devs[i] = result;
353 write_unlock(&obd_dev_lock);
355 if (result == NULL && i >= class_devno_max()) {
356 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
358 GOTO(out, result = ERR_PTR(-EOVERFLOW));
364 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
365 result->obd_name, result);
369 obd_device_free(newdev);
371 class_put_type(type);
375 void class_release_dev(struct obd_device *obd)
377 struct obd_type *obd_type = obd->obd_type;
379 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
380 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
381 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
382 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
383 LASSERT(obd_type != NULL);
385 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
386 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
388 write_lock(&obd_dev_lock);
389 obd_devs[obd->obd_minor] = NULL;
390 write_unlock(&obd_dev_lock);
391 obd_device_free(obd);
393 class_put_type(obd_type);
396 int class_name2dev(const char *name)
403 read_lock(&obd_dev_lock);
404 for (i = 0; i < class_devno_max(); i++) {
405 struct obd_device *obd = class_num2obd(i);
407 if (obd && strcmp(name, obd->obd_name) == 0) {
408 /* Make sure we finished attaching before we give
409 out any references */
410 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
411 if (obd->obd_attached) {
412 read_unlock(&obd_dev_lock);
418 read_unlock(&obd_dev_lock);
422 EXPORT_SYMBOL(class_name2dev);
424 struct obd_device *class_name2obd(const char *name)
426 int dev = class_name2dev(name);
428 if (dev < 0 || dev > class_devno_max())
430 return class_num2obd(dev);
432 EXPORT_SYMBOL(class_name2obd);
434 int class_uuid2dev(struct obd_uuid *uuid)
438 read_lock(&obd_dev_lock);
439 for (i = 0; i < class_devno_max(); i++) {
440 struct obd_device *obd = class_num2obd(i);
442 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
443 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
444 read_unlock(&obd_dev_lock);
448 read_unlock(&obd_dev_lock);
452 EXPORT_SYMBOL(class_uuid2dev);
454 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
456 int dev = class_uuid2dev(uuid);
459 return class_num2obd(dev);
461 EXPORT_SYMBOL(class_uuid2obd);
464 * Get obd device from ::obd_devs[]
466 * \param num [in] array index
468 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
469 * otherwise return the obd device there.
471 struct obd_device *class_num2obd(int num)
473 struct obd_device *obd = NULL;
475 if (num < class_devno_max()) {
480 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
481 "%p obd_magic %08x != %08x\n",
482 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
483 LASSERTF(obd->obd_minor == num,
484 "%p obd_minor %0d != %0d\n",
485 obd, obd->obd_minor, num);
490 EXPORT_SYMBOL(class_num2obd);
493 * Get obd devices count. Device in any
495 * \retval obd device count
497 int get_devices_count(void)
499 int index, max_index = class_devno_max(), dev_count = 0;
501 read_lock(&obd_dev_lock);
502 for (index = 0; index <= max_index; index++) {
503 struct obd_device *obd = class_num2obd(index);
507 read_unlock(&obd_dev_lock);
511 EXPORT_SYMBOL(get_devices_count);
513 void class_obd_list(void)
518 read_lock(&obd_dev_lock);
519 for (i = 0; i < class_devno_max(); i++) {
520 struct obd_device *obd = class_num2obd(i);
524 if (obd->obd_stopping)
526 else if (obd->obd_set_up)
528 else if (obd->obd_attached)
532 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
533 i, status, obd->obd_type->typ_name,
534 obd->obd_name, obd->obd_uuid.uuid,
535 atomic_read(&obd->obd_refcount));
537 read_unlock(&obd_dev_lock);
541 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
542 specified, then only the client with that uuid is returned,
543 otherwise any client connected to the tgt is returned. */
544 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
545 const char * typ_name,
546 struct obd_uuid *grp_uuid)
550 read_lock(&obd_dev_lock);
551 for (i = 0; i < class_devno_max(); i++) {
552 struct obd_device *obd = class_num2obd(i);
556 if ((strncmp(obd->obd_type->typ_name, typ_name,
557 strlen(typ_name)) == 0)) {
558 if (obd_uuid_equals(tgt_uuid,
559 &obd->u.cli.cl_target_uuid) &&
560 ((grp_uuid)? obd_uuid_equals(grp_uuid,
561 &obd->obd_uuid) : 1)) {
562 read_unlock(&obd_dev_lock);
567 read_unlock(&obd_dev_lock);
571 EXPORT_SYMBOL(class_find_client_obd);
573 /* Iterate the obd_device list looking devices have grp_uuid. Start
574 searching at *next, and if a device is found, the next index to look
575 at is saved in *next. If next is NULL, then the first matching device
576 will always be returned. */
577 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
583 else if (*next >= 0 && *next < class_devno_max())
588 read_lock(&obd_dev_lock);
589 for (; i < class_devno_max(); i++) {
590 struct obd_device *obd = class_num2obd(i);
594 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
597 read_unlock(&obd_dev_lock);
601 read_unlock(&obd_dev_lock);
605 EXPORT_SYMBOL(class_devices_in_group);
608 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
609 * adjust sptlrpc settings accordingly.
611 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
613 struct obd_device *obd;
617 LASSERT(namelen > 0);
619 read_lock(&obd_dev_lock);
620 for (i = 0; i < class_devno_max(); i++) {
621 obd = class_num2obd(i);
623 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
626 /* only notify mdc, osc, mdt, ost */
627 type = obd->obd_type->typ_name;
628 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
629 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
630 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
631 strcmp(type, LUSTRE_OST_NAME) != 0)
634 if (strncmp(obd->obd_name, fsname, namelen))
637 class_incref(obd, __FUNCTION__, obd);
638 read_unlock(&obd_dev_lock);
639 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
640 sizeof(KEY_SPTLRPC_CONF),
641 KEY_SPTLRPC_CONF, 0, NULL, NULL);
643 class_decref(obd, __FUNCTION__, obd);
644 read_lock(&obd_dev_lock);
646 read_unlock(&obd_dev_lock);
649 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
651 void obd_cleanup_caches(void)
654 if (obd_device_cachep) {
655 kmem_cache_destroy(obd_device_cachep);
656 obd_device_cachep = NULL;
659 kmem_cache_destroy(obdo_cachep);
663 kmem_cache_destroy(import_cachep);
664 import_cachep = NULL;
667 kmem_cache_destroy(capa_cachep);
673 int obd_init_caches(void)
678 LASSERT(obd_device_cachep == NULL);
679 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
680 sizeof(struct obd_device),
682 if (!obd_device_cachep)
683 GOTO(out, rc = -ENOMEM);
685 LASSERT(obdo_cachep == NULL);
686 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
689 GOTO(out, rc = -ENOMEM);
691 LASSERT(import_cachep == NULL);
692 import_cachep = kmem_cache_create("ll_import_cache",
693 sizeof(struct obd_import),
696 GOTO(out, rc = -ENOMEM);
698 LASSERT(capa_cachep == NULL);
699 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
702 GOTO(out, rc = -ENOMEM);
706 obd_cleanup_caches();
710 /* map connection to client */
711 struct obd_export *class_conn2export(struct lustre_handle *conn)
713 struct obd_export *export;
717 CDEBUG(D_CACHE, "looking for null handle\n");
721 if (conn->cookie == -1) { /* this means assign a new connection */
722 CDEBUG(D_CACHE, "want a new connection\n");
726 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
727 export = class_handle2object(conn->cookie, NULL);
730 EXPORT_SYMBOL(class_conn2export);
732 struct obd_device *class_exp2obd(struct obd_export *exp)
738 EXPORT_SYMBOL(class_exp2obd);
740 struct obd_device *class_conn2obd(struct lustre_handle *conn)
742 struct obd_export *export;
743 export = class_conn2export(conn);
745 struct obd_device *obd = export->exp_obd;
746 class_export_put(export);
751 EXPORT_SYMBOL(class_conn2obd);
753 struct obd_import *class_exp2cliimp(struct obd_export *exp)
755 struct obd_device *obd = exp->exp_obd;
758 return obd->u.cli.cl_import;
760 EXPORT_SYMBOL(class_exp2cliimp);
762 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
764 struct obd_device *obd = class_conn2obd(conn);
767 return obd->u.cli.cl_import;
769 EXPORT_SYMBOL(class_conn2cliimp);
771 /* Export management functions */
772 static void class_export_destroy(struct obd_export *exp)
774 struct obd_device *obd = exp->exp_obd;
777 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
778 LASSERT(obd != NULL);
780 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
781 exp->exp_client_uuid.uuid, obd->obd_name);
783 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
784 if (exp->exp_connection)
785 ptlrpc_put_connection_superhack(exp->exp_connection);
787 LASSERT(list_empty(&exp->exp_outstanding_replies));
788 LASSERT(list_empty(&exp->exp_uncommitted_replies));
789 LASSERT(list_empty(&exp->exp_req_replay_queue));
790 LASSERT(list_empty(&exp->exp_hp_rpcs));
791 obd_destroy_export(exp);
792 class_decref(obd, "export", exp);
794 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
798 static void export_handle_addref(void *export)
800 class_export_get(export);
803 static struct portals_handle_ops export_handle_ops = {
804 .hop_addref = export_handle_addref,
808 struct obd_export *class_export_get(struct obd_export *exp)
810 atomic_inc(&exp->exp_refcount);
811 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
812 atomic_read(&exp->exp_refcount));
815 EXPORT_SYMBOL(class_export_get);
817 void class_export_put(struct obd_export *exp)
819 LASSERT(exp != NULL);
820 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
821 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
822 atomic_read(&exp->exp_refcount) - 1);
824 if (atomic_dec_and_test(&exp->exp_refcount)) {
825 LASSERT(!list_empty(&exp->exp_obd_chain));
826 CDEBUG(D_IOCTL, "final put %p/%s\n",
827 exp, exp->exp_client_uuid.uuid);
829 /* release nid stat refererence */
830 lprocfs_exp_cleanup(exp);
832 obd_zombie_export_add(exp);
835 EXPORT_SYMBOL(class_export_put);
837 /* Creates a new export, adds it to the hash table, and returns a
838 * pointer to it. The refcount is 2: one for the hash reference, and
839 * one for the pointer returned by this function. */
840 struct obd_export *class_new_export(struct obd_device *obd,
841 struct obd_uuid *cluuid)
843 struct obd_export *export;
844 cfs_hash_t *hash = NULL;
848 OBD_ALLOC_PTR(export);
850 return ERR_PTR(-ENOMEM);
852 export->exp_conn_cnt = 0;
853 export->exp_lock_hash = NULL;
854 export->exp_flock_hash = NULL;
855 atomic_set(&export->exp_refcount, 2);
856 atomic_set(&export->exp_rpc_count, 0);
857 atomic_set(&export->exp_cb_count, 0);
858 atomic_set(&export->exp_locks_count, 0);
859 #if LUSTRE_TRACKS_LOCK_EXP_REFS
860 INIT_LIST_HEAD(&export->exp_locks_list);
861 spin_lock_init(&export->exp_locks_list_guard);
863 atomic_set(&export->exp_replay_count, 0);
864 export->exp_obd = obd;
865 INIT_LIST_HEAD(&export->exp_outstanding_replies);
866 spin_lock_init(&export->exp_uncommitted_replies_lock);
867 INIT_LIST_HEAD(&export->exp_uncommitted_replies);
868 INIT_LIST_HEAD(&export->exp_req_replay_queue);
869 INIT_LIST_HEAD(&export->exp_handle.h_link);
870 INIT_LIST_HEAD(&export->exp_hp_rpcs);
871 INIT_LIST_HEAD(&export->exp_reg_rpcs);
872 class_handle_hash(&export->exp_handle, &export_handle_ops);
873 export->exp_last_request_time = cfs_time_current_sec();
874 spin_lock_init(&export->exp_lock);
875 spin_lock_init(&export->exp_rpc_lock);
876 INIT_HLIST_NODE(&export->exp_uuid_hash);
877 INIT_HLIST_NODE(&export->exp_nid_hash);
878 spin_lock_init(&export->exp_bl_list_lock);
879 INIT_LIST_HEAD(&export->exp_bl_list);
881 export->exp_sp_peer = LUSTRE_SP_ANY;
882 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
883 export->exp_client_uuid = *cluuid;
884 obd_init_export(export);
886 spin_lock(&obd->obd_dev_lock);
887 /* shouldn't happen, but might race */
888 if (obd->obd_stopping)
889 GOTO(exit_unlock, rc = -ENODEV);
891 hash = cfs_hash_getref(obd->obd_uuid_hash);
893 GOTO(exit_unlock, rc = -ENODEV);
894 spin_unlock(&obd->obd_dev_lock);
896 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
897 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
899 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
900 obd->obd_name, cluuid->uuid, rc);
901 GOTO(exit_err, rc = -EALREADY);
905 at_init(&export->exp_bl_lock_at, obd_timeout, 0);
906 spin_lock(&obd->obd_dev_lock);
907 if (obd->obd_stopping) {
908 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
909 GOTO(exit_unlock, rc = -ENODEV);
912 class_incref(obd, "export", export);
913 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
914 list_add_tail(&export->exp_obd_chain_timed,
915 &export->exp_obd->obd_exports_timed);
916 export->exp_obd->obd_num_exports++;
917 spin_unlock(&obd->obd_dev_lock);
918 cfs_hash_putref(hash);
922 spin_unlock(&obd->obd_dev_lock);
925 cfs_hash_putref(hash);
926 class_handle_unhash(&export->exp_handle);
927 LASSERT(hlist_unhashed(&export->exp_uuid_hash));
928 obd_destroy_export(export);
929 OBD_FREE_PTR(export);
932 EXPORT_SYMBOL(class_new_export);
934 void class_unlink_export(struct obd_export *exp)
936 class_handle_unhash(&exp->exp_handle);
938 spin_lock(&exp->exp_obd->obd_dev_lock);
939 /* delete an uuid-export hashitem from hashtables */
940 if (!hlist_unhashed(&exp->exp_uuid_hash))
941 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
942 &exp->exp_client_uuid,
943 &exp->exp_uuid_hash);
945 list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
946 list_del_init(&exp->exp_obd_chain_timed);
947 exp->exp_obd->obd_num_exports--;
948 spin_unlock(&exp->exp_obd->obd_dev_lock);
949 class_export_put(exp);
951 EXPORT_SYMBOL(class_unlink_export);
953 /* Import management functions */
954 static void class_import_destroy(struct obd_import *imp)
958 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
959 imp->imp_obd->obd_name);
961 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
963 ptlrpc_put_connection_superhack(imp->imp_connection);
965 while (!list_empty(&imp->imp_conn_list)) {
966 struct obd_import_conn *imp_conn;
968 imp_conn = list_entry(imp->imp_conn_list.next,
969 struct obd_import_conn, oic_item);
970 list_del_init(&imp_conn->oic_item);
971 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
972 OBD_FREE(imp_conn, sizeof(*imp_conn));
975 LASSERT(imp->imp_sec == NULL);
976 class_decref(imp->imp_obd, "import", imp);
977 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
981 static void import_handle_addref(void *import)
983 class_import_get(import);
986 static struct portals_handle_ops import_handle_ops = {
987 .hop_addref = import_handle_addref,
991 struct obd_import *class_import_get(struct obd_import *import)
993 atomic_inc(&import->imp_refcount);
994 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
995 atomic_read(&import->imp_refcount),
996 import->imp_obd->obd_name);
999 EXPORT_SYMBOL(class_import_get);
1001 void class_import_put(struct obd_import *imp)
1005 LASSERT(list_empty(&imp->imp_zombie_chain));
1006 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1008 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1009 atomic_read(&imp->imp_refcount) - 1,
1010 imp->imp_obd->obd_name);
1012 if (atomic_dec_and_test(&imp->imp_refcount)) {
1013 CDEBUG(D_INFO, "final put import %p\n", imp);
1014 obd_zombie_import_add(imp);
1017 /* catch possible import put race */
1018 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1021 EXPORT_SYMBOL(class_import_put);
1023 static void init_imp_at(struct imp_at *at) {
1025 at_init(&at->iat_net_latency, 0, 0);
1026 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1027 /* max service estimates are tracked on the server side, so
1028 don't use the AT history here, just use the last reported
1029 val. (But keep hist for proc histogram, worst_ever) */
1030 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1035 struct obd_import *class_new_import(struct obd_device *obd)
1037 struct obd_import *imp;
1039 OBD_ALLOC(imp, sizeof(*imp));
1043 INIT_LIST_HEAD(&imp->imp_pinger_chain);
1044 INIT_LIST_HEAD(&imp->imp_zombie_chain);
1045 INIT_LIST_HEAD(&imp->imp_replay_list);
1046 INIT_LIST_HEAD(&imp->imp_sending_list);
1047 INIT_LIST_HEAD(&imp->imp_delayed_list);
1048 INIT_LIST_HEAD(&imp->imp_committed_list);
1049 imp->imp_replay_cursor = &imp->imp_committed_list;
1050 spin_lock_init(&imp->imp_lock);
1051 imp->imp_last_success_conn = 0;
1052 imp->imp_state = LUSTRE_IMP_NEW;
1053 imp->imp_obd = class_incref(obd, "import", imp);
1054 mutex_init(&imp->imp_sec_mutex);
1055 init_waitqueue_head(&imp->imp_recovery_waitq);
1057 atomic_set(&imp->imp_refcount, 2);
1058 atomic_set(&imp->imp_unregistering, 0);
1059 atomic_set(&imp->imp_inflight, 0);
1060 atomic_set(&imp->imp_replay_inflight, 0);
1061 atomic_set(&imp->imp_inval_count, 0);
1062 INIT_LIST_HEAD(&imp->imp_conn_list);
1063 INIT_LIST_HEAD(&imp->imp_handle.h_link);
1064 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1065 init_imp_at(&imp->imp_at);
1067 /* the default magic is V2, will be used in connect RPC, and
1068 * then adjusted according to the flags in request/reply. */
1069 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1073 EXPORT_SYMBOL(class_new_import);
1075 void class_destroy_import(struct obd_import *import)
1077 LASSERT(import != NULL);
1078 LASSERT(import != LP_POISON);
1080 class_handle_unhash(&import->imp_handle);
1082 spin_lock(&import->imp_lock);
1083 import->imp_generation++;
1084 spin_unlock(&import->imp_lock);
1085 class_import_put(import);
1087 EXPORT_SYMBOL(class_destroy_import);
1089 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1091 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1093 spin_lock(&exp->exp_locks_list_guard);
1095 LASSERT(lock->l_exp_refs_nr >= 0);
1097 if (lock->l_exp_refs_target != NULL &&
1098 lock->l_exp_refs_target != exp) {
1099 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1100 exp, lock, lock->l_exp_refs_target);
1102 if ((lock->l_exp_refs_nr ++) == 0) {
1103 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1104 lock->l_exp_refs_target = exp;
1106 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1107 lock, exp, lock->l_exp_refs_nr);
1108 spin_unlock(&exp->exp_locks_list_guard);
1110 EXPORT_SYMBOL(__class_export_add_lock_ref);
1112 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1114 spin_lock(&exp->exp_locks_list_guard);
1115 LASSERT(lock->l_exp_refs_nr > 0);
1116 if (lock->l_exp_refs_target != exp) {
1117 LCONSOLE_WARN("lock %p, "
1118 "mismatching export pointers: %p, %p\n",
1119 lock, lock->l_exp_refs_target, exp);
1121 if (-- lock->l_exp_refs_nr == 0) {
1122 list_del_init(&lock->l_exp_refs_link);
1123 lock->l_exp_refs_target = NULL;
1125 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1126 lock, exp, lock->l_exp_refs_nr);
1127 spin_unlock(&exp->exp_locks_list_guard);
1129 EXPORT_SYMBOL(__class_export_del_lock_ref);
1132 /* A connection defines an export context in which preallocation can
1133 be managed. This releases the export pointer reference, and returns
1134 the export handle, so the export refcount is 1 when this function
1136 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1137 struct obd_uuid *cluuid)
1139 struct obd_export *export;
1140 LASSERT(conn != NULL);
1141 LASSERT(obd != NULL);
1142 LASSERT(cluuid != NULL);
1145 export = class_new_export(obd, cluuid);
1147 RETURN(PTR_ERR(export));
1149 conn->cookie = export->exp_handle.h_cookie;
1150 class_export_put(export);
1152 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1153 cluuid->uuid, conn->cookie);
1156 EXPORT_SYMBOL(class_connect);
1158 /* if export is involved in recovery then clean up related things */
1159 static void class_export_recovery_cleanup(struct obd_export *exp)
1161 struct obd_device *obd = exp->exp_obd;
1163 spin_lock(&obd->obd_recovery_task_lock);
1164 if (obd->obd_recovering) {
1165 if (exp->exp_in_recovery) {
1166 spin_lock(&exp->exp_lock);
1167 exp->exp_in_recovery = 0;
1168 spin_unlock(&exp->exp_lock);
1169 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1170 atomic_dec(&obd->obd_connected_clients);
1173 /* if called during recovery then should update
1174 * obd_stale_clients counter,
1175 * lightweight exports are not counted */
1176 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1177 exp->exp_obd->obd_stale_clients++;
1179 spin_unlock(&obd->obd_recovery_task_lock);
1181 spin_lock(&exp->exp_lock);
1182 /** Cleanup req replay fields */
1183 if (exp->exp_req_replay_needed) {
1184 exp->exp_req_replay_needed = 0;
1186 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1187 atomic_dec(&obd->obd_req_replay_clients);
1190 /** Cleanup lock replay data */
1191 if (exp->exp_lock_replay_needed) {
1192 exp->exp_lock_replay_needed = 0;
1194 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1195 atomic_dec(&obd->obd_lock_replay_clients);
1197 spin_unlock(&exp->exp_lock);
1200 /* This function removes 1-3 references from the export:
1201 * 1 - for export pointer passed
1202 * and if disconnect really need
1203 * 2 - removing from hash
1204 * 3 - in client_unlink_export
1205 * The export pointer passed to this function can destroyed */
1206 int class_disconnect(struct obd_export *export)
1208 int already_disconnected;
1211 if (export == NULL) {
1212 CWARN("attempting to free NULL export %p\n", export);
1216 spin_lock(&export->exp_lock);
1217 already_disconnected = export->exp_disconnected;
1218 export->exp_disconnected = 1;
1219 spin_unlock(&export->exp_lock);
1221 /* class_cleanup(), abort_recovery(), and class_fail_export()
1222 * all end up in here, and if any of them race we shouldn't
1223 * call extra class_export_puts(). */
1224 if (already_disconnected) {
1225 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1226 GOTO(no_disconn, already_disconnected);
1229 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1230 export->exp_handle.h_cookie);
1232 if (!hlist_unhashed(&export->exp_nid_hash))
1233 cfs_hash_del(export->exp_obd->obd_nid_hash,
1234 &export->exp_connection->c_peer.nid,
1235 &export->exp_nid_hash);
1237 class_export_recovery_cleanup(export);
1238 class_unlink_export(export);
1240 class_export_put(export);
1243 EXPORT_SYMBOL(class_disconnect);
1245 /* Return non-zero for a fully connected export */
1246 int class_connected_export(struct obd_export *exp)
1251 spin_lock(&exp->exp_lock);
1252 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1253 spin_unlock(&exp->exp_lock);
1257 EXPORT_SYMBOL(class_connected_export);
1259 static void class_disconnect_export_list(struct list_head *list,
1260 enum obd_option flags)
1263 struct obd_export *exp;
1266 /* It's possible that an export may disconnect itself, but
1267 * nothing else will be added to this list. */
1268 while (!list_empty(list)) {
1269 exp = list_entry(list->next, struct obd_export,
1271 /* need for safe call CDEBUG after obd_disconnect */
1272 class_export_get(exp);
1274 spin_lock(&exp->exp_lock);
1275 exp->exp_flags = flags;
1276 spin_unlock(&exp->exp_lock);
1278 if (obd_uuid_equals(&exp->exp_client_uuid,
1279 &exp->exp_obd->obd_uuid)) {
1281 "exp %p export uuid == obd uuid, don't discon\n",
1283 /* Need to delete this now so we don't end up pointing
1284 * to work_list later when this export is cleaned up. */
1285 list_del_init(&exp->exp_obd_chain);
1286 class_export_put(exp);
1290 class_export_get(exp);
1291 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1292 "last request at "CFS_TIME_T"\n",
1293 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1294 exp, exp->exp_last_request_time);
1295 /* release one export reference anyway */
1296 rc = obd_disconnect(exp);
1298 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1299 obd_export_nid2str(exp), exp, rc);
1300 class_export_put(exp);
1305 void class_disconnect_exports(struct obd_device *obd)
1307 struct list_head work_list;
1310 /* Move all of the exports from obd_exports to a work list, en masse. */
1311 INIT_LIST_HEAD(&work_list);
1312 spin_lock(&obd->obd_dev_lock);
1313 list_splice_init(&obd->obd_exports, &work_list);
1314 list_splice_init(&obd->obd_delayed_exports, &work_list);
1315 spin_unlock(&obd->obd_dev_lock);
1317 if (!list_empty(&work_list)) {
1318 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1319 "disconnecting them\n", obd->obd_minor, obd);
1320 class_disconnect_export_list(&work_list,
1321 exp_flags_from_obd(obd));
1323 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1324 obd->obd_minor, obd);
1327 EXPORT_SYMBOL(class_disconnect_exports);
1329 /* Remove exports that have not completed recovery.
1331 void class_disconnect_stale_exports(struct obd_device *obd,
1332 int (*test_export)(struct obd_export *))
1334 struct list_head work_list;
1335 struct obd_export *exp, *n;
1339 INIT_LIST_HEAD(&work_list);
1340 spin_lock(&obd->obd_dev_lock);
1341 list_for_each_entry_safe(exp, n, &obd->obd_exports,
1343 /* don't count self-export as client */
1344 if (obd_uuid_equals(&exp->exp_client_uuid,
1345 &exp->exp_obd->obd_uuid))
1348 /* don't evict clients which have no slot in last_rcvd
1349 * (e.g. lightweight connection) */
1350 if (exp->exp_target_data.ted_lr_idx == -1)
1353 spin_lock(&exp->exp_lock);
1354 if (exp->exp_failed || test_export(exp)) {
1355 spin_unlock(&exp->exp_lock);
1358 exp->exp_failed = 1;
1359 spin_unlock(&exp->exp_lock);
1361 list_move(&exp->exp_obd_chain, &work_list);
1363 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1364 obd->obd_name, exp->exp_client_uuid.uuid,
1365 exp->exp_connection == NULL ? "<unknown>" :
1366 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1367 print_export_data(exp, "EVICTING", 0);
1369 spin_unlock(&obd->obd_dev_lock);
1372 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1373 obd->obd_name, evicted);
1375 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1376 OBD_OPT_ABORT_RECOV);
1379 EXPORT_SYMBOL(class_disconnect_stale_exports);
1381 void class_fail_export(struct obd_export *exp)
1383 int rc, already_failed;
1385 spin_lock(&exp->exp_lock);
1386 already_failed = exp->exp_failed;
1387 exp->exp_failed = 1;
1388 spin_unlock(&exp->exp_lock);
1390 if (already_failed) {
1391 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1392 exp, exp->exp_client_uuid.uuid);
1396 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1397 exp, exp->exp_client_uuid.uuid);
1399 if (obd_dump_on_timeout)
1400 libcfs_debug_dumplog();
1402 /* need for safe call CDEBUG after obd_disconnect */
1403 class_export_get(exp);
1405 /* Most callers into obd_disconnect are removing their own reference
1406 * (request, for example) in addition to the one from the hash table.
1407 * We don't have such a reference here, so make one. */
1408 class_export_get(exp);
1409 rc = obd_disconnect(exp);
1411 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1413 CDEBUG(D_HA, "disconnected export %p/%s\n",
1414 exp, exp->exp_client_uuid.uuid);
1415 class_export_put(exp);
1417 EXPORT_SYMBOL(class_fail_export);
1419 char *obd_export_nid2str(struct obd_export *exp)
1421 if (exp->exp_connection != NULL)
1422 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1426 EXPORT_SYMBOL(obd_export_nid2str);
1428 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1430 cfs_hash_t *nid_hash;
1431 struct obd_export *doomed_exp = NULL;
1432 int exports_evicted = 0;
1434 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1436 spin_lock(&obd->obd_dev_lock);
1437 /* umount has run already, so evict thread should leave
1438 * its task to umount thread now */
1439 if (obd->obd_stopping) {
1440 spin_unlock(&obd->obd_dev_lock);
1441 return exports_evicted;
1443 nid_hash = obd->obd_nid_hash;
1444 cfs_hash_getref(nid_hash);
1445 spin_unlock(&obd->obd_dev_lock);
1448 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1449 if (doomed_exp == NULL)
1452 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1453 "nid %s found, wanted nid %s, requested nid %s\n",
1454 obd_export_nid2str(doomed_exp),
1455 libcfs_nid2str(nid_key), nid);
1456 LASSERTF(doomed_exp != obd->obd_self_export,
1457 "self-export is hashed by NID?\n");
1459 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1460 "request\n", obd->obd_name,
1461 obd_uuid2str(&doomed_exp->exp_client_uuid),
1462 obd_export_nid2str(doomed_exp));
1463 class_fail_export(doomed_exp);
1464 class_export_put(doomed_exp);
1467 cfs_hash_putref(nid_hash);
1469 if (!exports_evicted)
1470 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1471 obd->obd_name, nid);
1472 return exports_evicted;
1474 EXPORT_SYMBOL(obd_export_evict_by_nid);
1476 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1478 cfs_hash_t *uuid_hash;
1479 struct obd_export *doomed_exp = NULL;
1480 struct obd_uuid doomed_uuid;
1481 int exports_evicted = 0;
1483 spin_lock(&obd->obd_dev_lock);
1484 if (obd->obd_stopping) {
1485 spin_unlock(&obd->obd_dev_lock);
1486 return exports_evicted;
1488 uuid_hash = obd->obd_uuid_hash;
1489 cfs_hash_getref(uuid_hash);
1490 spin_unlock(&obd->obd_dev_lock);
1492 obd_str2uuid(&doomed_uuid, uuid);
1493 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1494 CERROR("%s: can't evict myself\n", obd->obd_name);
1495 cfs_hash_putref(uuid_hash);
1496 return exports_evicted;
1499 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1501 if (doomed_exp == NULL) {
1502 CERROR("%s: can't disconnect %s: no exports found\n",
1503 obd->obd_name, uuid);
1505 CWARN("%s: evicting %s at adminstrative request\n",
1506 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1507 class_fail_export(doomed_exp);
1508 class_export_put(doomed_exp);
1511 cfs_hash_putref(uuid_hash);
1513 return exports_evicted;
1515 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1517 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1518 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1519 EXPORT_SYMBOL(class_export_dump_hook);
1522 static void print_export_data(struct obd_export *exp, const char *status,
1525 struct ptlrpc_reply_state *rs;
1526 struct ptlrpc_reply_state *first_reply = NULL;
1529 spin_lock(&exp->exp_lock);
1530 list_for_each_entry(rs, &exp->exp_outstanding_replies,
1536 spin_unlock(&exp->exp_lock);
1538 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1539 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1540 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1541 atomic_read(&exp->exp_rpc_count),
1542 atomic_read(&exp->exp_cb_count),
1543 atomic_read(&exp->exp_locks_count),
1544 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1545 nreplies, first_reply, nreplies > 3 ? "..." : "",
1546 exp->exp_last_committed);
1547 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1548 if (locks && class_export_dump_hook != NULL)
1549 class_export_dump_hook(exp);
1553 void dump_exports(struct obd_device *obd, int locks)
1555 struct obd_export *exp;
1557 spin_lock(&obd->obd_dev_lock);
1558 list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1559 print_export_data(exp, "ACTIVE", locks);
1560 list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1561 print_export_data(exp, "UNLINKED", locks);
1562 list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1563 print_export_data(exp, "DELAYED", locks);
1564 spin_unlock(&obd->obd_dev_lock);
1565 spin_lock(&obd_zombie_impexp_lock);
1566 list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1567 print_export_data(exp, "ZOMBIE", locks);
1568 spin_unlock(&obd_zombie_impexp_lock);
1570 EXPORT_SYMBOL(dump_exports);
1572 void obd_exports_barrier(struct obd_device *obd)
1575 LASSERT(list_empty(&obd->obd_exports));
1576 spin_lock(&obd->obd_dev_lock);
1577 while (!list_empty(&obd->obd_unlinked_exports)) {
1578 spin_unlock(&obd->obd_dev_lock);
1579 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1580 cfs_time_seconds(waited));
1581 if (waited > 5 && IS_PO2(waited)) {
1582 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1583 "more than %d seconds. "
1584 "The obd refcount = %d. Is it stuck?\n",
1585 obd->obd_name, waited,
1586 atomic_read(&obd->obd_refcount));
1587 dump_exports(obd, 1);
1590 spin_lock(&obd->obd_dev_lock);
1592 spin_unlock(&obd->obd_dev_lock);
1594 EXPORT_SYMBOL(obd_exports_barrier);
1596 /* Total amount of zombies to be destroyed */
1597 static int zombies_count = 0;
1600 * kill zombie imports and exports
1602 void obd_zombie_impexp_cull(void)
1604 struct obd_import *import;
1605 struct obd_export *export;
1609 spin_lock(&obd_zombie_impexp_lock);
1612 if (!list_empty(&obd_zombie_imports)) {
1613 import = list_entry(obd_zombie_imports.next,
1616 list_del_init(&import->imp_zombie_chain);
1620 if (!list_empty(&obd_zombie_exports)) {
1621 export = list_entry(obd_zombie_exports.next,
1624 list_del_init(&export->exp_obd_chain);
1627 spin_unlock(&obd_zombie_impexp_lock);
1629 if (import != NULL) {
1630 class_import_destroy(import);
1631 spin_lock(&obd_zombie_impexp_lock);
1633 spin_unlock(&obd_zombie_impexp_lock);
1636 if (export != NULL) {
1637 class_export_destroy(export);
1638 spin_lock(&obd_zombie_impexp_lock);
1640 spin_unlock(&obd_zombie_impexp_lock);
1644 } while (import != NULL || export != NULL);
1648 static struct completion obd_zombie_start;
1649 static struct completion obd_zombie_stop;
1650 static unsigned long obd_zombie_flags;
1651 static wait_queue_head_t obd_zombie_waitq;
1652 static pid_t obd_zombie_pid;
1655 OBD_ZOMBIE_STOP = 0x0001,
1659 * check for work for kill zombie import/export thread.
1661 static int obd_zombie_impexp_check(void *arg)
1665 spin_lock(&obd_zombie_impexp_lock);
1666 rc = (zombies_count == 0) &&
1667 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1668 spin_unlock(&obd_zombie_impexp_lock);
1674 * Add export to the obd_zombe thread and notify it.
1676 static void obd_zombie_export_add(struct obd_export *exp) {
1677 spin_lock(&exp->exp_obd->obd_dev_lock);
1678 LASSERT(!list_empty(&exp->exp_obd_chain));
1679 list_del_init(&exp->exp_obd_chain);
1680 spin_unlock(&exp->exp_obd->obd_dev_lock);
1681 spin_lock(&obd_zombie_impexp_lock);
1683 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1684 spin_unlock(&obd_zombie_impexp_lock);
1686 obd_zombie_impexp_notify();
1690 * Add import to the obd_zombe thread and notify it.
1692 static void obd_zombie_import_add(struct obd_import *imp) {
1693 LASSERT(imp->imp_sec == NULL);
1694 LASSERT(imp->imp_rq_pool == NULL);
1695 spin_lock(&obd_zombie_impexp_lock);
1696 LASSERT(list_empty(&imp->imp_zombie_chain));
1698 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1699 spin_unlock(&obd_zombie_impexp_lock);
1701 obd_zombie_impexp_notify();
1705 * notify import/export destroy thread about new zombie.
1707 static void obd_zombie_impexp_notify(void)
1710 * Make sure obd_zomebie_impexp_thread get this notification.
1711 * It is possible this signal only get by obd_zombie_barrier, and
1712 * barrier gulps this notification and sleeps away and hangs ensues
1714 wake_up_all(&obd_zombie_waitq);
1718 * check whether obd_zombie is idle
1720 static int obd_zombie_is_idle(void)
1724 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1725 spin_lock(&obd_zombie_impexp_lock);
1726 rc = (zombies_count == 0);
1727 spin_unlock(&obd_zombie_impexp_lock);
1732 * wait when obd_zombie import/export queues become empty
1734 void obd_zombie_barrier(void)
1736 struct l_wait_info lwi = { 0 };
1738 if (obd_zombie_pid == current_pid())
1739 /* don't wait for myself */
1741 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1743 EXPORT_SYMBOL(obd_zombie_barrier);
1747 * destroy zombie export/import thread.
1749 static int obd_zombie_impexp_thread(void *unused)
1751 unshare_fs_struct();
1752 complete(&obd_zombie_start);
1754 obd_zombie_pid = current_pid();
1756 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1757 struct l_wait_info lwi = { 0 };
1759 l_wait_event(obd_zombie_waitq,
1760 !obd_zombie_impexp_check(NULL), &lwi);
1761 obd_zombie_impexp_cull();
1764 * Notify obd_zombie_barrier callers that queues
1767 wake_up(&obd_zombie_waitq);
1770 complete(&obd_zombie_stop);
1777 * start destroy zombie import/export thread
1779 int obd_zombie_impexp_init(void)
1781 struct task_struct *task;
1783 INIT_LIST_HEAD(&obd_zombie_imports);
1785 INIT_LIST_HEAD(&obd_zombie_exports);
1786 spin_lock_init(&obd_zombie_impexp_lock);
1787 init_completion(&obd_zombie_start);
1788 init_completion(&obd_zombie_stop);
1789 init_waitqueue_head(&obd_zombie_waitq);
1792 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1794 RETURN(PTR_ERR(task));
1796 wait_for_completion(&obd_zombie_start);
1800 * stop destroy zombie import/export thread
1802 void obd_zombie_impexp_stop(void)
1804 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1805 obd_zombie_impexp_notify();
1806 wait_for_completion(&obd_zombie_stop);
1809 /***** Kernel-userspace comm helpers *******/
1811 /* Get length of entire message, including header */
1812 int kuc_len(int payload_len)
1814 return sizeof(struct kuc_hdr) + payload_len;
1816 EXPORT_SYMBOL(kuc_len);
1818 /* Get a pointer to kuc header, given a ptr to the payload
1819 * @param p Pointer to payload area
1820 * @returns Pointer to kuc header
1822 struct kuc_hdr * kuc_ptr(void *p)
1824 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1825 LASSERT(lh->kuc_magic == KUC_MAGIC);
1828 EXPORT_SYMBOL(kuc_ptr);
1830 /* Test if payload is part of kuc message
1831 * @param p Pointer to payload area
1834 int kuc_ispayload(void *p)
1836 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1838 if (kh->kuc_magic == KUC_MAGIC)
1843 EXPORT_SYMBOL(kuc_ispayload);
1845 /* Alloc space for a message, and fill in header
1846 * @return Pointer to payload area
1848 void *kuc_alloc(int payload_len, int transport, int type)
1851 int len = kuc_len(payload_len);
1855 return ERR_PTR(-ENOMEM);
1857 lh->kuc_magic = KUC_MAGIC;
1858 lh->kuc_transport = transport;
1859 lh->kuc_msgtype = type;
1860 lh->kuc_msglen = len;
1862 return (void *)(lh + 1);
1864 EXPORT_SYMBOL(kuc_alloc);
1866 /* Takes pointer to payload area */
1867 inline void kuc_free(void *p, int payload_len)
1869 struct kuc_hdr *lh = kuc_ptr(p);
1870 OBD_FREE(lh, kuc_len(payload_len));
1872 EXPORT_SYMBOL(kuc_free);
1874 struct obd_request_slot_waiter {
1875 struct list_head orsw_entry;
1876 wait_queue_head_t orsw_waitq;
1880 static bool obd_request_slot_avail(struct client_obd *cli,
1881 struct obd_request_slot_waiter *orsw)
1885 spin_lock(&cli->cl_loi_list_lock);
1886 avail = !!list_empty(&orsw->orsw_entry);
1887 spin_unlock(&cli->cl_loi_list_lock);
1893 * For network flow control, the RPC sponsor needs to acquire a credit
1894 * before sending the RPC. The credits count for a connection is defined
1895 * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1896 * the subsequent RPC sponsors need to wait until others released their
1897 * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1899 int obd_get_request_slot(struct client_obd *cli)
1901 struct obd_request_slot_waiter orsw;
1902 struct l_wait_info lwi;
1905 spin_lock(&cli->cl_loi_list_lock);
1906 if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1907 cli->cl_r_in_flight++;
1908 spin_unlock(&cli->cl_loi_list_lock);
1912 init_waitqueue_head(&orsw.orsw_waitq);
1913 list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1914 orsw.orsw_signaled = false;
1915 spin_unlock(&cli->cl_loi_list_lock);
1917 lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1918 rc = l_wait_event(orsw.orsw_waitq,
1919 obd_request_slot_avail(cli, &orsw) ||
1923 /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1924 * freed but other (such as obd_put_request_slot) is using it. */
1925 spin_lock(&cli->cl_loi_list_lock);
1927 if (!orsw.orsw_signaled) {
1928 if (list_empty(&orsw.orsw_entry))
1929 cli->cl_r_in_flight--;
1931 list_del(&orsw.orsw_entry);
1935 if (orsw.orsw_signaled) {
1936 LASSERT(list_empty(&orsw.orsw_entry));
1940 spin_unlock(&cli->cl_loi_list_lock);
1944 EXPORT_SYMBOL(obd_get_request_slot);
1946 void obd_put_request_slot(struct client_obd *cli)
1948 struct obd_request_slot_waiter *orsw;
1950 spin_lock(&cli->cl_loi_list_lock);
1951 cli->cl_r_in_flight--;
1953 /* If there is free slot, wakeup the first waiter. */
1954 if (!list_empty(&cli->cl_loi_read_list) &&
1955 likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
1956 orsw = list_entry(cli->cl_loi_read_list.next,
1957 struct obd_request_slot_waiter, orsw_entry);
1958 list_del_init(&orsw->orsw_entry);
1959 cli->cl_r_in_flight++;
1960 wake_up(&orsw->orsw_waitq);
1962 spin_unlock(&cli->cl_loi_list_lock);
1964 EXPORT_SYMBOL(obd_put_request_slot);
1966 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
1968 return cli->cl_max_rpcs_in_flight;
1970 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
1972 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
1974 struct obd_request_slot_waiter *orsw;
1979 if (max > OBD_MAX_RIF_MAX || max < 1)
1982 spin_lock(&cli->cl_loi_list_lock);
1983 old = cli->cl_max_rpcs_in_flight;
1984 cli->cl_max_rpcs_in_flight = max;
1987 /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
1988 for (i = 0; i < diff; i++) {
1989 if (list_empty(&cli->cl_loi_read_list))
1992 orsw = list_entry(cli->cl_loi_read_list.next,
1993 struct obd_request_slot_waiter, orsw_entry);
1994 list_del_init(&orsw->orsw_entry);
1995 cli->cl_r_in_flight++;
1996 wake_up(&orsw->orsw_waitq);
1998 spin_unlock(&cli->cl_loi_list_lock);
2002 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);