4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
/* List of registered obd types; only declared here (extern). */
50 extern cfs_list_t obd_types;
/* Held while obd_types is walked or modified by the functions below. */
51 spinlock_t obd_types_lock;
/* Slab caches set up in obd_init_caches() and torn down in
 * obd_cleanup_caches(). */
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
/* Zombie import/export lists and their lock; the code that manipulates
 * them is not part of this excerpt. */
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of static helpers used further down. */
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
/* Hook invoked on exp_connection / imp_connection / oic_conn when exports
 * and imports are destroyed below; where it gets assigned is not visible
 * in this file excerpt. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/* Allocate an obd_device from obd_device_cachep (allocation flag
 * CFS_ALLOC_IO) and stamp it with OBD_DEVICE_MAGIC. */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Give an obd_device back to obd_device_cachep.
 * The magic is asserted first, a namespace that is still attached is
 * reported via CERROR, and the obd_reference lu_ref is finalised before
 * the slab free.
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* obd_namespace is expected to be NULL by the time the device is freed. */
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd type by exact name.
 * Walks obd_types under obd_types_lock and compares each typ_name with
 * \a name using strcmp(); the lock is dropped on both the match path and
 * the end-of-list path.
 */
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
/* match: release the lock before leaving the loop */
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
/*
 * Find the obd type \a name and take a reference on the module that owns
 * its typ_dt_ops.
 * When HAVE_MODULE_LOADING_SUPPORT is defined, a module is requested with
 * cfs_request_module() and the type lookup is repeated on success.
 */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
/* Module name defaults to the type name; some names are remapped below. */
123 const char *modname = name;
/* "obdfilter" is special-cased (the substituted name is not visible in
 * this excerpt). */
125 if (strcmp(modname, "obdfilter") == 0)
/* Any name starting with LUSTRE_MDS_NAME is served by the
 * LUSTRE_MDT_NAME module. */
