4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
129 modname = LUSTRE_MDT_NAME;
131 if (!cfs_request_module("%s", modname)) {
132 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
133 type = class_search_type(name);
135 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
141 spin_lock(&type->obd_type_lock);
143 cfs_try_module_get(type->typ_dt_ops->o_owner);
144 spin_unlock(&type->obd_type_lock);
148 EXPORT_SYMBOL(class_get_type);
150 void class_put_type(struct obd_type *type)
153 spin_lock(&type->obd_type_lock);
155 cfs_module_put(type->typ_dt_ops->o_owner);
156 spin_unlock(&type->obd_type_lock);
158 EXPORT_SYMBOL(class_put_type);
160 #define CLASS_MAX_NAME 1024
162 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
163 struct lprocfs_vars *vars, const char *name,
164 struct lu_device_type *ldt)
166 struct obd_type *type;
171 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
173 if (class_search_type(name)) {
174 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
179 OBD_ALLOC(type, sizeof(*type));
183 OBD_ALLOC_PTR(type->typ_dt_ops);
184 OBD_ALLOC_PTR(type->typ_md_ops);
185 OBD_ALLOC(type->typ_name, strlen(name) + 1);
187 if (type->typ_dt_ops == NULL ||
188 type->typ_md_ops == NULL ||
189 type->typ_name == NULL)
192 *(type->typ_dt_ops) = *dt_ops;
193 /* md_ops is optional */
195 *(type->typ_md_ops) = *md_ops;
196 strcpy(type->typ_name, name);
197 spin_lock_init(&type->obd_type_lock);
200 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
202 if (IS_ERR(type->typ_procroot)) {
203 rc = PTR_ERR(type->typ_procroot);
204 type->typ_procroot = NULL;
210 rc = lu_device_type_init(ldt);
215 spin_lock(&obd_types_lock);
216 cfs_list_add(&type->typ_chain, &obd_types);
217 spin_unlock(&obd_types_lock);
222 if (type->typ_name != NULL)
223 OBD_FREE(type->typ_name, strlen(name) + 1);
224 if (type->typ_md_ops != NULL)
225 OBD_FREE_PTR(type->typ_md_ops);
226 if (type->typ_dt_ops != NULL)
227 OBD_FREE_PTR(type->typ_dt_ops);
228 OBD_FREE(type, sizeof(*type));
231 EXPORT_SYMBOL(class_register_type);
233 int class_unregister_type(const char *name)
235 struct obd_type *type = class_search_type(name);
239 CERROR("unknown obd type\n");
243 if (type->typ_refcnt) {
244 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
245 /* This is a bad situation, let's make the best of it */
246 /* Remove ops, but leave the name for debugging */
247 OBD_FREE_PTR(type->typ_dt_ops);
248 OBD_FREE_PTR(type->typ_md_ops);
252 /* we do not use type->typ_procroot as for compatibility purposes
253 * other modules can share names (i.e. lod can use lov entry). so
254 * we can't reference pointer as it can get invalided when another
255 * module removes the entry */
256 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
259 lu_device_type_fini(type->typ_lu);
261 spin_lock(&obd_types_lock);
262 cfs_list_del(&type->typ_chain);
263 spin_unlock(&obd_types_lock);
264 OBD_FREE(type->typ_name, strlen(name) + 1);
265 if (type->typ_dt_ops != NULL)
266 OBD_FREE_PTR(type->typ_dt_ops);
267 if (type->typ_md_ops != NULL)
268 OBD_FREE_PTR(type->typ_md_ops);
269 OBD_FREE(type, sizeof(*type));
271 } /* class_unregister_type */
272 EXPORT_SYMBOL(class_unregister_type);
275 * Create a new obd device.
277 * Find an empty slot in ::obd_devs[], create a new obd device in it.
279 * \param[in] type_name obd device type string.
280 * \param[in] name obd device name.
282 * \retval NULL if create fails, otherwise return the obd device
285 struct obd_device *class_newdev(const char *type_name, const char *name)
287 struct obd_device *result = NULL;
288 struct obd_device *newdev;
289 struct obd_type *type = NULL;
291 int new_obd_minor = 0;
294 if (strlen(name) >= MAX_OBD_NAME) {
295 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
296 RETURN(ERR_PTR(-EINVAL));
299 type = class_get_type(type_name);
301 CERROR("OBD: unknown type: %s\n", type_name);
302 RETURN(ERR_PTR(-ENODEV));
305 newdev = obd_device_alloc();
307 GOTO(out_type, result = ERR_PTR(-ENOMEM));
309 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
311 write_lock(&obd_dev_lock);
312 for (i = 0; i < class_devno_max(); i++) {
313 struct obd_device *obd = class_num2obd(i);
315 if (obd && obd->obd_name &&
316 (strcmp(name, obd->obd_name) == 0)) {
317 CERROR("Device %s already exists at %d, won't add\n",
320 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
321 "%p obd_magic %08x != %08x\n", result,
322 result->obd_magic, OBD_DEVICE_MAGIC);
323 LASSERTF(result->obd_minor == new_obd_minor,
324 "%p obd_minor %d != %d\n", result,
325 result->obd_minor, new_obd_minor);
327 obd_devs[result->obd_minor] = NULL;
328 result->obd_name[0]='\0';
330 result = ERR_PTR(-EEXIST);
333 if (!result && !obd) {
335 result->obd_minor = i;
337 result->obd_type = type;
338 strncpy(result->obd_name, name,
339 sizeof(result->obd_name) - 1);
340 obd_devs[i] = result;
343 write_unlock(&obd_dev_lock);
345 if (result == NULL && i >= class_devno_max()) {
346 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
348 GOTO(out, result = ERR_PTR(-EOVERFLOW));
354 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
355 result->obd_name, result);
359 obd_device_free(newdev);
361 class_put_type(type);
365 void class_release_dev(struct obd_device *obd)
367 struct obd_type *obd_type = obd->obd_type;
369 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
370 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
371 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
372 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
373 LASSERT(obd_type != NULL);
375 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
376 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
378 write_lock(&obd_dev_lock);
379 obd_devs[obd->obd_minor] = NULL;
380 write_unlock(&obd_dev_lock);
381 obd_device_free(obd);
383 class_put_type(obd_type);
386 int class_name2dev(const char *name)
393 read_lock(&obd_dev_lock);
394 for (i = 0; i < class_devno_max(); i++) {
395 struct obd_device *obd = class_num2obd(i);
397 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
398 /* Make sure we finished attaching before we give
399 out any references */
400 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
401 if (obd->obd_attached) {
402 read_unlock(&obd_dev_lock);
408 read_unlock(&obd_dev_lock);
412 EXPORT_SYMBOL(class_name2dev);
414 struct obd_device *class_name2obd(const char *name)
416 int dev = class_name2dev(name);
418 if (dev < 0 || dev > class_devno_max())
420 return class_num2obd(dev);
422 EXPORT_SYMBOL(class_name2obd);
424 int class_uuid2dev(struct obd_uuid *uuid)
428 read_lock(&obd_dev_lock);
429 for (i = 0; i < class_devno_max(); i++) {
430 struct obd_device *obd = class_num2obd(i);
432 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
433 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
434 read_unlock(&obd_dev_lock);
438 read_unlock(&obd_dev_lock);
442 EXPORT_SYMBOL(class_uuid2dev);
444 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
446 int dev = class_uuid2dev(uuid);
449 return class_num2obd(dev);
451 EXPORT_SYMBOL(class_uuid2obd);
454 * Get obd device from ::obd_devs[]
456 * \param num [in] array index
458 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
459 * otherwise return the obd device there.
461 struct obd_device *class_num2obd(int num)
463 struct obd_device *obd = NULL;
465 if (num < class_devno_max()) {
470 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
471 "%p obd_magic %08x != %08x\n",
472 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
473 LASSERTF(obd->obd_minor == num,
474 "%p obd_minor %0d != %0d\n",
475 obd, obd->obd_minor, num);
480 EXPORT_SYMBOL(class_num2obd);
483 * Get obd devices count. Device in any
485 * \retval obd device count
487 int get_devices_count(void)
489 int index, max_index = class_devno_max(), dev_count = 0;
491 read_lock(&obd_dev_lock);
492 for (index = 0; index <= max_index; index++) {
493 struct obd_device *obd = class_num2obd(index);
497 read_unlock(&obd_dev_lock);
501 EXPORT_SYMBOL(get_devices_count);
503 void class_obd_list(void)
508 read_lock(&obd_dev_lock);
509 for (i = 0; i < class_devno_max(); i++) {
510 struct obd_device *obd = class_num2obd(i);
514 if (obd->obd_stopping)
516 else if (obd->obd_set_up)
518 else if (obd->obd_attached)
522 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
523 i, status, obd->obd_type->typ_name,
524 obd->obd_name, obd->obd_uuid.uuid,
525 cfs_atomic_read(&obd->obd_refcount));
527 read_unlock(&obd_dev_lock);
531 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
532 specified, then only the client with that uuid is returned,
533 otherwise any client connected to the tgt is returned. */
534 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
535 const char * typ_name,
536 struct obd_uuid *grp_uuid)
540 read_lock(&obd_dev_lock);
541 for (i = 0; i < class_devno_max(); i++) {
542 struct obd_device *obd = class_num2obd(i);
546 if ((strncmp(obd->obd_type->typ_name, typ_name,
547 strlen(typ_name)) == 0)) {
548 if (obd_uuid_equals(tgt_uuid,
549 &obd->u.cli.cl_target_uuid) &&
550 ((grp_uuid)? obd_uuid_equals(grp_uuid,
551 &obd->obd_uuid) : 1)) {
552 read_unlock(&obd_dev_lock);
557 read_unlock(&obd_dev_lock);
561 EXPORT_SYMBOL(class_find_client_obd);
563 /* Iterate the obd_device list looking devices have grp_uuid. Start
564 searching at *next, and if a device is found, the next index to look
565 at is saved in *next. If next is NULL, then the first matching device
566 will always be returned. */
567 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
573 else if (*next >= 0 && *next < class_devno_max())
578 read_lock(&obd_dev_lock);
579 for (; i < class_devno_max(); i++) {
580 struct obd_device *obd = class_num2obd(i);
584 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
587 read_unlock(&obd_dev_lock);
591 read_unlock(&obd_dev_lock);
595 EXPORT_SYMBOL(class_devices_in_group);
598 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
599 * adjust sptlrpc settings accordingly.
601 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
603 struct obd_device *obd;
607 LASSERT(namelen > 0);
609 read_lock(&obd_dev_lock);
610 for (i = 0; i < class_devno_max(); i++) {
611 obd = class_num2obd(i);
613 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
616 /* only notify mdc, osc, mdt, ost */
617 type = obd->obd_type->typ_name;
618 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
619 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
620 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
621 strcmp(type, LUSTRE_OST_NAME) != 0)
624 if (strncmp(obd->obd_name, fsname, namelen))
627 class_incref(obd, __FUNCTION__, obd);
628 read_unlock(&obd_dev_lock);
629 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
630 sizeof(KEY_SPTLRPC_CONF),
631 KEY_SPTLRPC_CONF, 0, NULL, NULL);
633 class_decref(obd, __FUNCTION__, obd);
634 read_lock(&obd_dev_lock);
636 read_unlock(&obd_dev_lock);
639 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
641 void obd_cleanup_caches(void)
646 if (obd_device_cachep) {
647 rc = cfs_mem_cache_destroy(obd_device_cachep);
648 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
649 obd_device_cachep = NULL;
652 rc = cfs_mem_cache_destroy(obdo_cachep);
653 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
657 rc = cfs_mem_cache_destroy(import_cachep);
658 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
659 import_cachep = NULL;
662 rc = cfs_mem_cache_destroy(capa_cachep);
663 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
669 int obd_init_caches(void)
673 LASSERT(obd_device_cachep == NULL);
674 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
675 sizeof(struct obd_device),
677 if (!obd_device_cachep)
680 LASSERT(obdo_cachep == NULL);
681 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
686 LASSERT(import_cachep == NULL);
687 import_cachep = cfs_mem_cache_create("ll_import_cache",
688 sizeof(struct obd_import),
693 LASSERT(capa_cachep == NULL);
694 capa_cachep = cfs_mem_cache_create("capa_cache",
695 sizeof(struct obd_capa), 0, 0);
701 obd_cleanup_caches();
706 /* map connection to client */
707 struct obd_export *class_conn2export(struct lustre_handle *conn)
709 struct obd_export *export;
713 CDEBUG(D_CACHE, "looking for null handle\n");
717 if (conn->cookie == -1) { /* this means assign a new connection */
718 CDEBUG(D_CACHE, "want a new connection\n");
722 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
723 export = class_handle2object(conn->cookie);
726 EXPORT_SYMBOL(class_conn2export);
728 struct obd_device *class_exp2obd(struct obd_export *exp)
734 EXPORT_SYMBOL(class_exp2obd);
736 struct obd_device *class_conn2obd(struct lustre_handle *conn)
738 struct obd_export *export;
739 export = class_conn2export(conn);
741 struct obd_device *obd = export->exp_obd;
742 class_export_put(export);
747 EXPORT_SYMBOL(class_conn2obd);
749 struct obd_import *class_exp2cliimp(struct obd_export *exp)
751 struct obd_device *obd = exp->exp_obd;
754 return obd->u.cli.cl_import;
756 EXPORT_SYMBOL(class_exp2cliimp);
758 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
760 struct obd_device *obd = class_conn2obd(conn);
763 return obd->u.cli.cl_import;
765 EXPORT_SYMBOL(class_conn2cliimp);
767 /* Export management functions */
768 static void class_export_destroy(struct obd_export *exp)
770 struct obd_device *obd = exp->exp_obd;
773 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
774 LASSERT(obd != NULL);
776 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
777 exp->exp_client_uuid.uuid, obd->obd_name);
779 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
780 if (exp->exp_connection)
781 ptlrpc_put_connection_superhack(exp->exp_connection);
783 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
784 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
785 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
786 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
787 obd_destroy_export(exp);
788 class_decref(obd, "export", exp);
790 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
794 static void export_handle_addref(void *export)
796 class_export_get(export);
799 static struct portals_handle_ops export_handle_ops = {
800 .hop_addref = export_handle_addref,
804 struct obd_export *class_export_get(struct obd_export *exp)
806 cfs_atomic_inc(&exp->exp_refcount);
807 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
808 cfs_atomic_read(&exp->exp_refcount));
811 EXPORT_SYMBOL(class_export_get);
813 void class_export_put(struct obd_export *exp)
815 LASSERT(exp != NULL);
816 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
817 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
818 cfs_atomic_read(&exp->exp_refcount) - 1);
820 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
821 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
822 CDEBUG(D_IOCTL, "final put %p/%s\n",
823 exp, exp->exp_client_uuid.uuid);
825 /* release nid stat refererence */
826 lprocfs_exp_cleanup(exp);
828 obd_zombie_export_add(exp);
831 EXPORT_SYMBOL(class_export_put);
833 /* Creates a new export, adds it to the hash table, and returns a
834 * pointer to it. The refcount is 2: one for the hash reference, and
835 * one for the pointer returned by this function. */
836 struct obd_export *class_new_export(struct obd_device *obd,
837 struct obd_uuid *cluuid)
839 struct obd_export *export;
840 cfs_hash_t *hash = NULL;
844 OBD_ALLOC_PTR(export);
846 return ERR_PTR(-ENOMEM);
848 export->exp_conn_cnt = 0;
849 export->exp_lock_hash = NULL;
850 export->exp_flock_hash = NULL;
851 cfs_atomic_set(&export->exp_refcount, 2);
852 cfs_atomic_set(&export->exp_rpc_count, 0);
853 cfs_atomic_set(&export->exp_cb_count, 0);
854 cfs_atomic_set(&export->exp_locks_count, 0);
855 #if LUSTRE_TRACKS_LOCK_EXP_REFS
856 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
857 spin_lock_init(&export->exp_locks_list_guard);
859 cfs_atomic_set(&export->exp_replay_count, 0);
860 export->exp_obd = obd;
861 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
862 spin_lock_init(&export->exp_uncommitted_replies_lock);
863 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
864 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
865 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
866 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
867 class_handle_hash(&export->exp_handle, &export_handle_ops);
868 export->exp_last_request_time = cfs_time_current_sec();
869 spin_lock_init(&export->exp_lock);
870 spin_lock_init(&export->exp_rpc_lock);
871 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
872 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
873 spin_lock_init(&export->exp_bl_list_lock);
874 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
876 export->exp_sp_peer = LUSTRE_SP_ANY;
877 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
878 export->exp_client_uuid = *cluuid;
879 obd_init_export(export);
881 spin_lock(&obd->obd_dev_lock);
882 /* shouldn't happen, but might race */
883 if (obd->obd_stopping)
884 GOTO(exit_unlock, rc = -ENODEV);
886 hash = cfs_hash_getref(obd->obd_uuid_hash);
888 GOTO(exit_unlock, rc = -ENODEV);
889 spin_unlock(&obd->obd_dev_lock);
891 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
892 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
894 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
895 obd->obd_name, cluuid->uuid, rc);
896 GOTO(exit_err, rc = -EALREADY);
900 spin_lock(&obd->obd_dev_lock);
901 if (obd->obd_stopping) {
902 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
903 GOTO(exit_unlock, rc = -ENODEV);
906 class_incref(obd, "export", export);
907 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
908 cfs_list_add_tail(&export->exp_obd_chain_timed,
909 &export->exp_obd->obd_exports_timed);
910 export->exp_obd->obd_num_exports++;
911 spin_unlock(&obd->obd_dev_lock);
912 cfs_hash_putref(hash);
916 spin_unlock(&obd->obd_dev_lock);
919 cfs_hash_putref(hash);
920 class_handle_unhash(&export->exp_handle);
921 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
922 obd_destroy_export(export);
923 OBD_FREE_PTR(export);
926 EXPORT_SYMBOL(class_new_export);
928 void class_unlink_export(struct obd_export *exp)
930 class_handle_unhash(&exp->exp_handle);
932 spin_lock(&exp->exp_obd->obd_dev_lock);
933 /* delete an uuid-export hashitem from hashtables */
934 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
935 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
936 &exp->exp_client_uuid,
937 &exp->exp_uuid_hash);
939 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
940 cfs_list_del_init(&exp->exp_obd_chain_timed);
941 exp->exp_obd->obd_num_exports--;
942 spin_unlock(&exp->exp_obd->obd_dev_lock);
943 class_export_put(exp);
945 EXPORT_SYMBOL(class_unlink_export);
947 /* Import management functions */
948 void class_import_destroy(struct obd_import *imp)
952 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
953 imp->imp_obd->obd_name);
955 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
957 ptlrpc_put_connection_superhack(imp->imp_connection);
959 while (!cfs_list_empty(&imp->imp_conn_list)) {
960 struct obd_import_conn *imp_conn;
962 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
963 struct obd_import_conn, oic_item);
964 cfs_list_del_init(&imp_conn->oic_item);
965 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
966 OBD_FREE(imp_conn, sizeof(*imp_conn));
969 LASSERT(imp->imp_sec == NULL);
970 class_decref(imp->imp_obd, "import", imp);
971 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
975 static void import_handle_addref(void *import)
977 class_import_get(import);
980 static struct portals_handle_ops import_handle_ops = {
981 .hop_addref = import_handle_addref,
985 struct obd_import *class_import_get(struct obd_import *import)
987 cfs_atomic_inc(&import->imp_refcount);
988 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
989 cfs_atomic_read(&import->imp_refcount),
990 import->imp_obd->obd_name);
993 EXPORT_SYMBOL(class_import_get);
995 void class_import_put(struct obd_import *imp)
999 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1000 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1002 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1003 cfs_atomic_read(&imp->imp_refcount) - 1,
1004 imp->imp_obd->obd_name);
1006 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1007 CDEBUG(D_INFO, "final put import %p\n", imp);
1008 obd_zombie_import_add(imp);
1011 /* catch possible import put race */
1012 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1015 EXPORT_SYMBOL(class_import_put);
1017 static void init_imp_at(struct imp_at *at) {
1019 at_init(&at->iat_net_latency, 0, 0);
1020 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1021 /* max service estimates are tracked on the server side, so
1022 don't use the AT history here, just use the last reported
1023 val. (But keep hist for proc histogram, worst_ever) */
1024 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1029 struct obd_import *class_new_import(struct obd_device *obd)
1031 struct obd_import *imp;
1033 OBD_ALLOC(imp, sizeof(*imp));
1037 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1038 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1039 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1040 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1041 spin_lock_init(&imp->imp_lock);
1042 imp->imp_last_success_conn = 0;
1043 imp->imp_state = LUSTRE_IMP_NEW;
1044 imp->imp_obd = class_incref(obd, "import", imp);
1045 mutex_init(&imp->imp_sec_mutex);
1046 cfs_waitq_init(&imp->imp_recovery_waitq);
1048 cfs_atomic_set(&imp->imp_refcount, 2);
1049 cfs_atomic_set(&imp->imp_unregistering, 0);
1050 cfs_atomic_set(&imp->imp_inflight, 0);
1051 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1052 cfs_atomic_set(&imp->imp_inval_count, 0);
1053 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1054 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1055 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1056 init_imp_at(&imp->imp_at);
1058 /* the default magic is V2, will be used in connect RPC, and
1059 * then adjusted according to the flags in request/reply. */
1060 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1064 EXPORT_SYMBOL(class_new_import);
1066 void class_destroy_import(struct obd_import *import)
1068 LASSERT(import != NULL);
1069 LASSERT(import != LP_POISON);
1071 class_handle_unhash(&import->imp_handle);
1073 spin_lock(&import->imp_lock);
1074 import->imp_generation++;
1075 spin_unlock(&import->imp_lock);
1076 class_import_put(import);
1078 EXPORT_SYMBOL(class_destroy_import);
1080 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1082 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1084 spin_lock(&exp->exp_locks_list_guard);
1086 LASSERT(lock->l_exp_refs_nr >= 0);
1088 if (lock->l_exp_refs_target != NULL &&
1089 lock->l_exp_refs_target != exp) {
1090 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1091 exp, lock, lock->l_exp_refs_target);
1093 if ((lock->l_exp_refs_nr ++) == 0) {
1094 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1095 lock->l_exp_refs_target = exp;
1097 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1098 lock, exp, lock->l_exp_refs_nr);
1099 spin_unlock(&exp->exp_locks_list_guard);
1101 EXPORT_SYMBOL(__class_export_add_lock_ref);
1103 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1105 spin_lock(&exp->exp_locks_list_guard);
1106 LASSERT(lock->l_exp_refs_nr > 0);
1107 if (lock->l_exp_refs_target != exp) {
1108 LCONSOLE_WARN("lock %p, "
1109 "mismatching export pointers: %p, %p\n",
1110 lock, lock->l_exp_refs_target, exp);
1112 if (-- lock->l_exp_refs_nr == 0) {
1113 cfs_list_del_init(&lock->l_exp_refs_link);
1114 lock->l_exp_refs_target = NULL;
1116 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1117 lock, exp, lock->l_exp_refs_nr);
1118 spin_unlock(&exp->exp_locks_list_guard);
1120 EXPORT_SYMBOL(__class_export_del_lock_ref);
1123 /* A connection defines an export context in which preallocation can
1124 be managed. This releases the export pointer reference, and returns
1125 the export handle, so the export refcount is 1 when this function
1127 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1128 struct obd_uuid *cluuid)
1130 struct obd_export *export;
1131 LASSERT(conn != NULL);
1132 LASSERT(obd != NULL);
1133 LASSERT(cluuid != NULL);
1136 export = class_new_export(obd, cluuid);
1138 RETURN(PTR_ERR(export));
1140 conn->cookie = export->exp_handle.h_cookie;
1141 class_export_put(export);
1143 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1144 cluuid->uuid, conn->cookie);
1147 EXPORT_SYMBOL(class_connect);
1149 /* if export is involved in recovery then clean up related things */
1150 void class_export_recovery_cleanup(struct obd_export *exp)
1152 struct obd_device *obd = exp->exp_obd;
1154 spin_lock(&obd->obd_recovery_task_lock);
1155 if (exp->exp_delayed)
1156 obd->obd_delayed_clients--;
1157 if (obd->obd_recovering) {
1158 if (exp->exp_in_recovery) {
1159 spin_lock(&exp->exp_lock);
1160 exp->exp_in_recovery = 0;
1161 spin_unlock(&exp->exp_lock);
1162 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1163 cfs_atomic_dec(&obd->obd_connected_clients);
1166 /* if called during recovery then should update
1167 * obd_stale_clients counter,
1168 * lightweight exports are not counted */
1169 if (exp->exp_failed &&
1170 (exp->exp_connect_flags & OBD_CONNECT_LIGHTWEIGHT) == 0)
1171 exp->exp_obd->obd_stale_clients++;
1173 spin_unlock(&obd->obd_recovery_task_lock);
1174 /** Cleanup req replay fields */
1175 if (exp->exp_req_replay_needed) {
1176 spin_lock(&exp->exp_lock);
1177 exp->exp_req_replay_needed = 0;
1178 spin_unlock(&exp->exp_lock);
1179 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1180 cfs_atomic_dec(&obd->obd_req_replay_clients);
1182 /** Cleanup lock replay data */
1183 if (exp->exp_lock_replay_needed) {
1184 spin_lock(&exp->exp_lock);
1185 exp->exp_lock_replay_needed = 0;
1186 spin_unlock(&exp->exp_lock);
1187 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1188 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1192 /* This function removes 1-3 references from the export:
1193 * 1 - for export pointer passed
1194 * and if disconnect really need
1195 * 2 - removing from hash
1196 * 3 - in client_unlink_export
1197 * The export pointer passed to this function can destroyed */
1198 int class_disconnect(struct obd_export *export)
1200 int already_disconnected;
1203 if (export == NULL) {
1204 CWARN("attempting to free NULL export %p\n", export);
1208 spin_lock(&export->exp_lock);
1209 already_disconnected = export->exp_disconnected;
1210 export->exp_disconnected = 1;
1211 spin_unlock(&export->exp_lock);
1213 /* class_cleanup(), abort_recovery(), and class_fail_export()
1214 * all end up in here, and if any of them race we shouldn't
1215 * call extra class_export_puts(). */
1216 if (already_disconnected) {
1217 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1218 GOTO(no_disconn, already_disconnected);
1221 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1222 export->exp_handle.h_cookie);
1224 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1225 cfs_hash_del(export->exp_obd->obd_nid_hash,
1226 &export->exp_connection->c_peer.nid,
1227 &export->exp_nid_hash);
1229 class_export_recovery_cleanup(export);
1230 class_unlink_export(export);
1232 class_export_put(export);
1235 EXPORT_SYMBOL(class_disconnect);
1237 /* Return non-zero for a fully connected export */
1238 int class_connected_export(struct obd_export *exp)
1242 spin_lock(&exp->exp_lock);
1243 connected = (exp->exp_conn_cnt > 0);
1244 spin_unlock(&exp->exp_lock);
1249 EXPORT_SYMBOL(class_connected_export);
1251 static void class_disconnect_export_list(cfs_list_t *list,
1252 enum obd_option flags)
1255 struct obd_export *exp;
1258 /* It's possible that an export may disconnect itself, but
1259 * nothing else will be added to this list. */
1260 while (!cfs_list_empty(list)) {
1261 exp = cfs_list_entry(list->next, struct obd_export,
1263 /* need for safe call CDEBUG after obd_disconnect */
1264 class_export_get(exp);
1266 spin_lock(&exp->exp_lock);
1267 exp->exp_flags = flags;
1268 spin_unlock(&exp->exp_lock);
1270 if (obd_uuid_equals(&exp->exp_client_uuid,
1271 &exp->exp_obd->obd_uuid)) {
1273 "exp %p export uuid == obd uuid, don't discon\n",
1275 /* Need to delete this now so we don't end up pointing
1276 * to work_list later when this export is cleaned up. */
1277 cfs_list_del_init(&exp->exp_obd_chain);
1278 class_export_put(exp);
1282 class_export_get(exp);
1283 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1284 "last request at "CFS_TIME_T"\n",
1285 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1286 exp, exp->exp_last_request_time);
1287 /* release one export reference anyway */
1288 rc = obd_disconnect(exp);
1290 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1291 obd_export_nid2str(exp), exp, rc);
1292 class_export_put(exp);
1297 void class_disconnect_exports(struct obd_device *obd)
1299 cfs_list_t work_list;
1302 /* Move all of the exports from obd_exports to a work list, en masse. */
1303 CFS_INIT_LIST_HEAD(&work_list);
1304 spin_lock(&obd->obd_dev_lock);
1305 cfs_list_splice_init(&obd->obd_exports, &work_list);
1306 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1307 spin_unlock(&obd->obd_dev_lock);
1309 if (!cfs_list_empty(&work_list)) {
1310 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1311 "disconnecting them\n", obd->obd_minor, obd);
1312 class_disconnect_export_list(&work_list,
1313 exp_flags_from_obd(obd));
1315 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1316 obd->obd_minor, obd);
1319 EXPORT_SYMBOL(class_disconnect_exports);
1321 /* Remove exports that have not completed recovery.
1323 void class_disconnect_stale_exports(struct obd_device *obd,
1324 int (*test_export)(struct obd_export *))
1326 cfs_list_t work_list;
1327 struct obd_export *exp, *n;
1331 CFS_INIT_LIST_HEAD(&work_list);
1332 spin_lock(&obd->obd_dev_lock);
1333 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1335 /* don't count self-export as client */
1336 if (obd_uuid_equals(&exp->exp_client_uuid,
1337 &exp->exp_obd->obd_uuid))
1340 /* don't evict clients which have no slot in last_rcvd
1341 * (e.g. lightweight connection) */
1342 if (exp->exp_target_data.ted_lr_idx == -1)
1345 spin_lock(&exp->exp_lock);
1346 if (exp->exp_failed || test_export(exp)) {
1347 spin_unlock(&exp->exp_lock);
1350 exp->exp_failed = 1;
1351 spin_unlock(&exp->exp_lock);
1353 cfs_list_move(&exp->exp_obd_chain, &work_list);
1355 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1356 obd->obd_name, exp->exp_client_uuid.uuid,
1357 exp->exp_connection == NULL ? "<unknown>" :
1358 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1359 print_export_data(exp, "EVICTING", 0);
1361 spin_unlock(&obd->obd_dev_lock);
1364 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1365 obd->obd_name, evicted);
1367 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1368 OBD_OPT_ABORT_RECOV);
1371 EXPORT_SYMBOL(class_disconnect_stale_exports);
1373 void class_fail_export(struct obd_export *exp)
1375 int rc, already_failed;
1377 spin_lock(&exp->exp_lock);
1378 already_failed = exp->exp_failed;
1379 exp->exp_failed = 1;
1380 spin_unlock(&exp->exp_lock);
1382 if (already_failed) {
1383 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1384 exp, exp->exp_client_uuid.uuid);
1388 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1389 exp, exp->exp_client_uuid.uuid);
1391 if (obd_dump_on_timeout)
1392 libcfs_debug_dumplog();
1394 /* need for safe call CDEBUG after obd_disconnect */
1395 class_export_get(exp);
1397 /* Most callers into obd_disconnect are removing their own reference
1398 * (request, for example) in addition to the one from the hash table.
1399 * We don't have such a reference here, so make one. */
1400 class_export_get(exp);
1401 rc = obd_disconnect(exp);
1403 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1405 CDEBUG(D_HA, "disconnected export %p/%s\n",
1406 exp, exp->exp_client_uuid.uuid);
1407 class_export_put(exp);
1409 EXPORT_SYMBOL(class_fail_export);
1411 char *obd_export_nid2str(struct obd_export *exp)
1413 if (exp->exp_connection != NULL)
1414 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1418 EXPORT_SYMBOL(obd_export_nid2str);
1420 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1422 cfs_hash_t *nid_hash;
1423 struct obd_export *doomed_exp = NULL;
1424 int exports_evicted = 0;
1426 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1428 spin_lock(&obd->obd_dev_lock);
1429 /* umount has run already, so evict thread should leave
1430 * its task to umount thread now */
1431 if (obd->obd_stopping) {
1432 spin_unlock(&obd->obd_dev_lock);
1433 return exports_evicted;
1435 nid_hash = obd->obd_nid_hash;
1436 cfs_hash_getref(nid_hash);
1437 spin_unlock(&obd->obd_dev_lock);
1440 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1441 if (doomed_exp == NULL)
1444 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1445 "nid %s found, wanted nid %s, requested nid %s\n",
1446 obd_export_nid2str(doomed_exp),
1447 libcfs_nid2str(nid_key), nid);
1448 LASSERTF(doomed_exp != obd->obd_self_export,
1449 "self-export is hashed by NID?\n");
1451 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1452 "request\n", obd->obd_name,
1453 obd_uuid2str(&doomed_exp->exp_client_uuid),
1454 obd_export_nid2str(doomed_exp));
1455 class_fail_export(doomed_exp);
1456 class_export_put(doomed_exp);
1459 cfs_hash_putref(nid_hash);
1461 if (!exports_evicted)
1462 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1463 obd->obd_name, nid);
1464 return exports_evicted;
1466 EXPORT_SYMBOL(obd_export_evict_by_nid);
1468 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1470 cfs_hash_t *uuid_hash;
1471 struct obd_export *doomed_exp = NULL;
1472 struct obd_uuid doomed_uuid;
1473 int exports_evicted = 0;
1475 spin_lock(&obd->obd_dev_lock);
1476 if (obd->obd_stopping) {
1477 spin_unlock(&obd->obd_dev_lock);
1478 return exports_evicted;
1480 uuid_hash = obd->obd_uuid_hash;
1481 cfs_hash_getref(uuid_hash);
1482 spin_unlock(&obd->obd_dev_lock);
1484 obd_str2uuid(&doomed_uuid, uuid);
1485 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1486 CERROR("%s: can't evict myself\n", obd->obd_name);
1487 cfs_hash_putref(uuid_hash);
1488 return exports_evicted;
1491 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1493 if (doomed_exp == NULL) {
1494 CERROR("%s: can't disconnect %s: no exports found\n",
1495 obd->obd_name, uuid);
1497 CWARN("%s: evicting %s at adminstrative request\n",
1498 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1499 class_fail_export(doomed_exp);
1500 class_export_put(doomed_exp);
1503 cfs_hash_putref(uuid_hash);
1505 return exports_evicted;
1507 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1509 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1510 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1511 EXPORT_SYMBOL(class_export_dump_hook);
1514 static void print_export_data(struct obd_export *exp, const char *status,
1517 struct ptlrpc_reply_state *rs;
1518 struct ptlrpc_reply_state *first_reply = NULL;
1521 spin_lock(&exp->exp_lock);
1522 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1528 spin_unlock(&exp->exp_lock);
1530 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1531 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1532 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1533 cfs_atomic_read(&exp->exp_rpc_count),
1534 cfs_atomic_read(&exp->exp_cb_count),
1535 cfs_atomic_read(&exp->exp_locks_count),
1536 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1537 nreplies, first_reply, nreplies > 3 ? "..." : "",
1538 exp->exp_last_committed);
1539 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1540 if (locks && class_export_dump_hook != NULL)
1541 class_export_dump_hook(exp);
1545 void dump_exports(struct obd_device *obd, int locks)
1547 struct obd_export *exp;
1549 spin_lock(&obd->obd_dev_lock);
1550 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1551 print_export_data(exp, "ACTIVE", locks);
1552 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1553 print_export_data(exp, "UNLINKED", locks);
1554 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1555 print_export_data(exp, "DELAYED", locks);
1556 spin_unlock(&obd->obd_dev_lock);
1557 spin_lock(&obd_zombie_impexp_lock);
1558 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1559 print_export_data(exp, "ZOMBIE", locks);
1560 spin_unlock(&obd_zombie_impexp_lock);
1562 EXPORT_SYMBOL(dump_exports);
1564 void obd_exports_barrier(struct obd_device *obd)
1567 LASSERT(cfs_list_empty(&obd->obd_exports));
1568 spin_lock(&obd->obd_dev_lock);
1569 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1570 spin_unlock(&obd->obd_dev_lock);
1571 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1572 cfs_time_seconds(waited));
1573 if (waited > 5 && IS_PO2(waited)) {
1574 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1575 "more than %d seconds. "
1576 "The obd refcount = %d. Is it stuck?\n",
1577 obd->obd_name, waited,
1578 cfs_atomic_read(&obd->obd_refcount));
1579 dump_exports(obd, 1);
1582 spin_lock(&obd->obd_dev_lock);
1584 spin_unlock(&obd->obd_dev_lock);
1586 EXPORT_SYMBOL(obd_exports_barrier);
1588 /* Total amount of zombies to be destroyed */
1589 static int zombies_count = 0;
1592 * kill zombie imports and exports
1594 void obd_zombie_impexp_cull(void)
1596 struct obd_import *import;
1597 struct obd_export *export;
1601 spin_lock(&obd_zombie_impexp_lock);
1604 if (!cfs_list_empty(&obd_zombie_imports)) {
1605 import = cfs_list_entry(obd_zombie_imports.next,
1608 cfs_list_del_init(&import->imp_zombie_chain);
1612 if (!cfs_list_empty(&obd_zombie_exports)) {
1613 export = cfs_list_entry(obd_zombie_exports.next,
1616 cfs_list_del_init(&export->exp_obd_chain);
1619 spin_unlock(&obd_zombie_impexp_lock);
1621 if (import != NULL) {
1622 class_import_destroy(import);
1623 spin_lock(&obd_zombie_impexp_lock);
1625 spin_unlock(&obd_zombie_impexp_lock);
1628 if (export != NULL) {
1629 class_export_destroy(export);
1630 spin_lock(&obd_zombie_impexp_lock);
1632 spin_unlock(&obd_zombie_impexp_lock);
1636 } while (import != NULL || export != NULL);
1640 static struct completion obd_zombie_start;
1641 static struct completion obd_zombie_stop;
1642 static unsigned long obd_zombie_flags;
1643 static cfs_waitq_t obd_zombie_waitq;
1644 static pid_t obd_zombie_pid;
1647 OBD_ZOMBIE_STOP = 0x0001,
1651 * check for work for kill zombie import/export thread.
1653 static int obd_zombie_impexp_check(void *arg)
1657 spin_lock(&obd_zombie_impexp_lock);
1658 rc = (zombies_count == 0) &&
1659 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1660 spin_unlock(&obd_zombie_impexp_lock);
1666 * Add export to the obd_zombe thread and notify it.
1668 static void obd_zombie_export_add(struct obd_export *exp) {
1669 spin_lock(&exp->exp_obd->obd_dev_lock);
1670 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1671 cfs_list_del_init(&exp->exp_obd_chain);
1672 spin_unlock(&exp->exp_obd->obd_dev_lock);
1673 spin_lock(&obd_zombie_impexp_lock);
1675 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1676 spin_unlock(&obd_zombie_impexp_lock);
1678 obd_zombie_impexp_notify();
1682 * Add import to the obd_zombe thread and notify it.
1684 static void obd_zombie_import_add(struct obd_import *imp) {
1685 LASSERT(imp->imp_sec == NULL);
1686 LASSERT(imp->imp_rq_pool == NULL);
1687 spin_lock(&obd_zombie_impexp_lock);
1688 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1690 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1691 spin_unlock(&obd_zombie_impexp_lock);
1693 obd_zombie_impexp_notify();
1697 * notify import/export destroy thread about new zombie.
1699 static void obd_zombie_impexp_notify(void)
1702 * Make sure obd_zomebie_impexp_thread get this notification.
1703 * It is possible this signal only get by obd_zombie_barrier, and
1704 * barrier gulps this notification and sleeps away and hangs ensues
1706 cfs_waitq_broadcast(&obd_zombie_waitq);
1710 * check whether obd_zombie is idle
1712 static int obd_zombie_is_idle(void)
1716 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1717 spin_lock(&obd_zombie_impexp_lock);
1718 rc = (zombies_count == 0);
1719 spin_unlock(&obd_zombie_impexp_lock);
1724 * wait when obd_zombie import/export queues become empty
1726 void obd_zombie_barrier(void)
1728 struct l_wait_info lwi = { 0 };
1730 if (obd_zombie_pid == cfs_curproc_pid())
1731 /* don't wait for myself */
1733 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1735 EXPORT_SYMBOL(obd_zombie_barrier);
1740 * destroy zombie export/import thread.
1742 static int obd_zombie_impexp_thread(void *unused)
1746 rc = cfs_daemonize_ctxt("obd_zombid");
1748 complete(&obd_zombie_start);
1752 complete(&obd_zombie_start);
1754 obd_zombie_pid = cfs_curproc_pid();
1756 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1757 struct l_wait_info lwi = { 0 };
1759 l_wait_event(obd_zombie_waitq,
1760 !obd_zombie_impexp_check(NULL), &lwi);
1761 obd_zombie_impexp_cull();
1764 * Notify obd_zombie_barrier callers that queues
1767 cfs_waitq_signal(&obd_zombie_waitq);
1770 complete(&obd_zombie_stop);
1775 #else /* ! KERNEL */
1777 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1778 static void *obd_zombie_impexp_work_cb;
1779 static void *obd_zombie_impexp_idle_cb;
1781 int obd_zombie_impexp_kill(void *arg)
1785 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1786 obd_zombie_impexp_cull();
1789 cfs_atomic_dec(&zombie_recur);
1796 * start destroy zombie import/export thread
1798 int obd_zombie_impexp_init(void)
1802 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1803 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1804 spin_lock_init(&obd_zombie_impexp_lock);
1805 init_completion(&obd_zombie_start);
1806 init_completion(&obd_zombie_stop);
1807 cfs_waitq_init(&obd_zombie_waitq);
1811 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1815 wait_for_completion(&obd_zombie_start);
1818 obd_zombie_impexp_work_cb =
1819 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1820 &obd_zombie_impexp_kill, NULL);
1822 obd_zombie_impexp_idle_cb =
1823 liblustre_register_idle_callback("obd_zombi_impexp_check",
1824 &obd_zombie_impexp_check, NULL);
1830 * stop destroy zombie import/export thread
1832 void obd_zombie_impexp_stop(void)
1834 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1835 obd_zombie_impexp_notify();
1837 wait_for_completion(&obd_zombie_stop);
1839 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1840 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1844 /***** Kernel-userspace comm helpers *******/
1846 /* Get length of entire message, including header */
1847 int kuc_len(int payload_len)
1849 return sizeof(struct kuc_hdr) + payload_len;
1851 EXPORT_SYMBOL(kuc_len);
1853 /* Get a pointer to kuc header, given a ptr to the payload
1854 * @param p Pointer to payload area
1855 * @returns Pointer to kuc header
1857 struct kuc_hdr * kuc_ptr(void *p)
1859 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1860 LASSERT(lh->kuc_magic == KUC_MAGIC);
1863 EXPORT_SYMBOL(kuc_ptr);
1865 /* Test if payload is part of kuc message
1866 * @param p Pointer to payload area
1869 int kuc_ispayload(void *p)
1871 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1873 if (kh->kuc_magic == KUC_MAGIC)
1878 EXPORT_SYMBOL(kuc_ispayload);
1880 /* Alloc space for a message, and fill in header
1881 * @return Pointer to payload area
1883 void *kuc_alloc(int payload_len, int transport, int type)
1886 int len = kuc_len(payload_len);
1890 return ERR_PTR(-ENOMEM);
1892 lh->kuc_magic = KUC_MAGIC;
1893 lh->kuc_transport = transport;
1894 lh->kuc_msgtype = type;
1895 lh->kuc_msglen = len;
1897 return (void *)(lh + 1);
1899 EXPORT_SYMBOL(kuc_alloc);
1901 /* Takes pointer to payload area */
1902 inline void kuc_free(void *p, int payload_len)
1904 struct kuc_hdr *lh = kuc_ptr(p);
1905 OBD_FREE(lh, kuc_len(payload_len));
1907 EXPORT_SYMBOL(kuc_free);