4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
134 if (!cfs_request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144 spin_lock(&type->obd_type_lock);
146 cfs_try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
151 EXPORT_SYMBOL(class_get_type);
153 void class_put_type(struct obd_type *type)
156 spin_lock(&type->obd_type_lock);
158 cfs_module_put(type->typ_dt_ops->o_owner);
159 spin_unlock(&type->obd_type_lock);
161 EXPORT_SYMBOL(class_put_type);
163 #define CLASS_MAX_NAME 1024
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166 struct lprocfs_vars *vars, const char *name,
167 struct lu_device_type *ldt)
169 struct obd_type *type;
174 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
176 if (class_search_type(name)) {
177 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
182 OBD_ALLOC(type, sizeof(*type));
186 OBD_ALLOC_PTR(type->typ_dt_ops);
187 OBD_ALLOC_PTR(type->typ_md_ops);
188 OBD_ALLOC(type->typ_name, strlen(name) + 1);
190 if (type->typ_dt_ops == NULL ||
191 type->typ_md_ops == NULL ||
192 type->typ_name == NULL)
195 *(type->typ_dt_ops) = *dt_ops;
196 /* md_ops is optional */
198 *(type->typ_md_ops) = *md_ops;
199 strcpy(type->typ_name, name);
200 spin_lock_init(&type->obd_type_lock);
203 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
205 if (IS_ERR(type->typ_procroot)) {
206 rc = PTR_ERR(type->typ_procroot);
207 type->typ_procroot = NULL;
213 rc = lu_device_type_init(ldt);
218 spin_lock(&obd_types_lock);
219 cfs_list_add(&type->typ_chain, &obd_types);
220 spin_unlock(&obd_types_lock);
225 if (type->typ_name != NULL)
226 OBD_FREE(type->typ_name, strlen(name) + 1);
227 if (type->typ_md_ops != NULL)
228 OBD_FREE_PTR(type->typ_md_ops);
229 if (type->typ_dt_ops != NULL)
230 OBD_FREE_PTR(type->typ_dt_ops);
231 OBD_FREE(type, sizeof(*type));
234 EXPORT_SYMBOL(class_register_type);
236 int class_unregister_type(const char *name)
238 struct obd_type *type = class_search_type(name);
242 CERROR("unknown obd type\n");
246 if (type->typ_refcnt) {
247 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
248 /* This is a bad situation, let's make the best of it */
249 /* Remove ops, but leave the name for debugging */
250 OBD_FREE_PTR(type->typ_dt_ops);
251 OBD_FREE_PTR(type->typ_md_ops);
255 /* we do not use type->typ_procroot as for compatibility purposes
256 * other modules can share names (i.e. lod can use lov entry). so
257 * we can't reference pointer as it can get invalided when another
258 * module removes the entry */
259 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
262 lu_device_type_fini(type->typ_lu);
264 spin_lock(&obd_types_lock);
265 cfs_list_del(&type->typ_chain);
266 spin_unlock(&obd_types_lock);
267 OBD_FREE(type->typ_name, strlen(name) + 1);
268 if (type->typ_dt_ops != NULL)
269 OBD_FREE_PTR(type->typ_dt_ops);
270 if (type->typ_md_ops != NULL)
271 OBD_FREE_PTR(type->typ_md_ops);
272 OBD_FREE(type, sizeof(*type));
274 } /* class_unregister_type */
275 EXPORT_SYMBOL(class_unregister_type);
278 * Create a new obd device.
280 * Find an empty slot in ::obd_devs[], create a new obd device in it.
282 * \param[in] type_name obd device type string.
283 * \param[in] name obd device name.
285 * \retval NULL if create fails, otherwise return the obd device
288 struct obd_device *class_newdev(const char *type_name, const char *name)
290 struct obd_device *result = NULL;
291 struct obd_device *newdev;
292 struct obd_type *type = NULL;
294 int new_obd_minor = 0;
297 if (strlen(name) >= MAX_OBD_NAME) {
298 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
299 RETURN(ERR_PTR(-EINVAL));
302 type = class_get_type(type_name);
304 CERROR("OBD: unknown type: %s\n", type_name);
305 RETURN(ERR_PTR(-ENODEV));
308 newdev = obd_device_alloc();
310 GOTO(out_type, result = ERR_PTR(-ENOMEM));
312 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
314 write_lock(&obd_dev_lock);
315 for (i = 0; i < class_devno_max(); i++) {
316 struct obd_device *obd = class_num2obd(i);
318 if (obd && obd->obd_name &&
319 (strcmp(name, obd->obd_name) == 0)) {
320 CERROR("Device %s already exists at %d, won't add\n",
323 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
324 "%p obd_magic %08x != %08x\n", result,
325 result->obd_magic, OBD_DEVICE_MAGIC);
326 LASSERTF(result->obd_minor == new_obd_minor,
327 "%p obd_minor %d != %d\n", result,
328 result->obd_minor, new_obd_minor);
330 obd_devs[result->obd_minor] = NULL;
331 result->obd_name[0]='\0';
333 result = ERR_PTR(-EEXIST);
336 if (!result && !obd) {
338 result->obd_minor = i;
340 result->obd_type = type;
341 strncpy(result->obd_name, name,
342 sizeof(result->obd_name) - 1);
343 obd_devs[i] = result;
346 write_unlock(&obd_dev_lock);
348 if (result == NULL && i >= class_devno_max()) {
349 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
351 GOTO(out, result = ERR_PTR(-EOVERFLOW));
357 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
358 result->obd_name, result);
362 obd_device_free(newdev);
364 class_put_type(type);
368 void class_release_dev(struct obd_device *obd)
370 struct obd_type *obd_type = obd->obd_type;
372 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
373 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
374 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
375 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
376 LASSERT(obd_type != NULL);
378 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
379 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
381 write_lock(&obd_dev_lock);
382 obd_devs[obd->obd_minor] = NULL;
383 write_unlock(&obd_dev_lock);
384 obd_device_free(obd);
386 class_put_type(obd_type);
389 int class_name2dev(const char *name)
396 read_lock(&obd_dev_lock);
397 for (i = 0; i < class_devno_max(); i++) {
398 struct obd_device *obd = class_num2obd(i);
400 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
401 /* Make sure we finished attaching before we give
402 out any references */
403 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
404 if (obd->obd_attached) {
405 read_unlock(&obd_dev_lock);
411 read_unlock(&obd_dev_lock);
415 EXPORT_SYMBOL(class_name2dev);
417 struct obd_device *class_name2obd(const char *name)
419 int dev = class_name2dev(name);
421 if (dev < 0 || dev > class_devno_max())
423 return class_num2obd(dev);
425 EXPORT_SYMBOL(class_name2obd);
427 int class_uuid2dev(struct obd_uuid *uuid)
431 read_lock(&obd_dev_lock);
432 for (i = 0; i < class_devno_max(); i++) {
433 struct obd_device *obd = class_num2obd(i);
435 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
436 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
437 read_unlock(&obd_dev_lock);
441 read_unlock(&obd_dev_lock);
445 EXPORT_SYMBOL(class_uuid2dev);
447 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
449 int dev = class_uuid2dev(uuid);
452 return class_num2obd(dev);
454 EXPORT_SYMBOL(class_uuid2obd);
457 * Get obd device from ::obd_devs[]
459 * \param num [in] array index
461 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
462 * otherwise return the obd device there.
464 struct obd_device *class_num2obd(int num)
466 struct obd_device *obd = NULL;
468 if (num < class_devno_max()) {
473 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
474 "%p obd_magic %08x != %08x\n",
475 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
476 LASSERTF(obd->obd_minor == num,
477 "%p obd_minor %0d != %0d\n",
478 obd, obd->obd_minor, num);
483 EXPORT_SYMBOL(class_num2obd);
486 * Get obd devices count. Device in any
488 * \retval obd device count
490 int get_devices_count(void)
492 int index, max_index = class_devno_max(), dev_count = 0;
494 read_lock(&obd_dev_lock);
495 for (index = 0; index <= max_index; index++) {
496 struct obd_device *obd = class_num2obd(index);
500 read_unlock(&obd_dev_lock);
504 EXPORT_SYMBOL(get_devices_count);
506 void class_obd_list(void)
511 read_lock(&obd_dev_lock);
512 for (i = 0; i < class_devno_max(); i++) {
513 struct obd_device *obd = class_num2obd(i);
517 if (obd->obd_stopping)
519 else if (obd->obd_set_up)
521 else if (obd->obd_attached)
525 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
526 i, status, obd->obd_type->typ_name,
527 obd->obd_name, obd->obd_uuid.uuid,
528 cfs_atomic_read(&obd->obd_refcount));
530 read_unlock(&obd_dev_lock);
534 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
535 specified, then only the client with that uuid is returned,
536 otherwise any client connected to the tgt is returned. */
537 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
538 const char * typ_name,
539 struct obd_uuid *grp_uuid)
543 read_lock(&obd_dev_lock);
544 for (i = 0; i < class_devno_max(); i++) {
545 struct obd_device *obd = class_num2obd(i);
549 if ((strncmp(obd->obd_type->typ_name, typ_name,
550 strlen(typ_name)) == 0)) {
551 if (obd_uuid_equals(tgt_uuid,
552 &obd->u.cli.cl_target_uuid) &&
553 ((grp_uuid)? obd_uuid_equals(grp_uuid,
554 &obd->obd_uuid) : 1)) {
555 read_unlock(&obd_dev_lock);
560 read_unlock(&obd_dev_lock);
564 EXPORT_SYMBOL(class_find_client_obd);
566 /* Iterate the obd_device list looking devices have grp_uuid. Start
567 searching at *next, and if a device is found, the next index to look
568 at is saved in *next. If next is NULL, then the first matching device
569 will always be returned. */
570 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
576 else if (*next >= 0 && *next < class_devno_max())
581 read_lock(&obd_dev_lock);
582 for (; i < class_devno_max(); i++) {
583 struct obd_device *obd = class_num2obd(i);
587 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
590 read_unlock(&obd_dev_lock);
594 read_unlock(&obd_dev_lock);
598 EXPORT_SYMBOL(class_devices_in_group);
601 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
602 * adjust sptlrpc settings accordingly.
604 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
606 struct obd_device *obd;
610 LASSERT(namelen > 0);
612 read_lock(&obd_dev_lock);
613 for (i = 0; i < class_devno_max(); i++) {
614 obd = class_num2obd(i);
616 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
619 /* only notify mdc, osc, mdt, ost */
620 type = obd->obd_type->typ_name;
621 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
622 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
623 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
624 strcmp(type, LUSTRE_OST_NAME) != 0)
627 if (strncmp(obd->obd_name, fsname, namelen))
630 class_incref(obd, __FUNCTION__, obd);
631 read_unlock(&obd_dev_lock);
632 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
633 sizeof(KEY_SPTLRPC_CONF),
634 KEY_SPTLRPC_CONF, 0, NULL, NULL);
636 class_decref(obd, __FUNCTION__, obd);
637 read_lock(&obd_dev_lock);
639 read_unlock(&obd_dev_lock);
642 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
644 void obd_cleanup_caches(void)
649 if (obd_device_cachep) {
650 rc = cfs_mem_cache_destroy(obd_device_cachep);
651 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
652 obd_device_cachep = NULL;
655 rc = cfs_mem_cache_destroy(obdo_cachep);
656 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
660 rc = cfs_mem_cache_destroy(import_cachep);
661 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
662 import_cachep = NULL;
665 rc = cfs_mem_cache_destroy(capa_cachep);
666 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
672 int obd_init_caches(void)
676 LASSERT(obd_device_cachep == NULL);
677 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
678 sizeof(struct obd_device),
680 if (!obd_device_cachep)
683 LASSERT(obdo_cachep == NULL);
684 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
689 LASSERT(import_cachep == NULL);
690 import_cachep = cfs_mem_cache_create("ll_import_cache",
691 sizeof(struct obd_import),
696 LASSERT(capa_cachep == NULL);
697 capa_cachep = cfs_mem_cache_create("capa_cache",
698 sizeof(struct obd_capa), 0, 0);
704 obd_cleanup_caches();
709 /* map connection to client */
710 struct obd_export *class_conn2export(struct lustre_handle *conn)
712 struct obd_export *export;
716 CDEBUG(D_CACHE, "looking for null handle\n");
720 if (conn->cookie == -1) { /* this means assign a new connection */
721 CDEBUG(D_CACHE, "want a new connection\n");
725 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
726 export = class_handle2object(conn->cookie);
729 EXPORT_SYMBOL(class_conn2export);
731 struct obd_device *class_exp2obd(struct obd_export *exp)
737 EXPORT_SYMBOL(class_exp2obd);
739 struct obd_device *class_conn2obd(struct lustre_handle *conn)
741 struct obd_export *export;
742 export = class_conn2export(conn);
744 struct obd_device *obd = export->exp_obd;
745 class_export_put(export);
750 EXPORT_SYMBOL(class_conn2obd);
752 struct obd_import *class_exp2cliimp(struct obd_export *exp)
754 struct obd_device *obd = exp->exp_obd;
757 return obd->u.cli.cl_import;
759 EXPORT_SYMBOL(class_exp2cliimp);
761 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
763 struct obd_device *obd = class_conn2obd(conn);
766 return obd->u.cli.cl_import;
768 EXPORT_SYMBOL(class_conn2cliimp);
770 /* Export management functions */
771 static void class_export_destroy(struct obd_export *exp)
773 struct obd_device *obd = exp->exp_obd;
776 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
777 LASSERT(obd != NULL);
779 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
780 exp->exp_client_uuid.uuid, obd->obd_name);
782 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
783 if (exp->exp_connection)
784 ptlrpc_put_connection_superhack(exp->exp_connection);
786 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
787 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
788 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
789 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
790 obd_destroy_export(exp);
791 class_decref(obd, "export", exp);
793 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
797 static void export_handle_addref(void *export)
799 class_export_get(export);
802 static struct portals_handle_ops export_handle_ops = {
803 .hop_addref = export_handle_addref,
807 struct obd_export *class_export_get(struct obd_export *exp)
809 cfs_atomic_inc(&exp->exp_refcount);
810 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
811 cfs_atomic_read(&exp->exp_refcount));
814 EXPORT_SYMBOL(class_export_get);
816 void class_export_put(struct obd_export *exp)
818 LASSERT(exp != NULL);
819 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
820 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
821 cfs_atomic_read(&exp->exp_refcount) - 1);
823 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
824 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
825 CDEBUG(D_IOCTL, "final put %p/%s\n",
826 exp, exp->exp_client_uuid.uuid);
828 /* release nid stat refererence */
829 lprocfs_exp_cleanup(exp);
831 obd_zombie_export_add(exp);
834 EXPORT_SYMBOL(class_export_put);
836 /* Creates a new export, adds it to the hash table, and returns a
837 * pointer to it. The refcount is 2: one for the hash reference, and
838 * one for the pointer returned by this function. */
839 struct obd_export *class_new_export(struct obd_device *obd,
840 struct obd_uuid *cluuid)
842 struct obd_export *export;
843 cfs_hash_t *hash = NULL;
847 OBD_ALLOC_PTR(export);
849 return ERR_PTR(-ENOMEM);
851 export->exp_conn_cnt = 0;
852 export->exp_lock_hash = NULL;
853 export->exp_flock_hash = NULL;
854 cfs_atomic_set(&export->exp_refcount, 2);
855 cfs_atomic_set(&export->exp_rpc_count, 0);
856 cfs_atomic_set(&export->exp_cb_count, 0);
857 cfs_atomic_set(&export->exp_locks_count, 0);
858 #if LUSTRE_TRACKS_LOCK_EXP_REFS
859 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
860 spin_lock_init(&export->exp_locks_list_guard);
862 cfs_atomic_set(&export->exp_replay_count, 0);
863 export->exp_obd = obd;
864 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
865 spin_lock_init(&export->exp_uncommitted_replies_lock);
866 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
867 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
868 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
869 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
870 class_handle_hash(&export->exp_handle, &export_handle_ops);
871 export->exp_last_request_time = cfs_time_current_sec();
872 spin_lock_init(&export->exp_lock);
873 spin_lock_init(&export->exp_rpc_lock);
874 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
875 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
876 spin_lock_init(&export->exp_bl_list_lock);
877 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
879 export->exp_sp_peer = LUSTRE_SP_ANY;
880 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
881 export->exp_client_uuid = *cluuid;
882 obd_init_export(export);
884 spin_lock(&obd->obd_dev_lock);
885 /* shouldn't happen, but might race */
886 if (obd->obd_stopping)
887 GOTO(exit_unlock, rc = -ENODEV);
889 hash = cfs_hash_getref(obd->obd_uuid_hash);
891 GOTO(exit_unlock, rc = -ENODEV);
892 spin_unlock(&obd->obd_dev_lock);
894 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
895 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
897 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
898 obd->obd_name, cluuid->uuid, rc);
899 GOTO(exit_err, rc = -EALREADY);
903 spin_lock(&obd->obd_dev_lock);
904 if (obd->obd_stopping) {
905 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
906 GOTO(exit_unlock, rc = -ENODEV);
909 class_incref(obd, "export", export);
910 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
911 cfs_list_add_tail(&export->exp_obd_chain_timed,
912 &export->exp_obd->obd_exports_timed);
913 export->exp_obd->obd_num_exports++;
914 spin_unlock(&obd->obd_dev_lock);
915 cfs_hash_putref(hash);
919 spin_unlock(&obd->obd_dev_lock);
922 cfs_hash_putref(hash);
923 class_handle_unhash(&export->exp_handle);
924 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
925 obd_destroy_export(export);
926 OBD_FREE_PTR(export);
929 EXPORT_SYMBOL(class_new_export);
931 void class_unlink_export(struct obd_export *exp)
933 class_handle_unhash(&exp->exp_handle);
935 spin_lock(&exp->exp_obd->obd_dev_lock);
936 /* delete an uuid-export hashitem from hashtables */
937 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
938 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
939 &exp->exp_client_uuid,
940 &exp->exp_uuid_hash);
942 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
943 cfs_list_del_init(&exp->exp_obd_chain_timed);
944 exp->exp_obd->obd_num_exports--;
945 spin_unlock(&exp->exp_obd->obd_dev_lock);
946 class_export_put(exp);
948 EXPORT_SYMBOL(class_unlink_export);
950 /* Import management functions */
951 void class_import_destroy(struct obd_import *imp)
955 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
956 imp->imp_obd->obd_name);
958 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
960 ptlrpc_put_connection_superhack(imp->imp_connection);
962 while (!cfs_list_empty(&imp->imp_conn_list)) {
963 struct obd_import_conn *imp_conn;
965 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
966 struct obd_import_conn, oic_item);
967 cfs_list_del_init(&imp_conn->oic_item);
968 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
969 OBD_FREE(imp_conn, sizeof(*imp_conn));
972 LASSERT(imp->imp_sec == NULL);
973 class_decref(imp->imp_obd, "import", imp);
974 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
978 static void import_handle_addref(void *import)
980 class_import_get(import);
983 static struct portals_handle_ops import_handle_ops = {
984 .hop_addref = import_handle_addref,
988 struct obd_import *class_import_get(struct obd_import *import)
990 cfs_atomic_inc(&import->imp_refcount);
991 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
992 cfs_atomic_read(&import->imp_refcount),
993 import->imp_obd->obd_name);
996 EXPORT_SYMBOL(class_import_get);
998 void class_import_put(struct obd_import *imp)
1002 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1003 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1005 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1006 cfs_atomic_read(&imp->imp_refcount) - 1,
1007 imp->imp_obd->obd_name);
1009 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1010 CDEBUG(D_INFO, "final put import %p\n", imp);
1011 obd_zombie_import_add(imp);
1014 /* catch possible import put race */
1015 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1018 EXPORT_SYMBOL(class_import_put);
1020 static void init_imp_at(struct imp_at *at) {
1022 at_init(&at->iat_net_latency, 0, 0);
1023 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1024 /* max service estimates are tracked on the server side, so
1025 don't use the AT history here, just use the last reported
1026 val. (But keep hist for proc histogram, worst_ever) */
1027 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1032 struct obd_import *class_new_import(struct obd_device *obd)
1034 struct obd_import *imp;
1036 OBD_ALLOC(imp, sizeof(*imp));
1040 CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1041 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1042 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1043 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1044 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1045 spin_lock_init(&imp->imp_lock);
1046 imp->imp_last_success_conn = 0;
1047 imp->imp_state = LUSTRE_IMP_NEW;
1048 imp->imp_obd = class_incref(obd, "import", imp);
1049 mutex_init(&imp->imp_sec_mutex);
1050 cfs_waitq_init(&imp->imp_recovery_waitq);
1052 cfs_atomic_set(&imp->imp_refcount, 2);
1053 cfs_atomic_set(&imp->imp_unregistering, 0);
1054 cfs_atomic_set(&imp->imp_inflight, 0);
1055 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1056 cfs_atomic_set(&imp->imp_inval_count, 0);
1057 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1058 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1059 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1060 init_imp_at(&imp->imp_at);
1062 /* the default magic is V2, will be used in connect RPC, and
1063 * then adjusted according to the flags in request/reply. */
1064 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1068 EXPORT_SYMBOL(class_new_import);
1070 void class_destroy_import(struct obd_import *import)
1072 LASSERT(import != NULL);
1073 LASSERT(import != LP_POISON);
1075 class_handle_unhash(&import->imp_handle);
1077 spin_lock(&import->imp_lock);
1078 import->imp_generation++;
1079 spin_unlock(&import->imp_lock);
1080 class_import_put(import);
1082 EXPORT_SYMBOL(class_destroy_import);
1084 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1086 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1088 spin_lock(&exp->exp_locks_list_guard);
1090 LASSERT(lock->l_exp_refs_nr >= 0);
1092 if (lock->l_exp_refs_target != NULL &&
1093 lock->l_exp_refs_target != exp) {
1094 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1095 exp, lock, lock->l_exp_refs_target);
1097 if ((lock->l_exp_refs_nr ++) == 0) {
1098 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1099 lock->l_exp_refs_target = exp;
1101 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1102 lock, exp, lock->l_exp_refs_nr);
1103 spin_unlock(&exp->exp_locks_list_guard);
1105 EXPORT_SYMBOL(__class_export_add_lock_ref);
1107 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1109 spin_lock(&exp->exp_locks_list_guard);
1110 LASSERT(lock->l_exp_refs_nr > 0);
1111 if (lock->l_exp_refs_target != exp) {
1112 LCONSOLE_WARN("lock %p, "
1113 "mismatching export pointers: %p, %p\n",
1114 lock, lock->l_exp_refs_target, exp);
1116 if (-- lock->l_exp_refs_nr == 0) {
1117 cfs_list_del_init(&lock->l_exp_refs_link);
1118 lock->l_exp_refs_target = NULL;
1120 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1121 lock, exp, lock->l_exp_refs_nr);
1122 spin_unlock(&exp->exp_locks_list_guard);
1124 EXPORT_SYMBOL(__class_export_del_lock_ref);
1127 /* A connection defines an export context in which preallocation can
1128 be managed. This releases the export pointer reference, and returns
1129 the export handle, so the export refcount is 1 when this function
1131 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1132 struct obd_uuid *cluuid)
1134 struct obd_export *export;
1135 LASSERT(conn != NULL);
1136 LASSERT(obd != NULL);
1137 LASSERT(cluuid != NULL);
1140 export = class_new_export(obd, cluuid);
1142 RETURN(PTR_ERR(export));
1144 conn->cookie = export->exp_handle.h_cookie;
1145 class_export_put(export);
1147 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1148 cluuid->uuid, conn->cookie);
1151 EXPORT_SYMBOL(class_connect);
1153 /* if export is involved in recovery then clean up related things */
1154 void class_export_recovery_cleanup(struct obd_export *exp)
1156 struct obd_device *obd = exp->exp_obd;
1158 spin_lock(&obd->obd_recovery_task_lock);
1159 if (exp->exp_delayed)
1160 obd->obd_delayed_clients--;
1161 if (obd->obd_recovering) {
1162 if (exp->exp_in_recovery) {
1163 spin_lock(&exp->exp_lock);
1164 exp->exp_in_recovery = 0;
1165 spin_unlock(&exp->exp_lock);
1166 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1167 cfs_atomic_dec(&obd->obd_connected_clients);
1170 /* if called during recovery then should update
1171 * obd_stale_clients counter,
1172 * lightweight exports are not counted */
1173 if (exp->exp_failed &&
1174 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1175 exp->exp_obd->obd_stale_clients++;
1177 spin_unlock(&obd->obd_recovery_task_lock);
1178 /** Cleanup req replay fields */
1179 if (exp->exp_req_replay_needed) {
1180 spin_lock(&exp->exp_lock);
1181 exp->exp_req_replay_needed = 0;
1182 spin_unlock(&exp->exp_lock);
1183 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1184 cfs_atomic_dec(&obd->obd_req_replay_clients);
1186 /** Cleanup lock replay data */
1187 if (exp->exp_lock_replay_needed) {
1188 spin_lock(&exp->exp_lock);
1189 exp->exp_lock_replay_needed = 0;
1190 spin_unlock(&exp->exp_lock);
1191 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1192 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1196 /* This function removes 1-3 references from the export:
1197 * 1 - for export pointer passed
1198 * and if disconnect really need
1199 * 2 - removing from hash
1200 * 3 - in client_unlink_export
1201 * The export pointer passed to this function can destroyed */
1202 int class_disconnect(struct obd_export *export)
1204 int already_disconnected;
1207 if (export == NULL) {
1208 CWARN("attempting to free NULL export %p\n", export);
1212 spin_lock(&export->exp_lock);
1213 already_disconnected = export->exp_disconnected;
1214 export->exp_disconnected = 1;
1215 spin_unlock(&export->exp_lock);
1217 /* class_cleanup(), abort_recovery(), and class_fail_export()
1218 * all end up in here, and if any of them race we shouldn't
1219 * call extra class_export_puts(). */
1220 if (already_disconnected) {
1221 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1222 GOTO(no_disconn, already_disconnected);
1225 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1226 export->exp_handle.h_cookie);
1228 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1229 cfs_hash_del(export->exp_obd->obd_nid_hash,
1230 &export->exp_connection->c_peer.nid,
1231 &export->exp_nid_hash);
1233 class_export_recovery_cleanup(export);
1234 class_unlink_export(export);
1236 class_export_put(export);
1239 EXPORT_SYMBOL(class_disconnect);
1241 /* Return non-zero for a fully connected export */
1242 int class_connected_export(struct obd_export *exp)
1246 spin_lock(&exp->exp_lock);
1247 connected = (exp->exp_conn_cnt > 0);
1248 spin_unlock(&exp->exp_lock);
1253 EXPORT_SYMBOL(class_connected_export);
1255 static void class_disconnect_export_list(cfs_list_t *list,
1256 enum obd_option flags)
1259 struct obd_export *exp;
1262 /* It's possible that an export may disconnect itself, but
1263 * nothing else will be added to this list. */
1264 while (!cfs_list_empty(list)) {
1265 exp = cfs_list_entry(list->next, struct obd_export,
1267 /* need for safe call CDEBUG after obd_disconnect */
1268 class_export_get(exp);
1270 spin_lock(&exp->exp_lock);
1271 exp->exp_flags = flags;
1272 spin_unlock(&exp->exp_lock);
1274 if (obd_uuid_equals(&exp->exp_client_uuid,
1275 &exp->exp_obd->obd_uuid)) {
1277 "exp %p export uuid == obd uuid, don't discon\n",
1279 /* Need to delete this now so we don't end up pointing
1280 * to work_list later when this export is cleaned up. */
1281 cfs_list_del_init(&exp->exp_obd_chain);
1282 class_export_put(exp);
1286 class_export_get(exp);
1287 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1288 "last request at "CFS_TIME_T"\n",
1289 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1290 exp, exp->exp_last_request_time);
1291 /* release one export reference anyway */
1292 rc = obd_disconnect(exp);
1294 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1295 obd_export_nid2str(exp), exp, rc);
1296 class_export_put(exp);
1301 void class_disconnect_exports(struct obd_device *obd)
1303 cfs_list_t work_list;
1306 /* Move all of the exports from obd_exports to a work list, en masse. */
1307 CFS_INIT_LIST_HEAD(&work_list);
1308 spin_lock(&obd->obd_dev_lock);
1309 cfs_list_splice_init(&obd->obd_exports, &work_list);
1310 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1311 spin_unlock(&obd->obd_dev_lock);
1313 if (!cfs_list_empty(&work_list)) {
1314 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1315 "disconnecting them\n", obd->obd_minor, obd);
1316 class_disconnect_export_list(&work_list,
1317 exp_flags_from_obd(obd));
1319 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1320 obd->obd_minor, obd);
1323 EXPORT_SYMBOL(class_disconnect_exports);
1325 /* Remove exports that have not completed recovery.
1327 void class_disconnect_stale_exports(struct obd_device *obd,
1328 int (*test_export)(struct obd_export *))
1330 cfs_list_t work_list;
1331 struct obd_export *exp, *n;
1335 CFS_INIT_LIST_HEAD(&work_list);
1336 spin_lock(&obd->obd_dev_lock);
1337 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1339 /* don't count self-export as client */
1340 if (obd_uuid_equals(&exp->exp_client_uuid,
1341 &exp->exp_obd->obd_uuid))
1344 /* don't evict clients which have no slot in last_rcvd
1345 * (e.g. lightweight connection) */
1346 if (exp->exp_target_data.ted_lr_idx == -1)
1349 spin_lock(&exp->exp_lock);
1350 if (exp->exp_failed || test_export(exp)) {
1351 spin_unlock(&exp->exp_lock);
1354 exp->exp_failed = 1;
1355 spin_unlock(&exp->exp_lock);
1357 cfs_list_move(&exp->exp_obd_chain, &work_list);
1359 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1360 obd->obd_name, exp->exp_client_uuid.uuid,
1361 exp->exp_connection == NULL ? "<unknown>" :
1362 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1363 print_export_data(exp, "EVICTING", 0);
1365 spin_unlock(&obd->obd_dev_lock);
1368 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1369 obd->obd_name, evicted);
1371 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1372 OBD_OPT_ABORT_RECOV);
1375 EXPORT_SYMBOL(class_disconnect_stale_exports);
1377 void class_fail_export(struct obd_export *exp)
1379 int rc, already_failed;
1381 spin_lock(&exp->exp_lock);
1382 already_failed = exp->exp_failed;
1383 exp->exp_failed = 1;
1384 spin_unlock(&exp->exp_lock);
1386 if (already_failed) {
1387 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1388 exp, exp->exp_client_uuid.uuid);
1392 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1393 exp, exp->exp_client_uuid.uuid);
1395 if (obd_dump_on_timeout)
1396 libcfs_debug_dumplog();
1398 /* need for safe call CDEBUG after obd_disconnect */
1399 class_export_get(exp);
1401 /* Most callers into obd_disconnect are removing their own reference
1402 * (request, for example) in addition to the one from the hash table.
1403 * We don't have such a reference here, so make one. */
1404 class_export_get(exp);
1405 rc = obd_disconnect(exp);
1407 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1409 CDEBUG(D_HA, "disconnected export %p/%s\n",
1410 exp, exp->exp_client_uuid.uuid);
1411 class_export_put(exp);
1413 EXPORT_SYMBOL(class_fail_export);
1415 char *obd_export_nid2str(struct obd_export *exp)
1417 if (exp->exp_connection != NULL)
1418 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1422 EXPORT_SYMBOL(obd_export_nid2str);
1424 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1426 cfs_hash_t *nid_hash;
1427 struct obd_export *doomed_exp = NULL;
1428 int exports_evicted = 0;
1430 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1432 spin_lock(&obd->obd_dev_lock);
1433 /* umount has run already, so evict thread should leave
1434 * its task to umount thread now */
1435 if (obd->obd_stopping) {
1436 spin_unlock(&obd->obd_dev_lock);
1437 return exports_evicted;
1439 nid_hash = obd->obd_nid_hash;
1440 cfs_hash_getref(nid_hash);
1441 spin_unlock(&obd->obd_dev_lock);
1444 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1445 if (doomed_exp == NULL)
1448 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1449 "nid %s found, wanted nid %s, requested nid %s\n",
1450 obd_export_nid2str(doomed_exp),
1451 libcfs_nid2str(nid_key), nid);
1452 LASSERTF(doomed_exp != obd->obd_self_export,
1453 "self-export is hashed by NID?\n");
1455 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1456 "request\n", obd->obd_name,
1457 obd_uuid2str(&doomed_exp->exp_client_uuid),
1458 obd_export_nid2str(doomed_exp));
1459 class_fail_export(doomed_exp);
1460 class_export_put(doomed_exp);
1463 cfs_hash_putref(nid_hash);
1465 if (!exports_evicted)
1466 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1467 obd->obd_name, nid);
1468 return exports_evicted;
1470 EXPORT_SYMBOL(obd_export_evict_by_nid);
1472 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1474 cfs_hash_t *uuid_hash;
1475 struct obd_export *doomed_exp = NULL;
1476 struct obd_uuid doomed_uuid;
1477 int exports_evicted = 0;
1479 spin_lock(&obd->obd_dev_lock);
1480 if (obd->obd_stopping) {
1481 spin_unlock(&obd->obd_dev_lock);
1482 return exports_evicted;
1484 uuid_hash = obd->obd_uuid_hash;
1485 cfs_hash_getref(uuid_hash);
1486 spin_unlock(&obd->obd_dev_lock);
1488 obd_str2uuid(&doomed_uuid, uuid);
1489 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1490 CERROR("%s: can't evict myself\n", obd->obd_name);
1491 cfs_hash_putref(uuid_hash);
1492 return exports_evicted;
1495 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1497 if (doomed_exp == NULL) {
1498 CERROR("%s: can't disconnect %s: no exports found\n",
1499 obd->obd_name, uuid);
1501 CWARN("%s: evicting %s at adminstrative request\n",
1502 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1503 class_fail_export(doomed_exp);
1504 class_export_put(doomed_exp);
1507 cfs_hash_putref(uuid_hash);
1509 return exports_evicted;
1511 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1513 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1514 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1515 EXPORT_SYMBOL(class_export_dump_hook);
1518 static void print_export_data(struct obd_export *exp, const char *status,
1521 struct ptlrpc_reply_state *rs;
1522 struct ptlrpc_reply_state *first_reply = NULL;
1525 spin_lock(&exp->exp_lock);
1526 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1532 spin_unlock(&exp->exp_lock);
1534 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1535 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1536 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1537 cfs_atomic_read(&exp->exp_rpc_count),
1538 cfs_atomic_read(&exp->exp_cb_count),
1539 cfs_atomic_read(&exp->exp_locks_count),
1540 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1541 nreplies, first_reply, nreplies > 3 ? "..." : "",
1542 exp->exp_last_committed);
1543 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1544 if (locks && class_export_dump_hook != NULL)
1545 class_export_dump_hook(exp);
1549 void dump_exports(struct obd_device *obd, int locks)
1551 struct obd_export *exp;
1553 spin_lock(&obd->obd_dev_lock);
1554 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1555 print_export_data(exp, "ACTIVE", locks);
1556 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1557 print_export_data(exp, "UNLINKED", locks);
1558 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1559 print_export_data(exp, "DELAYED", locks);
1560 spin_unlock(&obd->obd_dev_lock);
1561 spin_lock(&obd_zombie_impexp_lock);
1562 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1563 print_export_data(exp, "ZOMBIE", locks);
1564 spin_unlock(&obd_zombie_impexp_lock);
1566 EXPORT_SYMBOL(dump_exports);
1568 void obd_exports_barrier(struct obd_device *obd)
1571 LASSERT(cfs_list_empty(&obd->obd_exports));
1572 spin_lock(&obd->obd_dev_lock);
1573 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1574 spin_unlock(&obd->obd_dev_lock);
1575 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1576 cfs_time_seconds(waited));
1577 if (waited > 5 && IS_PO2(waited)) {
1578 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1579 "more than %d seconds. "
1580 "The obd refcount = %d. Is it stuck?\n",
1581 obd->obd_name, waited,
1582 cfs_atomic_read(&obd->obd_refcount));
1583 dump_exports(obd, 1);
1586 spin_lock(&obd->obd_dev_lock);
1588 spin_unlock(&obd->obd_dev_lock);
1590 EXPORT_SYMBOL(obd_exports_barrier);
1592 /* Total amount of zombies to be destroyed */
1593 static int zombies_count = 0;
1596 * kill zombie imports and exports
1598 void obd_zombie_impexp_cull(void)
1600 struct obd_import *import;
1601 struct obd_export *export;
1605 spin_lock(&obd_zombie_impexp_lock);
1608 if (!cfs_list_empty(&obd_zombie_imports)) {
1609 import = cfs_list_entry(obd_zombie_imports.next,
1612 cfs_list_del_init(&import->imp_zombie_chain);
1616 if (!cfs_list_empty(&obd_zombie_exports)) {
1617 export = cfs_list_entry(obd_zombie_exports.next,
1620 cfs_list_del_init(&export->exp_obd_chain);
1623 spin_unlock(&obd_zombie_impexp_lock);
1625 if (import != NULL) {
1626 class_import_destroy(import);
1627 spin_lock(&obd_zombie_impexp_lock);
1629 spin_unlock(&obd_zombie_impexp_lock);
1632 if (export != NULL) {
1633 class_export_destroy(export);
1634 spin_lock(&obd_zombie_impexp_lock);
1636 spin_unlock(&obd_zombie_impexp_lock);
1640 } while (import != NULL || export != NULL);
1644 static struct completion obd_zombie_start;
1645 static struct completion obd_zombie_stop;
1646 static unsigned long obd_zombie_flags;
1647 static cfs_waitq_t obd_zombie_waitq;
1648 static pid_t obd_zombie_pid;
1651 OBD_ZOMBIE_STOP = 0x0001,
1655 * check for work for kill zombie import/export thread.
1657 static int obd_zombie_impexp_check(void *arg)
1661 spin_lock(&obd_zombie_impexp_lock);
1662 rc = (zombies_count == 0) &&
1663 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1664 spin_unlock(&obd_zombie_impexp_lock);
1670 * Add export to the obd_zombe thread and notify it.
1672 static void obd_zombie_export_add(struct obd_export *exp) {
1673 spin_lock(&exp->exp_obd->obd_dev_lock);
1674 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1675 cfs_list_del_init(&exp->exp_obd_chain);
1676 spin_unlock(&exp->exp_obd->obd_dev_lock);
1677 spin_lock(&obd_zombie_impexp_lock);
1679 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1680 spin_unlock(&obd_zombie_impexp_lock);
1682 obd_zombie_impexp_notify();
1686 * Add import to the obd_zombe thread and notify it.
1688 static void obd_zombie_import_add(struct obd_import *imp) {
1689 LASSERT(imp->imp_sec == NULL);
1690 LASSERT(imp->imp_rq_pool == NULL);
1691 spin_lock(&obd_zombie_impexp_lock);
1692 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1694 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1695 spin_unlock(&obd_zombie_impexp_lock);
1697 obd_zombie_impexp_notify();
1701 * notify import/export destroy thread about new zombie.
1703 static void obd_zombie_impexp_notify(void)
1706 * Make sure obd_zomebie_impexp_thread get this notification.
1707 * It is possible this signal only get by obd_zombie_barrier, and
1708 * barrier gulps this notification and sleeps away and hangs ensues
1710 cfs_waitq_broadcast(&obd_zombie_waitq);
1714 * check whether obd_zombie is idle
1716 static int obd_zombie_is_idle(void)
1720 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1721 spin_lock(&obd_zombie_impexp_lock);
1722 rc = (zombies_count == 0);
1723 spin_unlock(&obd_zombie_impexp_lock);
1728 * wait when obd_zombie import/export queues become empty
1730 void obd_zombie_barrier(void)
1732 struct l_wait_info lwi = { 0 };
1734 if (obd_zombie_pid == cfs_curproc_pid())
1735 /* don't wait for myself */
1737 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1739 EXPORT_SYMBOL(obd_zombie_barrier);
1744 * destroy zombie export/import thread.
1746 static int obd_zombie_impexp_thread(void *unused)
1750 rc = cfs_daemonize_ctxt("obd_zombid");
1752 complete(&obd_zombie_start);
1756 complete(&obd_zombie_start);
1758 obd_zombie_pid = cfs_curproc_pid();
1760 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1761 struct l_wait_info lwi = { 0 };
1763 l_wait_event(obd_zombie_waitq,
1764 !obd_zombie_impexp_check(NULL), &lwi);
1765 obd_zombie_impexp_cull();
1768 * Notify obd_zombie_barrier callers that queues
1771 cfs_waitq_signal(&obd_zombie_waitq);
1774 complete(&obd_zombie_stop);
1779 #else /* ! KERNEL */
1781 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1782 static void *obd_zombie_impexp_work_cb;
1783 static void *obd_zombie_impexp_idle_cb;
1785 int obd_zombie_impexp_kill(void *arg)
1789 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1790 obd_zombie_impexp_cull();
1793 cfs_atomic_dec(&zombie_recur);
1800 * start destroy zombie import/export thread
1802 int obd_zombie_impexp_init(void)
1806 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1807 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1808 spin_lock_init(&obd_zombie_impexp_lock);
1809 init_completion(&obd_zombie_start);
1810 init_completion(&obd_zombie_stop);
1811 cfs_waitq_init(&obd_zombie_waitq);
1815 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1819 wait_for_completion(&obd_zombie_start);
1822 obd_zombie_impexp_work_cb =
1823 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1824 &obd_zombie_impexp_kill, NULL);
1826 obd_zombie_impexp_idle_cb =
1827 liblustre_register_idle_callback("obd_zombi_impexp_check",
1828 &obd_zombie_impexp_check, NULL);
1834 * stop destroy zombie import/export thread
1836 void obd_zombie_impexp_stop(void)
1838 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1839 obd_zombie_impexp_notify();
1841 wait_for_completion(&obd_zombie_stop);
1843 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1844 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1848 /***** Kernel-userspace comm helpers *******/
1850 /* Get length of entire message, including header */
1851 int kuc_len(int payload_len)
1853 return sizeof(struct kuc_hdr) + payload_len;
1855 EXPORT_SYMBOL(kuc_len);
1857 /* Get a pointer to kuc header, given a ptr to the payload
1858 * @param p Pointer to payload area
1859 * @returns Pointer to kuc header
1861 struct kuc_hdr * kuc_ptr(void *p)
1863 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1864 LASSERT(lh->kuc_magic == KUC_MAGIC);
1867 EXPORT_SYMBOL(kuc_ptr);
1869 /* Test if payload is part of kuc message
1870 * @param p Pointer to payload area
1873 int kuc_ispayload(void *p)
1875 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1877 if (kh->kuc_magic == KUC_MAGIC)
1882 EXPORT_SYMBOL(kuc_ispayload);
1884 /* Alloc space for a message, and fill in header
1885 * @return Pointer to payload area
1887 void *kuc_alloc(int payload_len, int transport, int type)
1890 int len = kuc_len(payload_len);
1894 return ERR_PTR(-ENOMEM);
1896 lh->kuc_magic = KUC_MAGIC;
1897 lh->kuc_transport = transport;
1898 lh->kuc_msgtype = type;
1899 lh->kuc_msglen = len;
1901 return (void *)(lh + 1);
1903 EXPORT_SYMBOL(kuc_alloc);
1905 /* Takes pointer to payload area */
1906 inline void kuc_free(void *p, int payload_len)
1908 struct kuc_hdr *lh = kuc_ptr(p);
1909 OBD_FREE(lh, kuc_len(payload_len));
1911 EXPORT_SYMBOL(kuc_free);