128 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
129 modname = LUSTRE_MDT_NAME;
/* cfs_request_module() returning 0 is treated as success. */
131 if (!cfs_request_module("%s", modname)) {
132 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
133 type = class_search_type(name);
135 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* Module reference is taken under the per-type lock. */
141 spin_lock(&type->obd_type_lock);
143 cfs_try_module_get(type->typ_dt_ops->o_owner);
144 spin_unlock(&type->obd_type_lock);
148 EXPORT_SYMBOL(class_get_type);
/* Counterpart of class_get_type(): drops the module reference on
 * typ_dt_ops->o_owner while holding the per-type obd_type_lock. */
150 void class_put_type(struct obd_type *type)
153 spin_lock(&type->obd_type_lock);
155 cfs_module_put(type->typ_dt_ops->o_owner);
156 spin_unlock(&type->obd_type_lock);
158 EXPORT_SYMBOL(class_put_type);
/* Upper bound on a type name's length, enforced by the LASSERT in
 * class_register_type(). */
160 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 *
 * The visible steps are: refuse a name that is already registered, allocate
 * the obd_type plus private copies of the ops tables and the name, copy
 * *dt_ops / *md_ops into them, register a proc directory under
 * proc_lustre_root, initialise \a ldt and finally link the type into
 * obd_types under obd_types_lock.
 *
 * \param dt_ops  device operations, copied by value
 * \param md_ops  metadata operations, copied by value (optional)
 * \param vars    lprocfs variables (their use is not visible in this excerpt)
 * \param name    type name, must be shorter than CLASS_MAX_NAME
 * \param ldt     lu_device_type passed to lu_device_type_init()
 */
162 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
163 struct lprocfs_vars *vars, const char *name,
164 struct lu_device_type *ldt)
166 struct obd_type *type;
171 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
173 if (class_search_type(name)) {
174 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
179 OBD_ALLOC(type, sizeof(*type));
183 OBD_ALLOC_PTR(type->typ_dt_ops);
184 OBD_ALLOC_PTR(type->typ_md_ops);
185 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* All three sub-allocations are checked together. */
187 if (type->typ_dt_ops == NULL ||
188 type->typ_md_ops == NULL ||
189 type->typ_name == NULL)
192 *(type->typ_dt_ops) = *dt_ops;
193 /* md_ops is optional */
195 *(type->typ_md_ops) = *md_ops;
/* typ_name was allocated with strlen(name) + 1 bytes, so this fits. */
196 strcpy(type->typ_name, name);
197 spin_lock_init(&type->obd_type_lock);
200 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
/* lprocfs_register() reports failure as an ERR_PTR; keep the error code
 * and clear the pointer. */
202 if (IS_ERR(type->typ_procroot)) {
203 rc = PTR_ERR(type->typ_procroot);
204 type->typ_procroot = NULL;
210 rc = lu_device_type_init(ldt);
/* Publish the fully initialised type. */
215 spin_lock(&obd_types_lock);
216 cfs_list_add(&type->typ_chain, &obd_types);
217 spin_unlock(&obd_types_lock);
/* Release whatever was allocated above (presumably the failure path; its
 * label is not visible in this excerpt). */
222 if (type->typ_name != NULL)
223 OBD_FREE(type->typ_name, strlen(name) + 1);
224 if (type->typ_md_ops != NULL)
225 OBD_FREE_PTR(type->typ_md_ops);
226 if (type->typ_dt_ops != NULL)
227 OBD_FREE_PTR(type->typ_dt_ops);
228 OBD_FREE(type, sizeof(*type));
231 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 * A type that still has a non-zero typ_refcnt only loses its ops tables;
 * otherwise the proc entry is removed, the lu device type is finalised,
 * the type is unlinked from obd_types and all its memory is freed.
 */
233 int class_unregister_type(const char *name)
235 struct obd_type *type = class_search_type(name);
239 CERROR("unknown obd type\n");
243 if (type->typ_refcnt) {
244 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
245 /* This is a bad situation, let's make the best of it */
246 /* Remove ops, but leave the name for debugging */
247 OBD_FREE_PTR(type->typ_dt_ops);
248 OBD_FREE_PTR(type->typ_md_ops);
252 /* We do not use type->typ_procroot here: for compatibility purposes
253 * other modules can share names (i.e. lod can use the lov entry), so
254 * we cannot rely on the pointer as it may have been invalidated when
255 * another module removed the entry. Remove by name instead. */
256 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
259 lu_device_type_fini(type->typ_lu);
261 spin_lock(&obd_types_lock);
262 cfs_list_del(&type->typ_chain);
263 spin_unlock(&obd_types_lock);
/* Size mirrors the strlen(name) + 1 allocation in class_register_type(). */
264 OBD_FREE(type->typ_name, strlen(name) + 1);
265 if (type->typ_dt_ops != NULL)
266 OBD_FREE_PTR(type->typ_dt_ops);
267 if (type->typ_md_ops != NULL)
268 OBD_FREE_PTR(type->typ_md_ops);
269 OBD_FREE(type, sizeof(*type));
271 } /* class_unregister_type */
272 EXPORT_SYMBOL(class_unregister_type);
275 * Create a new obd device.
277 * Find an empty slot in ::obd_devs[], create a new obd device in it.
279 * \param[in] type_name obd device type string.
280 * \param[in] name obd device name.
282 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device
/*
 * Allocate an obd_device of type \a type_name, name it \a name and install
 * it in the first free obd_devs[] slot.
 * Failure results visible here: ERR_PTR(-EINVAL) for a name of
 * MAX_OBD_NAME bytes or more, ERR_PTR(-ENODEV) for an unknown type,
 * ERR_PTR(-ENOMEM), ERR_PTR(-EEXIST) for a duplicate name and
 * ERR_PTR(-EOVERFLOW) when every slot is in use.
 */
285 struct obd_device *class_newdev(const char *type_name, const char *name)
287 struct obd_device *result = NULL;
288 struct obd_device *newdev;
289 struct obd_type *type = NULL;
291 int new_obd_minor = 0;
294 if (strlen(name) >= MAX_OBD_NAME) {
295 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
296 RETURN(ERR_PTR(-EINVAL));
/* Takes a type/module reference; released again via class_put_type()
 * at the end of this function. */
299 type = class_get_type(type_name);
301 CERROR("OBD: unknown type: %s\n", type_name);
302 RETURN(ERR_PTR(-ENODEV));
305 newdev = obd_device_alloc();
307 GOTO(out_type, result = ERR_PTR(-ENOMEM));
309 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* One pass over the whole table under the write lock: it both detects a
 * duplicate name and claims the first empty slot. */
311 write_lock(&obd_dev_lock);
312 for (i = 0; i < class_devno_max(); i++) {
313 struct obd_device *obd = class_num2obd(i);
315 if (obd && obd->obd_name &&
316 (strcmp(name, obd->obd_name) == 0)) {
317 CERROR("Device %s already exists at %d, won't add\n",
/* A slot may already have been claimed earlier in this loop; the
 * statements below undo that claim before reporting -EEXIST. */
320 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
321 "%p obd_magic %08x != %08x\n", result,
322 result->obd_magic, OBD_DEVICE_MAGIC);
323 LASSERTF(result->obd_minor == new_obd_minor,
324 "%p obd_minor %d != %d\n", result,
325 result->obd_minor, new_obd_minor);
327 obd_devs[result->obd_minor] = NULL;
328 result->obd_name[0]='\0';
330 result = ERR_PTR(-EEXIST);
/* First empty slot seen while nothing has been claimed yet. */
333 if (!result && !obd) {
335 result->obd_minor = i;
337 result->obd_type = type;
/* Copies at most sizeof(obd_name) - 1 bytes; the length check at the top
 * already rejected names of MAX_OBD_NAME bytes or more. */
338 strncpy(result->obd_name, name,
339 sizeof(result->obd_name) - 1);
340 obd_devs[i] = result;
343 write_unlock(&obd_dev_lock);
/* Loop ran to completion without claiming a slot: table is full. */
345 if (result == NULL && i >= class_devno_max()) {
346 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
348 GOTO(out, result = ERR_PTR(-EOVERFLOW));
354 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
355 result->obd_name, result);
359 obd_device_free(newdev);
361 class_put_type(type);
/*
 * Remove \a obd from obd_devs[] and free it, then drop the type reference.
 * obd_type is read into a local first because \a obd is freed before
 * class_put_type() is called.
 */
365 void class_release_dev(struct obd_device *obd)
367 struct obd_type *obd_type = obd->obd_type;
369 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
370 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* The device must still occupy its own slot. */
371 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
372 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
373 LASSERT(obd_type != NULL);
375 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
376 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
378 write_lock(&obd_dev_lock);
379 obd_devs[obd->obd_minor] = NULL;
380 write_unlock(&obd_dev_lock);
381 obd_device_free(obd);
383 class_put_type(obd_type);
/*
 * Scan obd_devs[] under the obd_dev_lock read lock for a device whose
 * obd_name equals \a name. Only a device with obd_attached set takes the
 * early-unlock path at the match.
 */
386 int class_name2dev(const char *name)
393 read_lock(&obd_dev_lock);
394 for (i = 0; i < class_devno_max(); i++) {
395 struct obd_device *obd = class_num2obd(i);
397 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
398 /* Make sure we finished attaching before we give
399 out any references */
400 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
401 if (obd->obd_attached) {
402 read_unlock(&obd_dev_lock);
408 read_unlock(&obd_dev_lock);
412 EXPORT_SYMBOL(class_name2dev);
/* Resolve \a name to a device index with class_name2dev() and return the
 * matching obd_devs[] entry via class_num2obd(). */
414 struct obd_device *class_name2obd(const char *name)
416 int dev = class_name2dev(name);
/* NOTE(review): `dev > class_devno_max()` lets dev == class_devno_max()
 * through; `>=` looks intended. class_num2obd() has its own
 * `num < class_devno_max()` check, so this appears harmless - confirm. */
418 if (dev < 0 || dev > class_devno_max())
420 return class_num2obd(dev);
422 EXPORT_SYMBOL(class_name2obd);
/* Scan obd_devs[] under the obd_dev_lock read lock for a device whose
 * obd_uuid equals \a uuid (compared with obd_uuid_equals()). */
424 int class_uuid2dev(struct obd_uuid *uuid)
428 read_lock(&obd_dev_lock);
429 for (i = 0; i < class_devno_max(); i++) {
430 struct obd_device *obd = class_num2obd(i);
432 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
433 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
434 read_unlock(&obd_dev_lock);
438 read_unlock(&obd_dev_lock);
442 EXPORT_SYMBOL(class_uuid2dev);
/* Resolve \a uuid to a device index with class_uuid2dev() and return the
 * matching obd_devs[] entry via class_num2obd(). */
444 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
446 int dev = class_uuid2dev(uuid);
449 return class_num2obd(dev);
451 EXPORT_SYMBOL(class_uuid2obd);
454 * Get obd device from ::obd_devs[]
456 * \param num [in] array index
458 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
459 * otherwise return the obd device there.
/* Index-based accessor for obd_devs[]: only indices below
 * class_devno_max() are considered, and a device that is found is
 * sanity-checked (magic and obd_minor == num). */
461 struct obd_device *class_num2obd(int num)
463 struct obd_device *obd = NULL;
465 if (num < class_devno_max()) {
470 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
471 "%p obd_magic %08x != %08x\n",
472 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* The device must record the same slot number it is stored at. */
473 LASSERTF(obd->obd_minor == num,
474 "%p obd_minor %0d != %0d\n",
475 obd, obd->obd_minor, num);
480 EXPORT_SYMBOL(class_num2obd);
/* Print one console line per device in obd_devs[]: index, status, type
 * name, device name, uuid and reference count. Runs under the
 * obd_dev_lock read lock. */
482 void class_obd_list(void)
487 read_lock(&obd_dev_lock);
488 for (i = 0; i < class_devno_max(); i++) {
489 struct obd_device *obd = class_num2obd(i);
/* Status is chosen by the first flag that is set, in this order:
 * obd_stopping, obd_set_up, obd_attached. */
493 if (obd->obd_stopping)
495 else if (obd->obd_set_up)
497 else if (obd->obd_attached)
501 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
502 i, status, obd->obd_type->typ_name,
503 obd->obd_name, obd->obd_uuid.uuid,
504 cfs_atomic_read(&obd->obd_refcount));
506 read_unlock(&obd_dev_lock);
510 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
511 specified, then only the client with that uuid is returned,
512 otherwise any client connected to the tgt is returned.
    The type check is a prefix match: only strlen(typ_name) bytes of the
    device's type name are compared. */
513 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
514 const char * typ_name,
515 struct obd_uuid *grp_uuid)
519 read_lock(&obd_dev_lock);
520 for (i = 0; i < class_devno_max(); i++) {
521 struct obd_device *obd = class_num2obd(i);
525 if ((strncmp(obd->obd_type->typ_name, typ_name,
526 strlen(typ_name)) == 0)) {
/* Target uuid must match; the device uuid is only checked when a
 * grp_uuid was supplied. */
527 if (obd_uuid_equals(tgt_uuid,
528 &obd->u.cli.cl_target_uuid) &&
529 ((grp_uuid)? obd_uuid_equals(grp_uuid,
530 &obd->obd_uuid) : 1)) {
531 read_unlock(&obd_dev_lock);
536 read_unlock(&obd_dev_lock);
540 EXPORT_SYMBOL(class_find_client_obd);
542 /* Iterate over the obd_device list looking for devices whose uuid is
543 grp_uuid. Start searching at *next, and if a device is found, the next
544 index to look at is saved in *next. If next is NULL, then the first
545 matching device will always be returned. */
546 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* *next is only accepted when it lies within [0, class_devno_max()). */
552 else if (*next >= 0 && *next < class_devno_max())
557 read_lock(&obd_dev_lock);
558 for (; i < class_devno_max(); i++) {
559 struct obd_device *obd = class_num2obd(i);
563 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
566 read_unlock(&obd_dev_lock);
570 read_unlock(&obd_dev_lock);
574 EXPORT_SYMBOL(class_devices_in_group);
577 * Notify that the sptlrpc log for \a fsname has changed; let every relevant OBD
578 * adjust sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF via obd_set_info_async() to every set-up,
 * non-stopping device of type mdc/osc/mdt/ost whose obd_name starts with
 * the first \a namelen bytes of \a fsname.
 */
580 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
582 struct obd_device *obd;
586 LASSERT(namelen > 0);
588 read_lock(&obd_dev_lock);
589 for (i = 0; i < class_devno_max(); i++) {
590 obd = class_num2obd(i);
592 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
595 /* only notify mdc, osc, mdt, ost */
596 type = obd->obd_type->typ_name;
597 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
598 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
599 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
600 strcmp(type, LUSTRE_OST_NAME) != 0)
/* Prefix comparison over namelen bytes; a non-zero result means the
 * device does not belong to this filesystem. */
603 if (strncmp(obd->obd_name, fsname, namelen))
/* Take a device reference, then drop the table lock around the
 * obd_set_info_async() call; the lock is re-taken afterwards to
 * continue the scan. */
606 class_incref(obd, __FUNCTION__, obd);
607 read_unlock(&obd_dev_lock);
608 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
609 sizeof(KEY_SPTLRPC_CONF),
610 KEY_SPTLRPC_CONF, 0, NULL, NULL);
612 class_decref(obd, __FUNCTION__, obd);
613 read_lock(&obd_dev_lock);
615 read_unlock(&obd_dev_lock);
618 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches() (obd_device, obdo,
 * import and capa caches); each destroy is asserted to return 0.
 * NOTE(review): the assertion messages spell "destroy" as "destropy" /
 * "destory"; they are runtime strings and are left untouched here.
 */
620 void obd_cleanup_caches(void)
625 if (obd_device_cachep) {
626 rc = cfs_mem_cache_destroy(obd_device_cachep);
627 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
628 obd_device_cachep = NULL;
631 rc = cfs_mem_cache_destroy(obdo_cachep);
632 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
636 rc = cfs_mem_cache_destroy(import_cachep);
637 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
638 import_cachep = NULL;
641 rc = cfs_mem_cache_destroy(capa_cachep);
642 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/*
 * Create the slab caches used by this file: "ll_obd_dev_cache",
 * "ll_obdo_cache", "ll_import_cache" and "capa_cache". Each cache pointer
 * is asserted to be NULL beforehand; obd_cleanup_caches() is called to
 * unwind (presumably on a creation failure - the label is not visible).
 */
648 int obd_init_caches(void)
652 LASSERT(obd_device_cachep == NULL);
653 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
654 sizeof(struct obd_device),
656 if (!obd_device_cachep)
659 LASSERT(obdo_cachep == NULL);
660 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
665 LASSERT(import_cachep == NULL);
666 import_cachep = cfs_mem_cache_create("ll_import_cache",
667 sizeof(struct obd_import),
672 LASSERT(capa_cachep == NULL);
673 capa_cachep = cfs_mem_cache_create("capa_cache",
674 sizeof(struct obd_capa), 0, 0);
680 obd_cleanup_caches();
685 /* Map a connection handle to its export by looking up conn->cookie with
    class_handle2object(). A null handle and a cookie of -1 ("assign a new
    connection") are handled separately before the lookup. */
686 struct obd_export *class_conn2export(struct lustre_handle *conn)
688 struct obd_export *export;
692 CDEBUG(D_CACHE, "looking for null handle\n");
696 if (conn->cookie == -1) { /* this means assign a new connection */
697 CDEBUG(D_CACHE, "want a new connection\n");
701 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
702 export = class_handle2object(conn->cookie);
705 EXPORT_SYMBOL(class_conn2export);
/* Map an export to an obd_device (body not visible in this excerpt;
 * presumably returns exp->exp_obd - confirm). */
707 struct obd_device *class_exp2obd(struct obd_export *exp)
713 EXPORT_SYMBOL(class_exp2obd);
/* Map a connection handle to the obd_device of its export. The export
 * reference is put again right after exp_obd has been read. */
715 struct obd_device *class_conn2obd(struct lustre_handle *conn)
717 struct obd_export *export;
718 export = class_conn2export(conn);
720 struct obd_device *obd = export->exp_obd;
721 class_export_put(export);
726 EXPORT_SYMBOL(class_conn2obd);
/* Return the client import (u.cli.cl_import) of the device behind \a exp. */
728 struct obd_import *class_exp2cliimp(struct obd_export *exp)
730 struct obd_device *obd = exp->exp_obd;
733 return obd->u.cli.cl_import;
735 EXPORT_SYMBOL(class_exp2cliimp);
/* Return the client import (u.cli.cl_import) of the device that \a conn
 * resolves to via class_conn2obd(). */
737 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
739 struct obd_device *obd = class_conn2obd(conn);
742 return obd->u.cli.cl_import;
744 EXPORT_SYMBOL(class_conn2cliimp);
746 /* Export management functions */
/*
 * Final teardown of an export whose refcount has reached zero: releases
 * the connection (if any), asserts that the reply/replay/hp-rpc lists are
 * empty, calls obd_destroy_export(), drops the "export" device reference
 * and frees the export with OBD_FREE_RCU.
 */
747 static void class_export_destroy(struct obd_export *exp)
749 struct obd_device *obd = exp->exp_obd;
752 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
753 LASSERT(obd != NULL);
755 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
756 exp->exp_client_uuid.uuid, obd->obd_name);
758 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
759 if (exp->exp_connection)
760 ptlrpc_put_connection_superhack(exp->exp_connection);
762 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
763 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
764 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
765 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
766 obd_destroy_export(exp);
/* Pairs with class_incref(obd, "export", export) in class_new_export(). */
767 class_decref(obd, "export", exp);
769 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* hop_addref callback for export handles: takes an export reference. */
773 static void export_handle_addref(void *export)
775 class_export_get(export);
/* Handle operations passed to class_handle_hash() for exports. */
778 static struct portals_handle_ops export_handle_ops = {
779 .hop_addref = export_handle_addref,
/* Take a reference on \a exp (atomic increment of exp_refcount) and log
 * the new count. */
783 struct obd_export *class_export_get(struct obd_export *exp)
785 cfs_atomic_inc(&exp->exp_refcount);
786 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
787 cfs_atomic_read(&exp->exp_refcount));
790 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp. When the count reaches zero the nid stat
 * reference is released and the export is queued with
 * obd_zombie_export_add().
 */
792 void class_export_put(struct obd_export *exp)
794 LASSERT(exp != NULL);
795 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* Logged value is the count after the decrement performed below. */
796 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
797 cfs_atomic_read(&exp->exp_refcount) - 1);
799 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
800 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
801 CDEBUG(D_IOCTL, "final put %p/%s\n",
802 exp, exp->exp_client_uuid.uuid);
804 /* release nid stat reference */
805 lprocfs_exp_cleanup(exp);
807 obd_zombie_export_add(exp);
810 EXPORT_SYMBOL(class_export_put);
812 /* Creates a new export, adds it to the hash table, and returns a
813 * pointer to it. The refcount is 2: one for the hash reference, and
814 * one for the pointer returned by this function.
    * Failure codes visible below: -ENOMEM, -ENODEV (device stopping or no
    * uuid hash) and -EALREADY (duplicate client uuid). */
815 struct obd_export *class_new_export(struct obd_device *obd,
816 struct obd_uuid *cluuid)
818 struct obd_export *export;
819 cfs_hash_t *hash = NULL;
823 OBD_ALLOC_PTR(export);
825 return ERR_PTR(-ENOMEM);
/* Field initialisation; nothing is published to other lists yet. */
827 export->exp_conn_cnt = 0;
828 export->exp_lock_hash = NULL;
829 export->exp_flock_hash = NULL;
830 cfs_atomic_set(&export->exp_refcount, 2);
831 cfs_atomic_set(&export->exp_rpc_count, 0);
832 cfs_atomic_set(&export->exp_cb_count, 0);
833 cfs_atomic_set(&export->exp_locks_count, 0);
834 #if LUSTRE_TRACKS_LOCK_EXP_REFS
835 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
836 spin_lock_init(&export->exp_locks_list_guard);
838 cfs_atomic_set(&export->exp_replay_count, 0);
839 export->exp_obd = obd;
840 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
841 spin_lock_init(&export->exp_uncommitted_replies_lock);
842 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
843 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
844 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
845 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
/* Register the handle so the export can be found by cookie
 * (see class_conn2export()). */
846 class_handle_hash(&export->exp_handle, &export_handle_ops);
847 export->exp_last_request_time = cfs_time_current_sec();
848 spin_lock_init(&export->exp_lock);
849 spin_lock_init(&export->exp_rpc_lock);
850 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
851 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
852 spin_lock_init(&export->exp_bl_list_lock);
853 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
855 export->exp_sp_peer = LUSTRE_SP_ANY;
856 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
857 export->exp_client_uuid = *cluuid;
858 obd_init_export(export);
860 spin_lock(&obd->obd_dev_lock);
861 /* shouldn't happen, but might race */
862 if (obd->obd_stopping)
863 GOTO(exit_unlock, rc = -ENODEV);
/* Pin the uuid hash so it can be used after obd_dev_lock is dropped. */
865 hash = cfs_hash_getref(obd->obd_uuid_hash);
867 GOTO(exit_unlock, rc = -ENODEV);
868 spin_unlock(&obd->obd_dev_lock);
/* The device's own uuid (self export) is not added to the uuid hash. */
870 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
871 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
873 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
874 obd->obd_name, cluuid->uuid, rc);
875 GOTO(exit_err, rc = -EALREADY);
/* obd_stopping is re-checked because obd_dev_lock was released above. */
879 spin_lock(&obd->obd_dev_lock);
880 if (obd->obd_stopping) {
881 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
882 GOTO(exit_unlock, rc = -ENODEV);
/* Publish: device reference, export lists and export counter. */
885 class_incref(obd, "export", export);
886 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
887 cfs_list_add_tail(&export->exp_obd_chain_timed,
888 &export->exp_obd->obd_exports_timed);
889 export->exp_obd->obd_num_exports++;
890 spin_unlock(&obd->obd_dev_lock);
891 cfs_hash_putref(hash);
/* Error unwinding (labels are not visible in this excerpt): unlock, drop
 * the hash reference, unhash the handle and free the export. */
895 spin_unlock(&obd->obd_dev_lock);
898 cfs_hash_putref(hash);
899 class_handle_unhash(&export->exp_handle);
900 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
901 obd_destroy_export(export);
902 OBD_FREE_PTR(export);
905 EXPORT_SYMBOL(class_new_export);
/*
 * Detach \a exp from its device: unhash the handle, remove it from the
 * uuid hash if present, move it to obd_unlinked_exports, take it off the
 * timed list, decrement obd_num_exports and finally put one export
 * reference.
 */
907 void class_unlink_export(struct obd_export *exp)
909 class_handle_unhash(&exp->exp_handle);
911 spin_lock(&exp->exp_obd->obd_dev_lock);
912 /* delete the uuid-export hash item from the hash table */
913 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
914 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
915 &exp->exp_client_uuid,
916 &exp->exp_uuid_hash);
918 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
919 cfs_list_del_init(&exp->exp_obd_chain_timed);
920 exp->exp_obd->obd_num_exports--;
921 spin_unlock(&exp->exp_obd->obd_dev_lock);
922 class_export_put(exp);
924 EXPORT_SYMBOL(class_unlink_export);
926 /* Import management functions */
/*
 * Final teardown of an import whose refcount is zero: releases
 * imp_connection and every connection on imp_conn_list, asserts imp_sec
 * is NULL, drops the "import" device reference and frees the import with
 * OBD_FREE_RCU.
 */
927 void class_import_destroy(struct obd_import *imp)
931 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
932 imp->imp_obd->obd_name);
934 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
936 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Drain the connection list from the head, freeing each entry. */
938 while (!cfs_list_empty(&imp->imp_conn_list)) {
939 struct obd_import_conn *imp_conn;
941 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
942 struct obd_import_conn, oic_item);
943 cfs_list_del_init(&imp_conn->oic_item);
944 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
945 OBD_FREE(imp_conn, sizeof(*imp_conn));
948 LASSERT(imp->imp_sec == NULL);
/* Pairs with class_incref(obd, "import", imp) in class_new_import(). */
949 class_decref(imp->imp_obd, "import", imp);
950 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* hop_addref callback for import handles: takes an import reference. */
954 static void import_handle_addref(void *import)
956 class_import_get(import);
/* Handle operations passed to class_handle_hash() for imports. */
959 static struct portals_handle_ops import_handle_ops = {
960 .hop_addref = import_handle_addref,
/* Take a reference on \a import (atomic increment of imp_refcount) and
 * log the new count together with the owning device name. */
964 struct obd_import *class_import_get(struct obd_import *import)
966 cfs_atomic_inc(&import->imp_refcount);
967 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
968 cfs_atomic_read(&import->imp_refcount),
969 import->imp_obd->obd_name);
972 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a imp. When the count reaches zero the import is
 * queued with obd_zombie_import_add(). The import must not already be on
 * a zombie chain when this is called.
 */
974 void class_import_put(struct obd_import *imp)
978 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
979 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* Logged value is the count after the decrement performed below. */
981 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
982 cfs_atomic_read(&imp->imp_refcount) - 1,
983 imp->imp_obd->obd_name);
985 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
986 CDEBUG(D_INFO, "final put import %p\n", imp);
987 obd_zombie_import_add(imp);
990 /* catch possible import put race */
991 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
994 EXPORT_SYMBOL(class_import_put);
/* Initialise the adaptive-timeout state of an import: the network latency
 * estimate and one service estimate per portal (IMP_AT_MAX_PORTALS
 * entries, each seeded with INITIAL_CONNECT_TIMEOUT). */
996 static void init_imp_at(struct imp_at *at) {
998 at_init(&at->iat_net_latency, 0, 0);
999 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1000 /* max service estimates are tracked on the server side, so
1001 don't use the AT history here, just use the last reported
1002 val. (But keep hist for proc histogram, worst_ever) */
1003 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialise a new import for \a obd.
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, holds
 * an "import" reference on the device and is registered in the handle
 * hash with import_handle_ops.
 */
1008 struct obd_import *class_new_import(struct obd_device *obd)
1010 struct obd_import *imp;
1012 OBD_ALLOC(imp, sizeof(*imp));
1016 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1017 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1018 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1019 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1020 spin_lock_init(&imp->imp_lock);
1021 imp->imp_last_success_conn = 0;
1022 imp->imp_state = LUSTRE_IMP_NEW;
/* Device reference is released again in class_import_destroy(). */
1023 imp->imp_obd = class_incref(obd, "import", imp);
1024 mutex_init(&imp->imp_sec_mutex);
1025 cfs_waitq_init(&imp->imp_recovery_waitq);
1027 cfs_atomic_set(&imp->imp_refcount, 2);
1028 cfs_atomic_set(&imp->imp_unregistering, 0);
1029 cfs_atomic_set(&imp->imp_inflight, 0);
1030 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1031 cfs_atomic_set(&imp->imp_inval_count, 0);
1032 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1033 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1034 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1035 init_imp_at(&imp->imp_at);
1037 /* the default magic is V2, will be used in connect RPC, and
1038 * then adjusted according to the flags in request/reply. */
1039 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1043 EXPORT_SYMBOL(class_new_import);
/* Start tearing down \a import: unhash its handle, bump imp_generation
 * under imp_lock and put one import reference. */
1045 void class_destroy_import(struct obd_import *import)
1047 LASSERT(import != NULL);
1048 LASSERT(import != LP_POISON);
1050 class_handle_unhash(&import->imp_handle);
1052 spin_lock(&import->imp_lock);
1053 import->imp_generation++;
1054 spin_unlock(&import->imp_lock);
1055 class_import_put(import);
1057 EXPORT_SYMBOL(class_destroy_import);
1059 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): count one more reference
 * from \a lock to \a exp. On the 0 -> 1 transition the lock is linked
 * into exp_locks_list and exp becomes its l_exp_refs_target. Serialised
 * by exp_locks_list_guard.
 */
1061 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1063 spin_lock(&exp->exp_locks_list_guard);
1065 LASSERT(lock->l_exp_refs_nr >= 0);
/* A lock already tracked against a different export only produces a
 * warning. */
1067 if (lock->l_exp_refs_target != NULL &&
1068 lock->l_exp_refs_target != exp) {
1069 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1070 exp, lock, lock->l_exp_refs_target);
1072 if ((lock->l_exp_refs_nr ++) == 0) {
1073 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1074 lock->l_exp_refs_target = exp;
1076 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1077 lock, exp, lock->l_exp_refs_nr);
1078 spin_unlock(&exp->exp_locks_list_guard);
1080 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): drop one reference from
 * \a lock to \a exp. On the 1 -> 0 transition the lock is unlinked from
 * the export's list and its l_exp_refs_target is cleared. Serialised by
 * exp_locks_list_guard.
 */
1082 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1084 spin_lock(&exp->exp_locks_list_guard);
1085 LASSERT(lock->l_exp_refs_nr > 0);
/* A target mismatch only produces a warning. */
1086 if (lock->l_exp_refs_target != exp) {
1087 LCONSOLE_WARN("lock %p, "
1088 "mismatching export pointers: %p, %p\n",
1089 lock, lock->l_exp_refs_target, exp);
1091 if (-- lock->l_exp_refs_nr == 0) {
1092 cfs_list_del_init(&lock->l_exp_refs_link);
1093 lock->l_exp_refs_target = NULL;
1095 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1096 lock, exp, lock->l_exp_refs_nr);
1097 spin_unlock(&exp->exp_locks_list_guard);
1099 EXPORT_SYMBOL(__class_export_del_lock_ref);
1102 /* A connection defines an export context in which preallocation can
1103 be managed. This releases the export pointer reference, and returns
1104 the export handle, so the export refcount is 1 when this function returns. */
/*
 * Create a new export on \a obd for client \a cluuid and hand its handle
 * cookie back in \a conn. The pointer reference obtained from
 * class_new_export() is put again before returning; an ERR_PTR from
 * class_new_export() is propagated as its error code.
 */
1106 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1107 struct obd_uuid *cluuid)
1109 struct obd_export *export;
1110 LASSERT(conn != NULL);
1111 LASSERT(obd != NULL);
1112 LASSERT(cluuid != NULL);
1115 export = class_new_export(obd, cluuid);
1117 RETURN(PTR_ERR(export));
/* Read the cookie before dropping our reference on the export. */
1119 conn->cookie = export->exp_handle.h_cookie;
1120 class_export_put(export);
1122 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1123 cluuid->uuid, conn->cookie);
1126 EXPORT_SYMBOL(class_connect);
1128 /* If the export is involved in recovery then clean up the related
     * per-device counters (delayed, connected, stale, req-replay and
     * lock-replay clients) and clear the matching flags on the export. */
1129 void class_export_recovery_cleanup(struct obd_export *exp)
1131 struct obd_device *obd = exp->exp_obd;
/* The counters in this first part are updated under
 * obd_recovery_task_lock; export flags additionally take exp_lock. */
1133 spin_lock(&obd->obd_recovery_task_lock);
1134 if (exp->exp_delayed)
1135 obd->obd_delayed_clients--;
1136 if (obd->obd_recovering) {
1137 if (exp->exp_in_recovery) {
1138 spin_lock(&exp->exp_lock);
1139 exp->exp_in_recovery = 0;
1140 spin_unlock(&exp->exp_lock);
1141 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1142 cfs_atomic_dec(&obd->obd_connected_clients);
1145 /* if called during recovery then should update
1146 * obd_stale_clients counter,
1147 * lightweight exports are not counted */
1148 if (exp->exp_failed &&
1149 (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) == 0)
1150 exp->exp_obd->obd_stale_clients++;
1152 spin_unlock(&obd->obd_recovery_task_lock);
1153 /** Cleanup req replay fields */
1154 if (exp->exp_req_replay_needed) {
1155 spin_lock(&exp->exp_lock);
1156 exp->exp_req_replay_needed = 0;
1157 spin_unlock(&exp->exp_lock);
1158 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1159 cfs_atomic_dec(&obd->obd_req_replay_clients);
1161 /** Cleanup lock replay data */
1162 if (exp->exp_lock_replay_needed) {
1163 spin_lock(&exp->exp_lock);
1164 exp->exp_lock_replay_needed = 0;
1165 spin_unlock(&exp->exp_lock);
1166 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1167 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1171 /* This function removes 1-3 references from the export:
1172 * 1 - for the export pointer passed in
1173 * and, if a disconnect is really needed,
1174 * 2 - for removing it from the hash
1175 * 3 - in class_unlink_export
1176 * The export pointer passed to this function may be destroyed. */
1177 int class_disconnect(struct obd_export *export)
1179 int already_disconnected;
1182 if (export == NULL) {
1183 CWARN("attempting to free NULL export %p\n", export);
/* Test-and-set of exp_disconnected under exp_lock so that only the
 * first caller performs the real disconnect. */
1187 spin_lock(&export->exp_lock);
1188 already_disconnected = export->exp_disconnected;
1189 export->exp_disconnected = 1;
1190 spin_unlock(&export->exp_lock);
1192 /* class_cleanup(), abort_recovery(), and class_fail_export()
1193 * all end up in here, and if any of them race we shouldn't
1194 * call extra class_export_puts(). */
1195 if (already_disconnected) {
1196 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1197 GOTO(no_disconn, already_disconnected);
1200 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1201 export->exp_handle.h_cookie);
/* Remove the export from the device's nid hash if it is hashed there. */
1203 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1204 cfs_hash_del(export->exp_obd->obd_nid_hash,
1205 &export->exp_connection->c_peer.nid,
1206 &export->exp_nid_hash);
1208 class_export_recovery_cleanup(export);
1209 class_unlink_export(export);
/* Drops the reference for the pointer passed in by the caller. */
1211 class_export_put(export);
1214 EXPORT_SYMBOL(class_disconnect);
1216 /* Return non-zero for a fully connected export, i.e. one whose
     * exp_conn_cnt (read under exp_lock) is greater than zero. */
1217 int class_connected_export(struct obd_export *exp)
1221 spin_lock(&exp->exp_lock);
1222 connected = (exp->exp_conn_cnt > 0);
1223 spin_unlock(&exp->exp_lock);
1228 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, storing \a flags in exp_flags of
 * each one first. The self export (client uuid == device uuid) is not
 * disconnected; it is only removed from the list.
 */
1230 static void class_disconnect_export_list(cfs_list_t *list,
1231 enum obd_option flags)
1234 struct obd_export *exp;
1237 /* It's possible that an export may disconnect itself, but
1238 * nothing else will be added to this list. */
1239 while (!cfs_list_empty(list)) {
1240 exp = cfs_list_entry(list->next, struct obd_export,
1242 /* need for safe call CDEBUG after obd_disconnect */
1243 class_export_get(exp);
1245 spin_lock(&exp->exp_lock);
1246 exp->exp_flags = flags;
1247 spin_unlock(&exp->exp_lock);
1249 if (obd_uuid_equals(&exp->exp_client_uuid,
1250 &exp->exp_obd->obd_uuid)) {
1252 "exp %p export uuid == obd uuid, don't discon\n",
1254 /* Need to delete this now so we don't end up pointing
1255 * to work_list later when this export is cleaned up. */
1256 cfs_list_del_init(&exp->exp_obd_chain);
1257 class_export_put(exp);
/* Second reference: obd_disconnect() below releases one reference, the
 * one taken at the top of the loop keeps exp valid for the final CDEBUG. */
1261 class_export_get(exp);
1262 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1263 "last request at "CFS_TIME_T"\n",
1264 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1265 exp, exp->exp_last_request_time);
1266 /* release one export reference anyway */
1267 rc = obd_disconnect(exp);
1269 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1270 obd_export_nid2str(exp), exp, rc);
1271 class_export_put(exp);
1276 void class_disconnect_exports(struct obd_device *obd)
1278 cfs_list_t work_list;
1281 /* Move all of the exports from obd_exports to a work list, en masse. */
1282 CFS_INIT_LIST_HEAD(&work_list);
1283 spin_lock(&obd->obd_dev_lock);
1284 cfs_list_splice_init(&obd->obd_exports, &work_list);
1285 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1286 spin_unlock(&obd->obd_dev_lock);
1288 if (!cfs_list_empty(&work_list)) {
1289 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1290 "disconnecting them\n", obd->obd_minor, obd);
1291 class_disconnect_export_list(&work_list,
1292 exp_flags_from_obd(obd));
1294 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1295 obd->obd_minor, obd);
1298 EXPORT_SYMBOL(class_disconnect_exports);
1300 /* Remove exports that have not completed recovery.
1302 void class_disconnect_stale_exports(struct obd_device *obd,
1303 int (*test_export)(struct obd_export *))
1305 cfs_list_t work_list;
1306 struct obd_export *exp, *n;
1310 CFS_INIT_LIST_HEAD(&work_list);
1311 spin_lock(&obd->obd_dev_lock);
1312 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1314 /* don't count self-export as client */
1315 if (obd_uuid_equals(&exp->exp_client_uuid,
1316 &exp->exp_obd->obd_uuid))
1319 /* don't evict clients which have no slot in last_rcvd
1320 * (e.g. lightweight connection) */
1321 if (exp->exp_target_data.ted_lr_idx == -1)
1324 spin_lock(&exp->exp_lock);
1325 if (exp->exp_failed || test_export(exp)) {
1326 spin_unlock(&exp->exp_lock);
1329 exp->exp_failed = 1;
1330 spin_unlock(&exp->exp_lock);
1332 cfs_list_move(&exp->exp_obd_chain, &work_list);
1334 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1335 obd->obd_name, exp->exp_client_uuid.uuid,
1336 exp->exp_connection == NULL ? "<unknown>" :
1337 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1338 print_export_data(exp, "EVICTING", 0);
1340 spin_unlock(&obd->obd_dev_lock);
1343 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1344 obd->obd_name, evicted);
1346 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1347 OBD_OPT_ABORT_RECOV);
1350 EXPORT_SYMBOL(class_disconnect_stale_exports);
1352 void class_fail_export(struct obd_export *exp)
1354 int rc, already_failed;
1356 spin_lock(&exp->exp_lock);
1357 already_failed = exp->exp_failed;
1358 exp->exp_failed = 1;
1359 spin_unlock(&exp->exp_lock);
1361 if (already_failed) {
1362 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1363 exp, exp->exp_client_uuid.uuid);
1367 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1368 exp, exp->exp_client_uuid.uuid);
1370 if (obd_dump_on_timeout)
1371 libcfs_debug_dumplog();
1373 /* need for safe call CDEBUG after obd_disconnect */
1374 class_export_get(exp);
1376 /* Most callers into obd_disconnect are removing their own reference
1377 * (request, for example) in addition to the one from the hash table.
1378 * We don't have such a reference here, so make one. */
1379 class_export_get(exp);
1380 rc = obd_disconnect(exp);
1382 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1384 CDEBUG(D_HA, "disconnected export %p/%s\n",
1385 exp, exp->exp_client_uuid.uuid);
1386 class_export_put(exp);
1388 EXPORT_SYMBOL(class_fail_export);
1390 char *obd_export_nid2str(struct obd_export *exp)
1392 if (exp->exp_connection != NULL)
1393 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1397 EXPORT_SYMBOL(obd_export_nid2str);
1399 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1401 cfs_hash_t *nid_hash;
1402 struct obd_export *doomed_exp = NULL;
1403 int exports_evicted = 0;
1405 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1407 spin_lock(&obd->obd_dev_lock);
1408 /* umount has run already, so evict thread should leave
1409 * its task to umount thread now */
1410 if (obd->obd_stopping) {
1411 spin_unlock(&obd->obd_dev_lock);
1412 return exports_evicted;
1414 nid_hash = obd->obd_nid_hash;
1415 cfs_hash_getref(nid_hash);
1416 spin_unlock(&obd->obd_dev_lock);
1419 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1420 if (doomed_exp == NULL)
1423 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1424 "nid %s found, wanted nid %s, requested nid %s\n",
1425 obd_export_nid2str(doomed_exp),
1426 libcfs_nid2str(nid_key), nid);
1427 LASSERTF(doomed_exp != obd->obd_self_export,
1428 "self-export is hashed by NID?\n");
1430 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1431 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1433 class_fail_export(doomed_exp);
1434 class_export_put(doomed_exp);
1437 cfs_hash_putref(nid_hash);
1439 if (!exports_evicted)
1440 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1441 obd->obd_name, nid);
1442 return exports_evicted;
1444 EXPORT_SYMBOL(obd_export_evict_by_nid);
1446 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1448 cfs_hash_t *uuid_hash;
1449 struct obd_export *doomed_exp = NULL;
1450 struct obd_uuid doomed_uuid;
1451 int exports_evicted = 0;
1453 spin_lock(&obd->obd_dev_lock);
1454 if (obd->obd_stopping) {
1455 spin_unlock(&obd->obd_dev_lock);
1456 return exports_evicted;
1458 uuid_hash = obd->obd_uuid_hash;
1459 cfs_hash_getref(uuid_hash);
1460 spin_unlock(&obd->obd_dev_lock);
1462 obd_str2uuid(&doomed_uuid, uuid);
1463 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1464 CERROR("%s: can't evict myself\n", obd->obd_name);
1465 cfs_hash_putref(uuid_hash);
1466 return exports_evicted;
1469 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1471 if (doomed_exp == NULL) {
1472 CERROR("%s: can't disconnect %s: no exports found\n",
1473 obd->obd_name, uuid);
1475 CWARN("%s: evicting %s at adminstrative request\n",
1476 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1477 class_fail_export(doomed_exp);
1478 class_export_put(doomed_exp);
1481 cfs_hash_putref(uuid_hash);
1483 return exports_evicted;
1485 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1487 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1488 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1489 EXPORT_SYMBOL(class_export_dump_hook);
1492 static void print_export_data(struct obd_export *exp, const char *status,
1495 struct ptlrpc_reply_state *rs;
1496 struct ptlrpc_reply_state *first_reply = NULL;
1499 spin_lock(&exp->exp_lock);
1500 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1506 spin_unlock(&exp->exp_lock);
1508 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1509 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1510 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1511 cfs_atomic_read(&exp->exp_rpc_count),
1512 cfs_atomic_read(&exp->exp_cb_count),
1513 cfs_atomic_read(&exp->exp_locks_count),
1514 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1515 nreplies, first_reply, nreplies > 3 ? "..." : "",
1516 exp->exp_last_committed);
1517 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1518 if (locks && class_export_dump_hook != NULL)
1519 class_export_dump_hook(exp);
1523 void dump_exports(struct obd_device *obd, int locks)
1525 struct obd_export *exp;
1527 spin_lock(&obd->obd_dev_lock);
1528 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1529 print_export_data(exp, "ACTIVE", locks);
1530 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1531 print_export_data(exp, "UNLINKED", locks);
1532 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1533 print_export_data(exp, "DELAYED", locks);
1534 spin_unlock(&obd->obd_dev_lock);
1535 spin_lock(&obd_zombie_impexp_lock);
1536 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1537 print_export_data(exp, "ZOMBIE", locks);
1538 spin_unlock(&obd_zombie_impexp_lock);
1540 EXPORT_SYMBOL(dump_exports);
1542 void obd_exports_barrier(struct obd_device *obd)
1545 LASSERT(cfs_list_empty(&obd->obd_exports));
1546 spin_lock(&obd->obd_dev_lock);
1547 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1548 spin_unlock(&obd->obd_dev_lock);
1549 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1550 cfs_time_seconds(waited));
1551 if (waited > 5 && IS_PO2(waited)) {
1552 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1553 "more than %d seconds. "
1554 "The obd refcount = %d. Is it stuck?\n",
1555 obd->obd_name, waited,
1556 cfs_atomic_read(&obd->obd_refcount));
1557 dump_exports(obd, 1);
1560 spin_lock(&obd->obd_dev_lock);
1562 spin_unlock(&obd->obd_dev_lock);
1564 EXPORT_SYMBOL(obd_exports_barrier);
1566 /* Total amount of zombies to be destroyed */
1567 static int zombies_count = 0;
1570 * kill zombie imports and exports
1572 void obd_zombie_impexp_cull(void)
1574 struct obd_import *import;
1575 struct obd_export *export;
1579 spin_lock(&obd_zombie_impexp_lock);
1582 if (!cfs_list_empty(&obd_zombie_imports)) {
1583 import = cfs_list_entry(obd_zombie_imports.next,
1586 cfs_list_del_init(&import->imp_zombie_chain);
1590 if (!cfs_list_empty(&obd_zombie_exports)) {
1591 export = cfs_list_entry(obd_zombie_exports.next,
1594 cfs_list_del_init(&export->exp_obd_chain);
1597 spin_unlock(&obd_zombie_impexp_lock);
1599 if (import != NULL) {
1600 class_import_destroy(import);
1601 spin_lock(&obd_zombie_impexp_lock);
1603 spin_unlock(&obd_zombie_impexp_lock);
1606 if (export != NULL) {
1607 class_export_destroy(export);
1608 spin_lock(&obd_zombie_impexp_lock);
1610 spin_unlock(&obd_zombie_impexp_lock);
1614 } while (import != NULL || export != NULL);
1618 static struct completion obd_zombie_start;
1619 static struct completion obd_zombie_stop;
1620 static unsigned long obd_zombie_flags;
1621 static cfs_waitq_t obd_zombie_waitq;
1622 static pid_t obd_zombie_pid;
1625 OBD_ZOMBIE_STOP = 0x0001,
1629 * check for work for kill zombie import/export thread.
1631 static int obd_zombie_impexp_check(void *arg)
1635 spin_lock(&obd_zombie_impexp_lock);
1636 rc = (zombies_count == 0) &&
1637 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1638 spin_unlock(&obd_zombie_impexp_lock);
1644 * Add export to the obd_zombe thread and notify it.
1646 static void obd_zombie_export_add(struct obd_export *exp) {
1647 spin_lock(&exp->exp_obd->obd_dev_lock);
1648 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1649 cfs_list_del_init(&exp->exp_obd_chain);
1650 spin_unlock(&exp->exp_obd->obd_dev_lock);
1651 spin_lock(&obd_zombie_impexp_lock);
1653 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1654 spin_unlock(&obd_zombie_impexp_lock);
1656 obd_zombie_impexp_notify();
1660 * Add import to the obd_zombe thread and notify it.
1662 static void obd_zombie_import_add(struct obd_import *imp) {
1663 LASSERT(imp->imp_sec == NULL);
1664 LASSERT(imp->imp_rq_pool == NULL);
1665 spin_lock(&obd_zombie_impexp_lock);
1666 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1668 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1669 spin_unlock(&obd_zombie_impexp_lock);
1671 obd_zombie_impexp_notify();
1675 * notify import/export destroy thread about new zombie.
1677 static void obd_zombie_impexp_notify(void)
1680 * Make sure obd_zomebie_impexp_thread get this notification.
1681 * It is possible this signal only get by obd_zombie_barrier, and
1682 * barrier gulps this notification and sleeps away and hangs ensues
1684 cfs_waitq_broadcast(&obd_zombie_waitq);
1688 * check whether obd_zombie is idle
1690 static int obd_zombie_is_idle(void)
1694 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1695 spin_lock(&obd_zombie_impexp_lock);
1696 rc = (zombies_count == 0);
1697 spin_unlock(&obd_zombie_impexp_lock);
1702 * wait when obd_zombie import/export queues become empty
1704 void obd_zombie_barrier(void)
1706 struct l_wait_info lwi = { 0 };
1708 if (obd_zombie_pid == cfs_curproc_pid())
1709 /* don't wait for myself */
1711 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1713 EXPORT_SYMBOL(obd_zombie_barrier);
1718 * destroy zombie export/import thread.
1720 static int obd_zombie_impexp_thread(void *unused)
1724 rc = cfs_daemonize_ctxt("obd_zombid");
1726 complete(&obd_zombie_start);
1730 complete(&obd_zombie_start);
1732 obd_zombie_pid = cfs_curproc_pid();
1734 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1735 struct l_wait_info lwi = { 0 };
1737 l_wait_event(obd_zombie_waitq,
1738 !obd_zombie_impexp_check(NULL), &lwi);
1739 obd_zombie_impexp_cull();
1742 * Notify obd_zombie_barrier callers that queues
1745 cfs_waitq_signal(&obd_zombie_waitq);
1748 complete(&obd_zombie_stop);
1753 #else /* ! KERNEL */
1755 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1756 static void *obd_zombie_impexp_work_cb;
1757 static void *obd_zombie_impexp_idle_cb;
1759 int obd_zombie_impexp_kill(void *arg)
1763 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1764 obd_zombie_impexp_cull();
1767 cfs_atomic_dec(&zombie_recur);
1774 * start destroy zombie import/export thread
1776 int obd_zombie_impexp_init(void)
1780 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1781 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1782 spin_lock_init(&obd_zombie_impexp_lock);
1783 init_completion(&obd_zombie_start);
1784 init_completion(&obd_zombie_stop);
1785 cfs_waitq_init(&obd_zombie_waitq);
1789 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1793 wait_for_completion(&obd_zombie_start);
1796 obd_zombie_impexp_work_cb =
1797 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1798 &obd_zombie_impexp_kill, NULL);
1800 obd_zombie_impexp_idle_cb =
1801 liblustre_register_idle_callback("obd_zombi_impexp_check",
1802 &obd_zombie_impexp_check, NULL);
1808 * stop destroy zombie import/export thread
1810 void obd_zombie_impexp_stop(void)
1812 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1813 obd_zombie_impexp_notify();
1815 wait_for_completion(&obd_zombie_stop);
1817 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1818 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1822 /***** Kernel-userspace comm helpers *******/
1824 /* Get length of entire message, including header */
1825 int kuc_len(int payload_len)
1827 return sizeof(struct kuc_hdr) + payload_len;
1829 EXPORT_SYMBOL(kuc_len);
1831 /* Get a pointer to kuc header, given a ptr to the payload
1832 * @param p Pointer to payload area
1833 * @returns Pointer to kuc header
1835 struct kuc_hdr * kuc_ptr(void *p)
1837 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1838 LASSERT(lh->kuc_magic == KUC_MAGIC);
1841 EXPORT_SYMBOL(kuc_ptr);
1843 /* Test if payload is part of kuc message
1844 * @param p Pointer to payload area
1847 int kuc_ispayload(void *p)
1849 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1851 if (kh->kuc_magic == KUC_MAGIC)
1856 EXPORT_SYMBOL(kuc_ispayload);
1858 /* Alloc space for a message, and fill in header
1859 * @return Pointer to payload area
1861 void *kuc_alloc(int payload_len, int transport, int type)
1864 int len = kuc_len(payload_len);
1868 return ERR_PTR(-ENOMEM);
1870 lh->kuc_magic = KUC_MAGIC;
1871 lh->kuc_transport = transport;
1872 lh->kuc_msgtype = type;
1873 lh->kuc_msglen = len;
1875 return (void *)(lh + 1);
1877 EXPORT_SYMBOL(kuc_alloc);
/* Free a kuc message.
 * @param p            Pointer to the payload area (not to the header)
 * @param payload_len  Payload length; kuc_len() of it is the size passed
 *                     to OBD_FREE, so it should match the allocation
 *
 * Deliberately not declared inline: the function is exported, and a plain
 * "inline" definition is not guaranteed to emit an external symbol under
 * C99 inline semantics.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);