4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
134 if (!cfs_request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144 spin_lock(&type->obd_type_lock);
146 cfs_try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
151 EXPORT_SYMBOL(class_get_type);
153 void class_put_type(struct obd_type *type)
156 spin_lock(&type->obd_type_lock);
158 cfs_module_put(type->typ_dt_ops->o_owner);
159 spin_unlock(&type->obd_type_lock);
161 EXPORT_SYMBOL(class_put_type);
163 #define CLASS_MAX_NAME 1024
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166 struct lprocfs_vars *vars, const char *name,
167 struct lu_device_type *ldt)
169 struct obd_type *type;
174 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
176 if (class_search_type(name)) {
177 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
182 OBD_ALLOC(type, sizeof(*type));
186 OBD_ALLOC_PTR(type->typ_dt_ops);
187 OBD_ALLOC_PTR(type->typ_md_ops);
188 OBD_ALLOC(type->typ_name, strlen(name) + 1);
190 if (type->typ_dt_ops == NULL ||
191 type->typ_md_ops == NULL ||
192 type->typ_name == NULL)
195 *(type->typ_dt_ops) = *dt_ops;
196 /* md_ops is optional */
198 *(type->typ_md_ops) = *md_ops;
199 strcpy(type->typ_name, name);
200 spin_lock_init(&type->obd_type_lock);
203 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
205 if (IS_ERR(type->typ_procroot)) {
206 rc = PTR_ERR(type->typ_procroot);
207 type->typ_procroot = NULL;
213 rc = lu_device_type_init(ldt);
218 spin_lock(&obd_types_lock);
219 cfs_list_add(&type->typ_chain, &obd_types);
220 spin_unlock(&obd_types_lock);
225 if (type->typ_name != NULL)
226 OBD_FREE(type->typ_name, strlen(name) + 1);
227 if (type->typ_md_ops != NULL)
228 OBD_FREE_PTR(type->typ_md_ops);
229 if (type->typ_dt_ops != NULL)
230 OBD_FREE_PTR(type->typ_dt_ops);
231 OBD_FREE(type, sizeof(*type));
234 EXPORT_SYMBOL(class_register_type);
236 int class_unregister_type(const char *name)
238 struct obd_type *type = class_search_type(name);
242 CERROR("unknown obd type\n");
246 if (type->typ_refcnt) {
247 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
248 /* This is a bad situation, let's make the best of it */
249 /* Remove ops, but leave the name for debugging */
250 OBD_FREE_PTR(type->typ_dt_ops);
251 OBD_FREE_PTR(type->typ_md_ops);
255 /* we do not use type->typ_procroot as for compatibility purposes
256 * other modules can share names (i.e. lod can use lov entry). so
257 * we can't reference pointer as it can get invalided when another
258 * module removes the entry */
259 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
262 lu_device_type_fini(type->typ_lu);
264 spin_lock(&obd_types_lock);
265 cfs_list_del(&type->typ_chain);
266 spin_unlock(&obd_types_lock);
267 OBD_FREE(type->typ_name, strlen(name) + 1);
268 if (type->typ_dt_ops != NULL)
269 OBD_FREE_PTR(type->typ_dt_ops);
270 if (type->typ_md_ops != NULL)
271 OBD_FREE_PTR(type->typ_md_ops);
272 OBD_FREE(type, sizeof(*type));
274 } /* class_unregister_type */
275 EXPORT_SYMBOL(class_unregister_type);
278 * Create a new obd device.
280 * Find an empty slot in ::obd_devs[], create a new obd device in it.
282 * \param[in] type_name obd device type string.
283 * \param[in] name obd device name.
285 * \retval NULL if create fails, otherwise return the obd device
288 struct obd_device *class_newdev(const char *type_name, const char *name)
290 struct obd_device *result = NULL;
291 struct obd_device *newdev;
292 struct obd_type *type = NULL;
294 int new_obd_minor = 0;
297 if (strlen(name) >= MAX_OBD_NAME) {
298 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
299 RETURN(ERR_PTR(-EINVAL));
302 type = class_get_type(type_name);
304 CERROR("OBD: unknown type: %s\n", type_name);
305 RETURN(ERR_PTR(-ENODEV));
308 newdev = obd_device_alloc();
310 GOTO(out_type, result = ERR_PTR(-ENOMEM));
312 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
314 write_lock(&obd_dev_lock);
315 for (i = 0; i < class_devno_max(); i++) {
316 struct obd_device *obd = class_num2obd(i);
318 if (obd && obd->obd_name &&
319 (strcmp(name, obd->obd_name) == 0)) {
320 CERROR("Device %s already exists at %d, won't add\n",
323 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
324 "%p obd_magic %08x != %08x\n", result,
325 result->obd_magic, OBD_DEVICE_MAGIC);
326 LASSERTF(result->obd_minor == new_obd_minor,
327 "%p obd_minor %d != %d\n", result,
328 result->obd_minor, new_obd_minor);
330 obd_devs[result->obd_minor] = NULL;
331 result->obd_name[0]='\0';
333 result = ERR_PTR(-EEXIST);
336 if (!result && !obd) {
338 result->obd_minor = i;
340 result->obd_type = type;
341 strncpy(result->obd_name, name,
342 sizeof(result->obd_name) - 1);
343 obd_devs[i] = result;
346 write_unlock(&obd_dev_lock);
348 if (result == NULL && i >= class_devno_max()) {
349 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
351 GOTO(out, result = ERR_PTR(-EOVERFLOW));
357 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
358 result->obd_name, result);
362 obd_device_free(newdev);
364 class_put_type(type);
368 void class_release_dev(struct obd_device *obd)
370 struct obd_type *obd_type = obd->obd_type;
372 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
373 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
374 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
375 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
376 LASSERT(obd_type != NULL);
378 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
379 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
381 write_lock(&obd_dev_lock);
382 obd_devs[obd->obd_minor] = NULL;
383 write_unlock(&obd_dev_lock);
384 obd_device_free(obd);
386 class_put_type(obd_type);
389 int class_name2dev(const char *name)
396 read_lock(&obd_dev_lock);
397 for (i = 0; i < class_devno_max(); i++) {
398 struct obd_device *obd = class_num2obd(i);
400 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
401 /* Make sure we finished attaching before we give
402 out any references */
403 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
404 if (obd->obd_attached) {
405 read_unlock(&obd_dev_lock);
411 read_unlock(&obd_dev_lock);
415 EXPORT_SYMBOL(class_name2dev);
417 struct obd_device *class_name2obd(const char *name)
419 int dev = class_name2dev(name);
421 if (dev < 0 || dev > class_devno_max())
423 return class_num2obd(dev);
425 EXPORT_SYMBOL(class_name2obd);
427 int class_uuid2dev(struct obd_uuid *uuid)
431 read_lock(&obd_dev_lock);
432 for (i = 0; i < class_devno_max(); i++) {
433 struct obd_device *obd = class_num2obd(i);
435 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
436 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
437 read_unlock(&obd_dev_lock);
441 read_unlock(&obd_dev_lock);
445 EXPORT_SYMBOL(class_uuid2dev);
447 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
449 int dev = class_uuid2dev(uuid);
452 return class_num2obd(dev);
454 EXPORT_SYMBOL(class_uuid2obd);
457 * Get obd device from ::obd_devs[]
459 * \param num [in] array index
461 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
462 * otherwise return the obd device there.
464 struct obd_device *class_num2obd(int num)
466 struct obd_device *obd = NULL;
468 if (num < class_devno_max()) {
473 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
474 "%p obd_magic %08x != %08x\n",
475 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
476 LASSERTF(obd->obd_minor == num,
477 "%p obd_minor %0d != %0d\n",
478 obd, obd->obd_minor, num);
483 EXPORT_SYMBOL(class_num2obd);
486 * Get obd devices count. Device in any
488 * \retval obd device count
490 int get_devices_count(void)
492 int index, max_index = class_devno_max(), dev_count = 0;
494 read_lock(&obd_dev_lock);
495 for (index = 0; index <= max_index; index++) {
496 struct obd_device *obd = class_num2obd(index);
500 read_unlock(&obd_dev_lock);
504 EXPORT_SYMBOL(get_devices_count);
506 void class_obd_list(void)
511 read_lock(&obd_dev_lock);
512 for (i = 0; i < class_devno_max(); i++) {
513 struct obd_device *obd = class_num2obd(i);
517 if (obd->obd_stopping)
519 else if (obd->obd_set_up)
521 else if (obd->obd_attached)
525 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
526 i, status, obd->obd_type->typ_name,
527 obd->obd_name, obd->obd_uuid.uuid,
528 cfs_atomic_read(&obd->obd_refcount));
530 read_unlock(&obd_dev_lock);
534 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
535 specified, then only the client with that uuid is returned,
536 otherwise any client connected to the tgt is returned. */
537 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
538 const char * typ_name,
539 struct obd_uuid *grp_uuid)
543 read_lock(&obd_dev_lock);
544 for (i = 0; i < class_devno_max(); i++) {
545 struct obd_device *obd = class_num2obd(i);
549 if ((strncmp(obd->obd_type->typ_name, typ_name,
550 strlen(typ_name)) == 0)) {
551 if (obd_uuid_equals(tgt_uuid,
552 &obd->u.cli.cl_target_uuid) &&
553 ((grp_uuid)? obd_uuid_equals(grp_uuid,
554 &obd->obd_uuid) : 1)) {
555 read_unlock(&obd_dev_lock);
560 read_unlock(&obd_dev_lock);
564 EXPORT_SYMBOL(class_find_client_obd);
566 /* Iterate the obd_device list looking devices have grp_uuid. Start
567 searching at *next, and if a device is found, the next index to look
568 at is saved in *next. If next is NULL, then the first matching device
569 will always be returned. */
570 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
576 else if (*next >= 0 && *next < class_devno_max())
581 read_lock(&obd_dev_lock);
582 for (; i < class_devno_max(); i++) {
583 struct obd_device *obd = class_num2obd(i);
587 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
590 read_unlock(&obd_dev_lock);
594 read_unlock(&obd_dev_lock);
598 EXPORT_SYMBOL(class_devices_in_group);
601 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
602 * adjust sptlrpc settings accordingly.
604 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
606 struct obd_device *obd;
610 LASSERT(namelen > 0);
612 read_lock(&obd_dev_lock);
613 for (i = 0; i < class_devno_max(); i++) {
614 obd = class_num2obd(i);
616 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
619 /* only notify mdc, osc, mdt, ost */
620 type = obd->obd_type->typ_name;
621 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
622 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
623 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
624 strcmp(type, LUSTRE_OST_NAME) != 0)
627 if (strncmp(obd->obd_name, fsname, namelen))
630 class_incref(obd, __FUNCTION__, obd);
631 read_unlock(&obd_dev_lock);
632 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
633 sizeof(KEY_SPTLRPC_CONF),
634 KEY_SPTLRPC_CONF, 0, NULL, NULL);
636 class_decref(obd, __FUNCTION__, obd);
637 read_lock(&obd_dev_lock);
639 read_unlock(&obd_dev_lock);
642 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
644 void obd_cleanup_caches(void)
649 if (obd_device_cachep) {
650 rc = cfs_mem_cache_destroy(obd_device_cachep);
651 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
652 obd_device_cachep = NULL;
655 rc = cfs_mem_cache_destroy(obdo_cachep);
656 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
660 rc = cfs_mem_cache_destroy(import_cachep);
661 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
662 import_cachep = NULL;
665 rc = cfs_mem_cache_destroy(capa_cachep);
666 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
672 int obd_init_caches(void)
676 LASSERT(obd_device_cachep == NULL);
677 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
678 sizeof(struct obd_device),
680 if (!obd_device_cachep)
683 LASSERT(obdo_cachep == NULL);
684 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
689 LASSERT(import_cachep == NULL);
690 import_cachep = cfs_mem_cache_create("ll_import_cache",
691 sizeof(struct obd_import),
696 LASSERT(capa_cachep == NULL);
697 capa_cachep = cfs_mem_cache_create("capa_cache",
698 sizeof(struct obd_capa), 0, 0);
704 obd_cleanup_caches();
709 /* map connection to client */
710 struct obd_export *class_conn2export(struct lustre_handle *conn)
712 struct obd_export *export;
716 CDEBUG(D_CACHE, "looking for null handle\n");
720 if (conn->cookie == -1) { /* this means assign a new connection */
721 CDEBUG(D_CACHE, "want a new connection\n");
725 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
726 export = class_handle2object(conn->cookie);
729 EXPORT_SYMBOL(class_conn2export);
731 struct obd_device *class_exp2obd(struct obd_export *exp)
737 EXPORT_SYMBOL(class_exp2obd);
739 struct obd_device *class_conn2obd(struct lustre_handle *conn)
741 struct obd_export *export;
742 export = class_conn2export(conn);
744 struct obd_device *obd = export->exp_obd;
745 class_export_put(export);
750 EXPORT_SYMBOL(class_conn2obd);
752 struct obd_import *class_exp2cliimp(struct obd_export *exp)
754 struct obd_device *obd = exp->exp_obd;
757 return obd->u.cli.cl_import;
759 EXPORT_SYMBOL(class_exp2cliimp);
761 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
763 struct obd_device *obd = class_conn2obd(conn);
766 return obd->u.cli.cl_import;
768 EXPORT_SYMBOL(class_conn2cliimp);
770 /* Export management functions */
771 static void class_export_destroy(struct obd_export *exp)
773 struct obd_device *obd = exp->exp_obd;
776 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
777 LASSERT(obd != NULL);
779 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
780 exp->exp_client_uuid.uuid, obd->obd_name);
782 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
783 if (exp->exp_connection)
784 ptlrpc_put_connection_superhack(exp->exp_connection);
786 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
787 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
788 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
789 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
790 obd_destroy_export(exp);
791 class_decref(obd, "export", exp);
793 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
797 static void export_handle_addref(void *export)
799 class_export_get(export);
802 static struct portals_handle_ops export_handle_ops = {
803 .hop_addref = export_handle_addref,
807 struct obd_export *class_export_get(struct obd_export *exp)
809 cfs_atomic_inc(&exp->exp_refcount);
810 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
811 cfs_atomic_read(&exp->exp_refcount));
814 EXPORT_SYMBOL(class_export_get);
816 void class_export_put(struct obd_export *exp)
818 LASSERT(exp != NULL);
819 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
820 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
821 cfs_atomic_read(&exp->exp_refcount) - 1);
823 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
824 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
825 CDEBUG(D_IOCTL, "final put %p/%s\n",
826 exp, exp->exp_client_uuid.uuid);
828 /* release nid stat refererence */
829 lprocfs_exp_cleanup(exp);
831 obd_zombie_export_add(exp);
834 EXPORT_SYMBOL(class_export_put);
836 /* Creates a new export, adds it to the hash table, and returns a
837 * pointer to it. The refcount is 2: one for the hash reference, and
838 * one for the pointer returned by this function. */
839 struct obd_export *class_new_export(struct obd_device *obd,
840 struct obd_uuid *cluuid)
842 struct obd_export *export;
843 cfs_hash_t *hash = NULL;
847 OBD_ALLOC_PTR(export);
849 return ERR_PTR(-ENOMEM);
851 export->exp_conn_cnt = 0;
852 export->exp_lock_hash = NULL;
853 export->exp_flock_hash = NULL;
854 cfs_atomic_set(&export->exp_refcount, 2);
855 cfs_atomic_set(&export->exp_rpc_count, 0);
856 cfs_atomic_set(&export->exp_cb_count, 0);
857 cfs_atomic_set(&export->exp_locks_count, 0);
858 #if LUSTRE_TRACKS_LOCK_EXP_REFS
859 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
860 spin_lock_init(&export->exp_locks_list_guard);
862 cfs_atomic_set(&export->exp_replay_count, 0);
863 export->exp_obd = obd;
864 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
865 spin_lock_init(&export->exp_uncommitted_replies_lock);
866 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
867 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
868 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
869 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
870 class_handle_hash(&export->exp_handle, &export_handle_ops);
871 export->exp_last_request_time = cfs_time_current_sec();
872 spin_lock_init(&export->exp_lock);
873 spin_lock_init(&export->exp_rpc_lock);
874 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
875 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
876 spin_lock_init(&export->exp_bl_list_lock);
877 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
879 export->exp_sp_peer = LUSTRE_SP_ANY;
880 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
881 export->exp_client_uuid = *cluuid;
882 obd_init_export(export);
884 spin_lock(&obd->obd_dev_lock);
885 /* shouldn't happen, but might race */
886 if (obd->obd_stopping)
887 GOTO(exit_unlock, rc = -ENODEV);
889 hash = cfs_hash_getref(obd->obd_uuid_hash);
891 GOTO(exit_unlock, rc = -ENODEV);
892 spin_unlock(&obd->obd_dev_lock);
894 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
895 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
897 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
898 obd->obd_name, cluuid->uuid, rc);
899 GOTO(exit_err, rc = -EALREADY);
903 spin_lock(&obd->obd_dev_lock);
904 if (obd->obd_stopping) {
905 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
906 GOTO(exit_unlock, rc = -ENODEV);
909 class_incref(obd, "export", export);
910 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
911 cfs_list_add_tail(&export->exp_obd_chain_timed,
912 &export->exp_obd->obd_exports_timed);
913 export->exp_obd->obd_num_exports++;
914 spin_unlock(&obd->obd_dev_lock);
915 cfs_hash_putref(hash);
919 spin_unlock(&obd->obd_dev_lock);
922 cfs_hash_putref(hash);
923 class_handle_unhash(&export->exp_handle);
924 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
925 obd_destroy_export(export);
926 OBD_FREE_PTR(export);
929 EXPORT_SYMBOL(class_new_export);
931 void class_unlink_export(struct obd_export *exp)
933 class_handle_unhash(&exp->exp_handle);
935 spin_lock(&exp->exp_obd->obd_dev_lock);
936 /* delete an uuid-export hashitem from hashtables */
937 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
938 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
939 &exp->exp_client_uuid,
940 &exp->exp_uuid_hash);
942 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
943 cfs_list_del_init(&exp->exp_obd_chain_timed);
944 exp->exp_obd->obd_num_exports--;
945 spin_unlock(&exp->exp_obd->obd_dev_lock);
946 class_export_put(exp);
948 EXPORT_SYMBOL(class_unlink_export);
950 /* Import management functions */
951 void class_import_destroy(struct obd_import *imp)
955 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
956 imp->imp_obd->obd_name);
958 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
960 ptlrpc_put_connection_superhack(imp->imp_connection);
962 while (!cfs_list_empty(&imp->imp_conn_list)) {
963 struct obd_import_conn *imp_conn;
965 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
966 struct obd_import_conn, oic_item);
967 cfs_list_del_init(&imp_conn->oic_item);
968 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
969 OBD_FREE(imp_conn, sizeof(*imp_conn));
972 LASSERT(imp->imp_sec == NULL);
973 class_decref(imp->imp_obd, "import", imp);
974 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
978 static void import_handle_addref(void *import)
980 class_import_get(import);
983 static struct portals_handle_ops import_handle_ops = {
984 .hop_addref = import_handle_addref,
988 struct obd_import *class_import_get(struct obd_import *import)
990 cfs_atomic_inc(&import->imp_refcount);
991 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
992 cfs_atomic_read(&import->imp_refcount),
993 import->imp_obd->obd_name);
996 EXPORT_SYMBOL(class_import_get);
998 void class_import_put(struct obd_import *imp)
1002 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1003 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1005 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1006 cfs_atomic_read(&imp->imp_refcount) - 1,
1007 imp->imp_obd->obd_name);
1009 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1010 CDEBUG(D_INFO, "final put import %p\n", imp);
1011 obd_zombie_import_add(imp);
1014 /* catch possible import put race */
1015 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1018 EXPORT_SYMBOL(class_import_put);
1020 static void init_imp_at(struct imp_at *at) {
1022 at_init(&at->iat_net_latency, 0, 0);
1023 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1024 /* max service estimates are tracked on the server side, so
1025 don't use the AT history here, just use the last reported
1026 val. (But keep hist for proc histogram, worst_ever) */
1027 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1032 struct obd_import *class_new_import(struct obd_device *obd)
1034 struct obd_import *imp;
1036 OBD_ALLOC(imp, sizeof(*imp));
1040 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1041 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1042 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1043 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1044 spin_lock_init(&imp->imp_lock);
1045 imp->imp_last_success_conn = 0;
1046 imp->imp_state = LUSTRE_IMP_NEW;
1047 imp->imp_obd = class_incref(obd, "import", imp);
1048 mutex_init(&imp->imp_sec_mutex);
1049 cfs_waitq_init(&imp->imp_recovery_waitq);
1051 cfs_atomic_set(&imp->imp_refcount, 2);
1052 cfs_atomic_set(&imp->imp_unregistering, 0);
1053 cfs_atomic_set(&imp->imp_inflight, 0);
1054 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1055 cfs_atomic_set(&imp->imp_inval_count, 0);
1056 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1057 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1058 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1059 init_imp_at(&imp->imp_at);
1061 /* the default magic is V2, will be used in connect RPC, and
1062 * then adjusted according to the flags in request/reply. */
1063 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1067 EXPORT_SYMBOL(class_new_import);
1069 void class_destroy_import(struct obd_import *import)
1071 LASSERT(import != NULL);
1072 LASSERT(import != LP_POISON);
1074 class_handle_unhash(&import->imp_handle);
1076 spin_lock(&import->imp_lock);
1077 import->imp_generation++;
1078 spin_unlock(&import->imp_lock);
1079 class_import_put(import);
1081 EXPORT_SYMBOL(class_destroy_import);
1083 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1085 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1087 spin_lock(&exp->exp_locks_list_guard);
1089 LASSERT(lock->l_exp_refs_nr >= 0);
1091 if (lock->l_exp_refs_target != NULL &&
1092 lock->l_exp_refs_target != exp) {
1093 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1094 exp, lock, lock->l_exp_refs_target);
1096 if ((lock->l_exp_refs_nr ++) == 0) {
1097 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1098 lock->l_exp_refs_target = exp;
1100 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1101 lock, exp, lock->l_exp_refs_nr);
1102 spin_unlock(&exp->exp_locks_list_guard);
1104 EXPORT_SYMBOL(__class_export_add_lock_ref);
1106 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1108 spin_lock(&exp->exp_locks_list_guard);
1109 LASSERT(lock->l_exp_refs_nr > 0);
1110 if (lock->l_exp_refs_target != exp) {
1111 LCONSOLE_WARN("lock %p, "
1112 "mismatching export pointers: %p, %p\n",
1113 lock, lock->l_exp_refs_target, exp);
1115 if (-- lock->l_exp_refs_nr == 0) {
1116 cfs_list_del_init(&lock->l_exp_refs_link);
1117 lock->l_exp_refs_target = NULL;
1119 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1120 lock, exp, lock->l_exp_refs_nr);
1121 spin_unlock(&exp->exp_locks_list_guard);
1123 EXPORT_SYMBOL(__class_export_del_lock_ref);
1126 /* A connection defines an export context in which preallocation can
1127 be managed. This releases the export pointer reference, and returns
1128 the export handle, so the export refcount is 1 when this function
1130 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1131 struct obd_uuid *cluuid)
1133 struct obd_export *export;
1134 LASSERT(conn != NULL);
1135 LASSERT(obd != NULL);
1136 LASSERT(cluuid != NULL);
1139 export = class_new_export(obd, cluuid);
1141 RETURN(PTR_ERR(export));
1143 conn->cookie = export->exp_handle.h_cookie;
1144 class_export_put(export);
1146 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1147 cluuid->uuid, conn->cookie);
1150 EXPORT_SYMBOL(class_connect);
1152 /* if export is involved in recovery then clean up related things */
1153 void class_export_recovery_cleanup(struct obd_export *exp)
1155 struct obd_device *obd = exp->exp_obd;
1157 spin_lock(&obd->obd_recovery_task_lock);
1158 if (exp->exp_delayed)
1159 obd->obd_delayed_clients--;
1160 if (obd->obd_recovering) {
1161 if (exp->exp_in_recovery) {
1162 spin_lock(&exp->exp_lock);
1163 exp->exp_in_recovery = 0;
1164 spin_unlock(&exp->exp_lock);
1165 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1166 cfs_atomic_dec(&obd->obd_connected_clients);
1169 /* if called during recovery then should update
1170 * obd_stale_clients counter,
1171 * lightweight exports are not counted */
1172 if (exp->exp_failed &&
1173 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1174 exp->exp_obd->obd_stale_clients++;
1176 spin_unlock(&obd->obd_recovery_task_lock);
1177 /** Cleanup req replay fields */
1178 if (exp->exp_req_replay_needed) {
1179 spin_lock(&exp->exp_lock);
1180 exp->exp_req_replay_needed = 0;
1181 spin_unlock(&exp->exp_lock);
1182 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1183 cfs_atomic_dec(&obd->obd_req_replay_clients);
1185 /** Cleanup lock replay data */
1186 if (exp->exp_lock_replay_needed) {
1187 spin_lock(&exp->exp_lock);
1188 exp->exp_lock_replay_needed = 0;
1189 spin_unlock(&exp->exp_lock);
1190 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1191 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1195 /* This function removes 1-3 references from the export:
1196 * 1 - for export pointer passed
1197 * and if disconnect really need
1198 * 2 - removing from hash
1199 * 3 - in client_unlink_export
1200 * The export pointer passed to this function can destroyed */
1201 int class_disconnect(struct obd_export *export)
1203 int already_disconnected;
1206 if (export == NULL) {
1207 CWARN("attempting to free NULL export %p\n", export);
1211 spin_lock(&export->exp_lock);
1212 already_disconnected = export->exp_disconnected;
1213 export->exp_disconnected = 1;
1214 spin_unlock(&export->exp_lock);
1216 /* class_cleanup(), abort_recovery(), and class_fail_export()
1217 * all end up in here, and if any of them race we shouldn't
1218 * call extra class_export_puts(). */
1219 if (already_disconnected) {
1220 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1221 GOTO(no_disconn, already_disconnected);
1224 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1225 export->exp_handle.h_cookie);
1227 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1228 cfs_hash_del(export->exp_obd->obd_nid_hash,
1229 &export->exp_connection->c_peer.nid,
1230 &export->exp_nid_hash);
1232 class_export_recovery_cleanup(export);
1233 class_unlink_export(export);
1235 class_export_put(export);
1238 EXPORT_SYMBOL(class_disconnect);
1240 /* Return non-zero for a fully connected export */
1241 int class_connected_export(struct obd_export *exp)
1245 spin_lock(&exp->exp_lock);
1246 connected = (exp->exp_conn_cnt > 0);
1247 spin_unlock(&exp->exp_lock);
1252 EXPORT_SYMBOL(class_connected_export);
1254 static void class_disconnect_export_list(cfs_list_t *list,
1255 enum obd_option flags)
1258 struct obd_export *exp;
1261 /* It's possible that an export may disconnect itself, but
1262 * nothing else will be added to this list. */
1263 while (!cfs_list_empty(list)) {
1264 exp = cfs_list_entry(list->next, struct obd_export,
1266 /* need for safe call CDEBUG after obd_disconnect */
1267 class_export_get(exp);
1269 spin_lock(&exp->exp_lock);
1270 exp->exp_flags = flags;
1271 spin_unlock(&exp->exp_lock);
1273 if (obd_uuid_equals(&exp->exp_client_uuid,
1274 &exp->exp_obd->obd_uuid)) {
1276 "exp %p export uuid == obd uuid, don't discon\n",
1278 /* Need to delete this now so we don't end up pointing
1279 * to work_list later when this export is cleaned up. */
1280 cfs_list_del_init(&exp->exp_obd_chain);
1281 class_export_put(exp);
1285 class_export_get(exp);
1286 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1287 "last request at "CFS_TIME_T"\n",
1288 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1289 exp, exp->exp_last_request_time);
1290 /* release one export reference anyway */
1291 rc = obd_disconnect(exp);
1293 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1294 obd_export_nid2str(exp), exp, rc);
1295 class_export_put(exp);
1300 void class_disconnect_exports(struct obd_device *obd)
1302 cfs_list_t work_list;
1305 /* Move all of the exports from obd_exports to a work list, en masse. */
1306 CFS_INIT_LIST_HEAD(&work_list);
1307 spin_lock(&obd->obd_dev_lock);
1308 cfs_list_splice_init(&obd->obd_exports, &work_list);
1309 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1310 spin_unlock(&obd->obd_dev_lock);
1312 if (!cfs_list_empty(&work_list)) {
1313 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1314 "disconnecting them\n", obd->obd_minor, obd);
1315 class_disconnect_export_list(&work_list,
1316 exp_flags_from_obd(obd));
1318 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1319 obd->obd_minor, obd);
1322 EXPORT_SYMBOL(class_disconnect_exports);
1324 /* Remove exports that have not completed recovery.
1326 void class_disconnect_stale_exports(struct obd_device *obd,
1327 int (*test_export)(struct obd_export *))
1329 cfs_list_t work_list;
1330 struct obd_export *exp, *n;
1334 CFS_INIT_LIST_HEAD(&work_list);
1335 spin_lock(&obd->obd_dev_lock);
1336 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1338 /* don't count self-export as client */
1339 if (obd_uuid_equals(&exp->exp_client_uuid,
1340 &exp->exp_obd->obd_uuid))
1343 /* don't evict clients which have no slot in last_rcvd
1344 * (e.g. lightweight connection) */
1345 if (exp->exp_target_data.ted_lr_idx == -1)
1348 spin_lock(&exp->exp_lock);
1349 if (exp->exp_failed || test_export(exp)) {
1350 spin_unlock(&exp->exp_lock);
1353 exp->exp_failed = 1;
1354 spin_unlock(&exp->exp_lock);
1356 cfs_list_move(&exp->exp_obd_chain, &work_list);
1358 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1359 obd->obd_name, exp->exp_client_uuid.uuid,
1360 exp->exp_connection == NULL ? "<unknown>" :
1361 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1362 print_export_data(exp, "EVICTING", 0);
1364 spin_unlock(&obd->obd_dev_lock);
1367 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1368 obd->obd_name, evicted);
1370 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1371 OBD_OPT_ABORT_RECOV);
1374 EXPORT_SYMBOL(class_disconnect_stale_exports);
1376 void class_fail_export(struct obd_export *exp)
1378 int rc, already_failed;
1380 spin_lock(&exp->exp_lock);
1381 already_failed = exp->exp_failed;
1382 exp->exp_failed = 1;
1383 spin_unlock(&exp->exp_lock);
1385 if (already_failed) {
1386 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1387 exp, exp->exp_client_uuid.uuid);
1391 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1392 exp, exp->exp_client_uuid.uuid);
1394 if (obd_dump_on_timeout)
1395 libcfs_debug_dumplog();
1397 /* need for safe call CDEBUG after obd_disconnect */
1398 class_export_get(exp);
1400 /* Most callers into obd_disconnect are removing their own reference
1401 * (request, for example) in addition to the one from the hash table.
1402 * We don't have such a reference here, so make one. */
1403 class_export_get(exp);
1404 rc = obd_disconnect(exp);
1406 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1408 CDEBUG(D_HA, "disconnected export %p/%s\n",
1409 exp, exp->exp_client_uuid.uuid);
1410 class_export_put(exp);
1412 EXPORT_SYMBOL(class_fail_export);
1414 char *obd_export_nid2str(struct obd_export *exp)
1416 if (exp->exp_connection != NULL)
1417 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1421 EXPORT_SYMBOL(obd_export_nid2str);
1423 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1425 cfs_hash_t *nid_hash;
1426 struct obd_export *doomed_exp = NULL;
1427 int exports_evicted = 0;
1429 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1431 spin_lock(&obd->obd_dev_lock);
1432 /* umount has run already, so evict thread should leave
1433 * its task to umount thread now */
1434 if (obd->obd_stopping) {
1435 spin_unlock(&obd->obd_dev_lock);
1436 return exports_evicted;
1438 nid_hash = obd->obd_nid_hash;
1439 cfs_hash_getref(nid_hash);
1440 spin_unlock(&obd->obd_dev_lock);
1443 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1444 if (doomed_exp == NULL)
1447 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1448 "nid %s found, wanted nid %s, requested nid %s\n",
1449 obd_export_nid2str(doomed_exp),
1450 libcfs_nid2str(nid_key), nid);
1451 LASSERTF(doomed_exp != obd->obd_self_export,
1452 "self-export is hashed by NID?\n");
1454 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1455 "request\n", obd->obd_name,
1456 obd_uuid2str(&doomed_exp->exp_client_uuid),
1457 obd_export_nid2str(doomed_exp));
1458 class_fail_export(doomed_exp);
1459 class_export_put(doomed_exp);
1462 cfs_hash_putref(nid_hash);
1464 if (!exports_evicted)
1465 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1466 obd->obd_name, nid);
1467 return exports_evicted;
1469 EXPORT_SYMBOL(obd_export_evict_by_nid);
1471 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1473 cfs_hash_t *uuid_hash;
1474 struct obd_export *doomed_exp = NULL;
1475 struct obd_uuid doomed_uuid;
1476 int exports_evicted = 0;
1478 spin_lock(&obd->obd_dev_lock);
1479 if (obd->obd_stopping) {
1480 spin_unlock(&obd->obd_dev_lock);
1481 return exports_evicted;
1483 uuid_hash = obd->obd_uuid_hash;
1484 cfs_hash_getref(uuid_hash);
1485 spin_unlock(&obd->obd_dev_lock);
1487 obd_str2uuid(&doomed_uuid, uuid);
1488 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1489 CERROR("%s: can't evict myself\n", obd->obd_name);
1490 cfs_hash_putref(uuid_hash);
1491 return exports_evicted;
1494 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1496 if (doomed_exp == NULL) {
1497 CERROR("%s: can't disconnect %s: no exports found\n",
1498 obd->obd_name, uuid);
1500 CWARN("%s: evicting %s at adminstrative request\n",
1501 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1502 class_fail_export(doomed_exp);
1503 class_export_put(doomed_exp);
1506 cfs_hash_putref(uuid_hash);
1508 return exports_evicted;
1510 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1512 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1513 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1514 EXPORT_SYMBOL(class_export_dump_hook);
1517 static void print_export_data(struct obd_export *exp, const char *status,
1520 struct ptlrpc_reply_state *rs;
1521 struct ptlrpc_reply_state *first_reply = NULL;
1524 spin_lock(&exp->exp_lock);
1525 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1531 spin_unlock(&exp->exp_lock);
1533 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1534 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1535 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1536 cfs_atomic_read(&exp->exp_rpc_count),
1537 cfs_atomic_read(&exp->exp_cb_count),
1538 cfs_atomic_read(&exp->exp_locks_count),
1539 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1540 nreplies, first_reply, nreplies > 3 ? "..." : "",
1541 exp->exp_last_committed);
1542 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1543 if (locks && class_export_dump_hook != NULL)
1544 class_export_dump_hook(exp);
1548 void dump_exports(struct obd_device *obd, int locks)
1550 struct obd_export *exp;
1552 spin_lock(&obd->obd_dev_lock);
1553 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1554 print_export_data(exp, "ACTIVE", locks);
1555 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1556 print_export_data(exp, "UNLINKED", locks);
1557 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1558 print_export_data(exp, "DELAYED", locks);
1559 spin_unlock(&obd->obd_dev_lock);
1560 spin_lock(&obd_zombie_impexp_lock);
1561 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1562 print_export_data(exp, "ZOMBIE", locks);
1563 spin_unlock(&obd_zombie_impexp_lock);
1565 EXPORT_SYMBOL(dump_exports);
1567 void obd_exports_barrier(struct obd_device *obd)
1570 LASSERT(cfs_list_empty(&obd->obd_exports));
1571 spin_lock(&obd->obd_dev_lock);
1572 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1573 spin_unlock(&obd->obd_dev_lock);
1574 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1575 cfs_time_seconds(waited));
1576 if (waited > 5 && IS_PO2(waited)) {
1577 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1578 "more than %d seconds. "
1579 "The obd refcount = %d. Is it stuck?\n",
1580 obd->obd_name, waited,
1581 cfs_atomic_read(&obd->obd_refcount));
1582 dump_exports(obd, 1);
1585 spin_lock(&obd->obd_dev_lock);
1587 spin_unlock(&obd->obd_dev_lock);
1589 EXPORT_SYMBOL(obd_exports_barrier);
1591 /* Total amount of zombies to be destroyed */
1592 static int zombies_count = 0;
1595 * kill zombie imports and exports
1597 void obd_zombie_impexp_cull(void)
1599 struct obd_import *import;
1600 struct obd_export *export;
1604 spin_lock(&obd_zombie_impexp_lock);
1607 if (!cfs_list_empty(&obd_zombie_imports)) {
1608 import = cfs_list_entry(obd_zombie_imports.next,
1611 cfs_list_del_init(&import->imp_zombie_chain);
1615 if (!cfs_list_empty(&obd_zombie_exports)) {
1616 export = cfs_list_entry(obd_zombie_exports.next,
1619 cfs_list_del_init(&export->exp_obd_chain);
1622 spin_unlock(&obd_zombie_impexp_lock);
1624 if (import != NULL) {
1625 class_import_destroy(import);
1626 spin_lock(&obd_zombie_impexp_lock);
1628 spin_unlock(&obd_zombie_impexp_lock);
1631 if (export != NULL) {
1632 class_export_destroy(export);
1633 spin_lock(&obd_zombie_impexp_lock);
1635 spin_unlock(&obd_zombie_impexp_lock);
1639 } while (import != NULL || export != NULL);
1643 static struct completion obd_zombie_start;
1644 static struct completion obd_zombie_stop;
1645 static unsigned long obd_zombie_flags;
1646 static cfs_waitq_t obd_zombie_waitq;
1647 static pid_t obd_zombie_pid;
1650 OBD_ZOMBIE_STOP = 0x0001,
1654 * check for work for kill zombie import/export thread.
1656 static int obd_zombie_impexp_check(void *arg)
1660 spin_lock(&obd_zombie_impexp_lock);
1661 rc = (zombies_count == 0) &&
1662 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1663 spin_unlock(&obd_zombie_impexp_lock);
1669 * Add export to the obd_zombe thread and notify it.
1671 static void obd_zombie_export_add(struct obd_export *exp) {
1672 spin_lock(&exp->exp_obd->obd_dev_lock);
1673 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1674 cfs_list_del_init(&exp->exp_obd_chain);
1675 spin_unlock(&exp->exp_obd->obd_dev_lock);
1676 spin_lock(&obd_zombie_impexp_lock);
1678 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1679 spin_unlock(&obd_zombie_impexp_lock);
1681 obd_zombie_impexp_notify();
1685 * Add import to the obd_zombe thread and notify it.
1687 static void obd_zombie_import_add(struct obd_import *imp) {
1688 LASSERT(imp->imp_sec == NULL);
1689 LASSERT(imp->imp_rq_pool == NULL);
1690 spin_lock(&obd_zombie_impexp_lock);
1691 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1693 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1694 spin_unlock(&obd_zombie_impexp_lock);
1696 obd_zombie_impexp_notify();
1700 * notify import/export destroy thread about new zombie.
1702 static void obd_zombie_impexp_notify(void)
1705 * Make sure obd_zomebie_impexp_thread get this notification.
1706 * It is possible this signal only get by obd_zombie_barrier, and
1707 * barrier gulps this notification and sleeps away and hangs ensues
1709 cfs_waitq_broadcast(&obd_zombie_waitq);
1713 * check whether obd_zombie is idle
1715 static int obd_zombie_is_idle(void)
1719 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1720 spin_lock(&obd_zombie_impexp_lock);
1721 rc = (zombies_count == 0);
1722 spin_unlock(&obd_zombie_impexp_lock);
1727 * wait when obd_zombie import/export queues become empty
1729 void obd_zombie_barrier(void)
1731 struct l_wait_info lwi = { 0 };
1733 if (obd_zombie_pid == cfs_curproc_pid())
1734 /* don't wait for myself */
1736 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1738 EXPORT_SYMBOL(obd_zombie_barrier);
1743 * destroy zombie export/import thread.
1745 static int obd_zombie_impexp_thread(void *unused)
1749 rc = cfs_daemonize_ctxt("obd_zombid");
1751 complete(&obd_zombie_start);
1755 complete(&obd_zombie_start);
1757 obd_zombie_pid = cfs_curproc_pid();
1759 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1760 struct l_wait_info lwi = { 0 };
1762 l_wait_event(obd_zombie_waitq,
1763 !obd_zombie_impexp_check(NULL), &lwi);
1764 obd_zombie_impexp_cull();
1767 * Notify obd_zombie_barrier callers that queues
1770 cfs_waitq_signal(&obd_zombie_waitq);
1773 complete(&obd_zombie_stop);
1778 #else /* ! KERNEL */
1780 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1781 static void *obd_zombie_impexp_work_cb;
1782 static void *obd_zombie_impexp_idle_cb;
1784 int obd_zombie_impexp_kill(void *arg)
1788 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1789 obd_zombie_impexp_cull();
1792 cfs_atomic_dec(&zombie_recur);
1799 * start destroy zombie import/export thread
1801 int obd_zombie_impexp_init(void)
1805 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1806 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1807 spin_lock_init(&obd_zombie_impexp_lock);
1808 init_completion(&obd_zombie_start);
1809 init_completion(&obd_zombie_stop);
1810 cfs_waitq_init(&obd_zombie_waitq);
1814 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1818 wait_for_completion(&obd_zombie_start);
1821 obd_zombie_impexp_work_cb =
1822 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1823 &obd_zombie_impexp_kill, NULL);
1825 obd_zombie_impexp_idle_cb =
1826 liblustre_register_idle_callback("obd_zombi_impexp_check",
1827 &obd_zombie_impexp_check, NULL);
1833 * stop destroy zombie import/export thread
1835 void obd_zombie_impexp_stop(void)
1837 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1838 obd_zombie_impexp_notify();
1840 wait_for_completion(&obd_zombie_stop);
1842 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1843 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1847 /***** Kernel-userspace comm helpers *******/
1849 /* Get length of entire message, including header */
1850 int kuc_len(int payload_len)
1852 return sizeof(struct kuc_hdr) + payload_len;
1854 EXPORT_SYMBOL(kuc_len);
1856 /* Get a pointer to kuc header, given a ptr to the payload
1857 * @param p Pointer to payload area
1858 * @returns Pointer to kuc header
1860 struct kuc_hdr * kuc_ptr(void *p)
1862 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1863 LASSERT(lh->kuc_magic == KUC_MAGIC);
1866 EXPORT_SYMBOL(kuc_ptr);
1868 /* Test if payload is part of kuc message
1869 * @param p Pointer to payload area
1872 int kuc_ispayload(void *p)
1874 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1876 if (kh->kuc_magic == KUC_MAGIC)
1881 EXPORT_SYMBOL(kuc_ispayload);
1883 /* Alloc space for a message, and fill in header
1884 * @return Pointer to payload area
1886 void *kuc_alloc(int payload_len, int transport, int type)
1889 int len = kuc_len(payload_len);
1893 return ERR_PTR(-ENOMEM);
1895 lh->kuc_magic = KUC_MAGIC;
1896 lh->kuc_transport = transport;
1897 lh->kuc_msgtype = type;
1898 lh->kuc_msglen = len;
1900 return (void *)(lh + 1);
1902 EXPORT_SYMBOL(kuc_alloc);
1904 /* Takes pointer to payload area */
1905 inline void kuc_free(void *p, int payload_len)
1907 struct kuc_hdr *lh = kuc_ptr(p);
1908 OBD_FREE(lh, kuc_len(payload_len));
1910 EXPORT_SYMBOL(kuc_free);