4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
53 struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 struct kmem_cache *import_cachep;
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
134 if (!request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144 spin_lock(&type->obd_type_lock);
146 try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
151 EXPORT_SYMBOL(class_get_type);
153 void class_put_type(struct obd_type *type)
156 spin_lock(&type->obd_type_lock);
158 module_put(type->typ_dt_ops->o_owner);
159 spin_unlock(&type->obd_type_lock);
161 EXPORT_SYMBOL(class_put_type);
163 #define CLASS_MAX_NAME 1024
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166 struct lprocfs_vars *vars, const char *name,
167 struct lu_device_type *ldt)
169 struct obd_type *type;
174 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
176 if (class_search_type(name)) {
177 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
182 OBD_ALLOC(type, sizeof(*type));
186 OBD_ALLOC_PTR(type->typ_dt_ops);
187 OBD_ALLOC_PTR(type->typ_md_ops);
188 OBD_ALLOC(type->typ_name, strlen(name) + 1);
190 if (type->typ_dt_ops == NULL ||
191 type->typ_md_ops == NULL ||
192 type->typ_name == NULL)
195 *(type->typ_dt_ops) = *dt_ops;
196 /* md_ops is optional */
198 *(type->typ_md_ops) = *md_ops;
199 strcpy(type->typ_name, name);
200 spin_lock_init(&type->obd_type_lock);
203 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
205 if (IS_ERR(type->typ_procroot)) {
206 rc = PTR_ERR(type->typ_procroot);
207 type->typ_procroot = NULL;
213 rc = lu_device_type_init(ldt);
218 spin_lock(&obd_types_lock);
219 cfs_list_add(&type->typ_chain, &obd_types);
220 spin_unlock(&obd_types_lock);
225 if (type->typ_name != NULL)
226 OBD_FREE(type->typ_name, strlen(name) + 1);
227 if (type->typ_md_ops != NULL)
228 OBD_FREE_PTR(type->typ_md_ops);
229 if (type->typ_dt_ops != NULL)
230 OBD_FREE_PTR(type->typ_dt_ops);
231 OBD_FREE(type, sizeof(*type));
234 EXPORT_SYMBOL(class_register_type);
236 int class_unregister_type(const char *name)
238 struct obd_type *type = class_search_type(name);
242 CERROR("unknown obd type\n");
246 if (type->typ_refcnt) {
247 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
248 /* This is a bad situation, let's make the best of it */
249 /* Remove ops, but leave the name for debugging */
250 OBD_FREE_PTR(type->typ_dt_ops);
251 OBD_FREE_PTR(type->typ_md_ops);
255 /* we do not use type->typ_procroot as for compatibility purposes
256 * other modules can share names (i.e. lod can use lov entry). so
257 * we can't reference pointer as it can get invalided when another
258 * module removes the entry */
259 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
262 lu_device_type_fini(type->typ_lu);
264 spin_lock(&obd_types_lock);
265 cfs_list_del(&type->typ_chain);
266 spin_unlock(&obd_types_lock);
267 OBD_FREE(type->typ_name, strlen(name) + 1);
268 if (type->typ_dt_ops != NULL)
269 OBD_FREE_PTR(type->typ_dt_ops);
270 if (type->typ_md_ops != NULL)
271 OBD_FREE_PTR(type->typ_md_ops);
272 OBD_FREE(type, sizeof(*type));
274 } /* class_unregister_type */
275 EXPORT_SYMBOL(class_unregister_type);
278 * Create a new obd device.
280 * Find an empty slot in ::obd_devs[], create a new obd device in it.
282 * \param[in] type_name obd device type string.
283 * \param[in] name obd device name.
285 * \retval NULL if create fails, otherwise return the obd device
288 struct obd_device *class_newdev(const char *type_name, const char *name)
290 struct obd_device *result = NULL;
291 struct obd_device *newdev;
292 struct obd_type *type = NULL;
294 int new_obd_minor = 0;
297 if (strlen(name) >= MAX_OBD_NAME) {
298 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
299 RETURN(ERR_PTR(-EINVAL));
302 type = class_get_type(type_name);
304 CERROR("OBD: unknown type: %s\n", type_name);
305 RETURN(ERR_PTR(-ENODEV));
308 newdev = obd_device_alloc();
310 GOTO(out_type, result = ERR_PTR(-ENOMEM));
312 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
314 write_lock(&obd_dev_lock);
315 for (i = 0; i < class_devno_max(); i++) {
316 struct obd_device *obd = class_num2obd(i);
318 if (obd && (strcmp(name, obd->obd_name) == 0)) {
319 CERROR("Device %s already exists at %d, won't add\n",
322 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
323 "%p obd_magic %08x != %08x\n", result,
324 result->obd_magic, OBD_DEVICE_MAGIC);
325 LASSERTF(result->obd_minor == new_obd_minor,
326 "%p obd_minor %d != %d\n", result,
327 result->obd_minor, new_obd_minor);
329 obd_devs[result->obd_minor] = NULL;
330 result->obd_name[0]='\0';
332 result = ERR_PTR(-EEXIST);
335 if (!result && !obd) {
337 result->obd_minor = i;
339 result->obd_type = type;
340 strncpy(result->obd_name, name,
341 sizeof(result->obd_name) - 1);
342 obd_devs[i] = result;
345 write_unlock(&obd_dev_lock);
347 if (result == NULL && i >= class_devno_max()) {
348 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
350 GOTO(out, result = ERR_PTR(-EOVERFLOW));
356 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
357 result->obd_name, result);
361 obd_device_free(newdev);
363 class_put_type(type);
367 void class_release_dev(struct obd_device *obd)
369 struct obd_type *obd_type = obd->obd_type;
371 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
372 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
373 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
374 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
375 LASSERT(obd_type != NULL);
377 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
378 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
380 write_lock(&obd_dev_lock);
381 obd_devs[obd->obd_minor] = NULL;
382 write_unlock(&obd_dev_lock);
383 obd_device_free(obd);
385 class_put_type(obd_type);
388 int class_name2dev(const char *name)
395 read_lock(&obd_dev_lock);
396 for (i = 0; i < class_devno_max(); i++) {
397 struct obd_device *obd = class_num2obd(i);
399 if (obd && strcmp(name, obd->obd_name) == 0) {
400 /* Make sure we finished attaching before we give
401 out any references */
402 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
403 if (obd->obd_attached) {
404 read_unlock(&obd_dev_lock);
410 read_unlock(&obd_dev_lock);
414 EXPORT_SYMBOL(class_name2dev);
416 struct obd_device *class_name2obd(const char *name)
418 int dev = class_name2dev(name);
420 if (dev < 0 || dev > class_devno_max())
422 return class_num2obd(dev);
424 EXPORT_SYMBOL(class_name2obd);
426 int class_uuid2dev(struct obd_uuid *uuid)
430 read_lock(&obd_dev_lock);
431 for (i = 0; i < class_devno_max(); i++) {
432 struct obd_device *obd = class_num2obd(i);
434 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
435 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
436 read_unlock(&obd_dev_lock);
440 read_unlock(&obd_dev_lock);
444 EXPORT_SYMBOL(class_uuid2dev);
446 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
448 int dev = class_uuid2dev(uuid);
451 return class_num2obd(dev);
453 EXPORT_SYMBOL(class_uuid2obd);
456 * Get obd device from ::obd_devs[]
458 * \param num [in] array index
460 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
461 * otherwise return the obd device there.
463 struct obd_device *class_num2obd(int num)
465 struct obd_device *obd = NULL;
467 if (num < class_devno_max()) {
472 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
473 "%p obd_magic %08x != %08x\n",
474 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
475 LASSERTF(obd->obd_minor == num,
476 "%p obd_minor %0d != %0d\n",
477 obd, obd->obd_minor, num);
482 EXPORT_SYMBOL(class_num2obd);
485 * Get obd devices count. Device in any
487 * \retval obd device count
489 int get_devices_count(void)
491 int index, max_index = class_devno_max(), dev_count = 0;
493 read_lock(&obd_dev_lock);
494 for (index = 0; index <= max_index; index++) {
495 struct obd_device *obd = class_num2obd(index);
499 read_unlock(&obd_dev_lock);
503 EXPORT_SYMBOL(get_devices_count);
505 void class_obd_list(void)
510 read_lock(&obd_dev_lock);
511 for (i = 0; i < class_devno_max(); i++) {
512 struct obd_device *obd = class_num2obd(i);
516 if (obd->obd_stopping)
518 else if (obd->obd_set_up)
520 else if (obd->obd_attached)
524 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
525 i, status, obd->obd_type->typ_name,
526 obd->obd_name, obd->obd_uuid.uuid,
527 cfs_atomic_read(&obd->obd_refcount));
529 read_unlock(&obd_dev_lock);
533 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
534 specified, then only the client with that uuid is returned,
535 otherwise any client connected to the tgt is returned. */
536 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
537 const char * typ_name,
538 struct obd_uuid *grp_uuid)
542 read_lock(&obd_dev_lock);
543 for (i = 0; i < class_devno_max(); i++) {
544 struct obd_device *obd = class_num2obd(i);
548 if ((strncmp(obd->obd_type->typ_name, typ_name,
549 strlen(typ_name)) == 0)) {
550 if (obd_uuid_equals(tgt_uuid,
551 &obd->u.cli.cl_target_uuid) &&
552 ((grp_uuid)? obd_uuid_equals(grp_uuid,
553 &obd->obd_uuid) : 1)) {
554 read_unlock(&obd_dev_lock);
559 read_unlock(&obd_dev_lock);
563 EXPORT_SYMBOL(class_find_client_obd);
565 /* Iterate the obd_device list looking devices have grp_uuid. Start
566 searching at *next, and if a device is found, the next index to look
567 at is saved in *next. If next is NULL, then the first matching device
568 will always be returned. */
569 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
575 else if (*next >= 0 && *next < class_devno_max())
580 read_lock(&obd_dev_lock);
581 for (; i < class_devno_max(); i++) {
582 struct obd_device *obd = class_num2obd(i);
586 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
589 read_unlock(&obd_dev_lock);
593 read_unlock(&obd_dev_lock);
597 EXPORT_SYMBOL(class_devices_in_group);
600 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
601 * adjust sptlrpc settings accordingly.
603 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
605 struct obd_device *obd;
609 LASSERT(namelen > 0);
611 read_lock(&obd_dev_lock);
612 for (i = 0; i < class_devno_max(); i++) {
613 obd = class_num2obd(i);
615 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
618 /* only notify mdc, osc, mdt, ost */
619 type = obd->obd_type->typ_name;
620 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
621 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
622 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
623 strcmp(type, LUSTRE_OST_NAME) != 0)
626 if (strncmp(obd->obd_name, fsname, namelen))
629 class_incref(obd, __FUNCTION__, obd);
630 read_unlock(&obd_dev_lock);
631 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
632 sizeof(KEY_SPTLRPC_CONF),
633 KEY_SPTLRPC_CONF, 0, NULL, NULL);
635 class_decref(obd, __FUNCTION__, obd);
636 read_lock(&obd_dev_lock);
638 read_unlock(&obd_dev_lock);
641 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
643 void obd_cleanup_caches(void)
646 if (obd_device_cachep) {
647 kmem_cache_destroy(obd_device_cachep);
648 obd_device_cachep = NULL;
651 kmem_cache_destroy(obdo_cachep);
655 kmem_cache_destroy(import_cachep);
656 import_cachep = NULL;
659 kmem_cache_destroy(capa_cachep);
665 int obd_init_caches(void)
669 LASSERT(obd_device_cachep == NULL);
670 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
671 sizeof(struct obd_device),
673 if (!obd_device_cachep)
676 LASSERT(obdo_cachep == NULL);
677 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
682 LASSERT(import_cachep == NULL);
683 import_cachep = kmem_cache_create("ll_import_cache",
684 sizeof(struct obd_import),
689 LASSERT(capa_cachep == NULL);
690 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
697 obd_cleanup_caches();
702 /* map connection to client */
703 struct obd_export *class_conn2export(struct lustre_handle *conn)
705 struct obd_export *export;
709 CDEBUG(D_CACHE, "looking for null handle\n");
713 if (conn->cookie == -1) { /* this means assign a new connection */
714 CDEBUG(D_CACHE, "want a new connection\n");
718 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
719 export = class_handle2object(conn->cookie, NULL);
722 EXPORT_SYMBOL(class_conn2export);
724 struct obd_device *class_exp2obd(struct obd_export *exp)
730 EXPORT_SYMBOL(class_exp2obd);
732 struct obd_device *class_conn2obd(struct lustre_handle *conn)
734 struct obd_export *export;
735 export = class_conn2export(conn);
737 struct obd_device *obd = export->exp_obd;
738 class_export_put(export);
743 EXPORT_SYMBOL(class_conn2obd);
745 struct obd_import *class_exp2cliimp(struct obd_export *exp)
747 struct obd_device *obd = exp->exp_obd;
750 return obd->u.cli.cl_import;
752 EXPORT_SYMBOL(class_exp2cliimp);
754 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
756 struct obd_device *obd = class_conn2obd(conn);
759 return obd->u.cli.cl_import;
761 EXPORT_SYMBOL(class_conn2cliimp);
763 /* Export management functions */
764 static void class_export_destroy(struct obd_export *exp)
766 struct obd_device *obd = exp->exp_obd;
769 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
770 LASSERT(obd != NULL);
772 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
773 exp->exp_client_uuid.uuid, obd->obd_name);
775 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
776 if (exp->exp_connection)
777 ptlrpc_put_connection_superhack(exp->exp_connection);
779 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
780 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
781 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
782 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
783 obd_destroy_export(exp);
784 class_decref(obd, "export", exp);
786 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
790 static void export_handle_addref(void *export)
792 class_export_get(export);
795 static struct portals_handle_ops export_handle_ops = {
796 .hop_addref = export_handle_addref,
800 struct obd_export *class_export_get(struct obd_export *exp)
802 cfs_atomic_inc(&exp->exp_refcount);
803 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
804 cfs_atomic_read(&exp->exp_refcount));
807 EXPORT_SYMBOL(class_export_get);
809 void class_export_put(struct obd_export *exp)
811 LASSERT(exp != NULL);
812 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
813 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
814 cfs_atomic_read(&exp->exp_refcount) - 1);
816 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
817 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
818 CDEBUG(D_IOCTL, "final put %p/%s\n",
819 exp, exp->exp_client_uuid.uuid);
821 /* release nid stat refererence */
822 lprocfs_exp_cleanup(exp);
824 obd_zombie_export_add(exp);
827 EXPORT_SYMBOL(class_export_put);
829 /* Creates a new export, adds it to the hash table, and returns a
830 * pointer to it. The refcount is 2: one for the hash reference, and
831 * one for the pointer returned by this function. */
832 struct obd_export *class_new_export(struct obd_device *obd,
833 struct obd_uuid *cluuid)
835 struct obd_export *export;
836 cfs_hash_t *hash = NULL;
840 OBD_ALLOC_PTR(export);
842 return ERR_PTR(-ENOMEM);
844 export->exp_conn_cnt = 0;
845 export->exp_lock_hash = NULL;
846 export->exp_flock_hash = NULL;
847 cfs_atomic_set(&export->exp_refcount, 2);
848 cfs_atomic_set(&export->exp_rpc_count, 0);
849 cfs_atomic_set(&export->exp_cb_count, 0);
850 cfs_atomic_set(&export->exp_locks_count, 0);
851 #if LUSTRE_TRACKS_LOCK_EXP_REFS
852 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
853 spin_lock_init(&export->exp_locks_list_guard);
855 cfs_atomic_set(&export->exp_replay_count, 0);
856 export->exp_obd = obd;
857 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
858 spin_lock_init(&export->exp_uncommitted_replies_lock);
859 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
860 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
861 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
862 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
863 CFS_INIT_LIST_HEAD(&export->exp_reg_rpcs);
864 class_handle_hash(&export->exp_handle, &export_handle_ops);
865 export->exp_last_request_time = cfs_time_current_sec();
866 spin_lock_init(&export->exp_lock);
867 spin_lock_init(&export->exp_rpc_lock);
868 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
869 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
870 spin_lock_init(&export->exp_bl_list_lock);
871 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
873 export->exp_sp_peer = LUSTRE_SP_ANY;
874 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
875 export->exp_client_uuid = *cluuid;
876 obd_init_export(export);
878 spin_lock(&obd->obd_dev_lock);
879 /* shouldn't happen, but might race */
880 if (obd->obd_stopping)
881 GOTO(exit_unlock, rc = -ENODEV);
883 hash = cfs_hash_getref(obd->obd_uuid_hash);
885 GOTO(exit_unlock, rc = -ENODEV);
886 spin_unlock(&obd->obd_dev_lock);
888 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
889 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
891 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
892 obd->obd_name, cluuid->uuid, rc);
893 GOTO(exit_err, rc = -EALREADY);
897 spin_lock(&obd->obd_dev_lock);
898 if (obd->obd_stopping) {
899 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
900 GOTO(exit_unlock, rc = -ENODEV);
903 class_incref(obd, "export", export);
904 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
905 cfs_list_add_tail(&export->exp_obd_chain_timed,
906 &export->exp_obd->obd_exports_timed);
907 export->exp_obd->obd_num_exports++;
908 spin_unlock(&obd->obd_dev_lock);
909 cfs_hash_putref(hash);
913 spin_unlock(&obd->obd_dev_lock);
916 cfs_hash_putref(hash);
917 class_handle_unhash(&export->exp_handle);
918 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
919 obd_destroy_export(export);
920 OBD_FREE_PTR(export);
923 EXPORT_SYMBOL(class_new_export);
925 void class_unlink_export(struct obd_export *exp)
927 class_handle_unhash(&exp->exp_handle);
929 spin_lock(&exp->exp_obd->obd_dev_lock);
930 /* delete an uuid-export hashitem from hashtables */
931 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
932 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
933 &exp->exp_client_uuid,
934 &exp->exp_uuid_hash);
936 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
937 cfs_list_del_init(&exp->exp_obd_chain_timed);
938 exp->exp_obd->obd_num_exports--;
939 spin_unlock(&exp->exp_obd->obd_dev_lock);
940 class_export_put(exp);
942 EXPORT_SYMBOL(class_unlink_export);
944 /* Import management functions */
945 void class_import_destroy(struct obd_import *imp)
949 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
950 imp->imp_obd->obd_name);
952 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
954 ptlrpc_put_connection_superhack(imp->imp_connection);
956 while (!cfs_list_empty(&imp->imp_conn_list)) {
957 struct obd_import_conn *imp_conn;
959 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
960 struct obd_import_conn, oic_item);
961 cfs_list_del_init(&imp_conn->oic_item);
962 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
963 OBD_FREE(imp_conn, sizeof(*imp_conn));
966 LASSERT(imp->imp_sec == NULL);
967 class_decref(imp->imp_obd, "import", imp);
968 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
972 static void import_handle_addref(void *import)
974 class_import_get(import);
977 static struct portals_handle_ops import_handle_ops = {
978 .hop_addref = import_handle_addref,
982 struct obd_import *class_import_get(struct obd_import *import)
984 cfs_atomic_inc(&import->imp_refcount);
985 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
986 cfs_atomic_read(&import->imp_refcount),
987 import->imp_obd->obd_name);
990 EXPORT_SYMBOL(class_import_get);
992 void class_import_put(struct obd_import *imp)
996 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
997 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
999 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1000 cfs_atomic_read(&imp->imp_refcount) - 1,
1001 imp->imp_obd->obd_name);
1003 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1004 CDEBUG(D_INFO, "final put import %p\n", imp);
1005 obd_zombie_import_add(imp);
1008 /* catch possible import put race */
1009 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1012 EXPORT_SYMBOL(class_import_put);
1014 static void init_imp_at(struct imp_at *at) {
1016 at_init(&at->iat_net_latency, 0, 0);
1017 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1018 /* max service estimates are tracked on the server side, so
1019 don't use the AT history here, just use the last reported
1020 val. (But keep hist for proc histogram, worst_ever) */
1021 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1026 struct obd_import *class_new_import(struct obd_device *obd)
1028 struct obd_import *imp;
1030 OBD_ALLOC(imp, sizeof(*imp));
1034 CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1035 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1036 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1037 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1038 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1039 CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
1040 imp->imp_replay_cursor = &imp->imp_committed_list;
1041 spin_lock_init(&imp->imp_lock);
1042 imp->imp_last_success_conn = 0;
1043 imp->imp_state = LUSTRE_IMP_NEW;
1044 imp->imp_obd = class_incref(obd, "import", imp);
1045 mutex_init(&imp->imp_sec_mutex);
1046 init_waitqueue_head(&imp->imp_recovery_waitq);
1048 cfs_atomic_set(&imp->imp_refcount, 2);
1049 cfs_atomic_set(&imp->imp_unregistering, 0);
1050 cfs_atomic_set(&imp->imp_inflight, 0);
1051 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1052 cfs_atomic_set(&imp->imp_inval_count, 0);
1053 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1054 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1055 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1056 init_imp_at(&imp->imp_at);
1058 /* the default magic is V2, will be used in connect RPC, and
1059 * then adjusted according to the flags in request/reply. */
1060 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1064 EXPORT_SYMBOL(class_new_import);
1066 void class_destroy_import(struct obd_import *import)
1068 LASSERT(import != NULL);
1069 LASSERT(import != LP_POISON);
1071 class_handle_unhash(&import->imp_handle);
1073 spin_lock(&import->imp_lock);
1074 import->imp_generation++;
1075 spin_unlock(&import->imp_lock);
1076 class_import_put(import);
1078 EXPORT_SYMBOL(class_destroy_import);
1080 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1082 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1084 spin_lock(&exp->exp_locks_list_guard);
1086 LASSERT(lock->l_exp_refs_nr >= 0);
1088 if (lock->l_exp_refs_target != NULL &&
1089 lock->l_exp_refs_target != exp) {
1090 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1091 exp, lock, lock->l_exp_refs_target);
1093 if ((lock->l_exp_refs_nr ++) == 0) {
1094 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1095 lock->l_exp_refs_target = exp;
1097 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1098 lock, exp, lock->l_exp_refs_nr);
1099 spin_unlock(&exp->exp_locks_list_guard);
1101 EXPORT_SYMBOL(__class_export_add_lock_ref);
1103 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1105 spin_lock(&exp->exp_locks_list_guard);
1106 LASSERT(lock->l_exp_refs_nr > 0);
1107 if (lock->l_exp_refs_target != exp) {
1108 LCONSOLE_WARN("lock %p, "
1109 "mismatching export pointers: %p, %p\n",
1110 lock, lock->l_exp_refs_target, exp);
1112 if (-- lock->l_exp_refs_nr == 0) {
1113 cfs_list_del_init(&lock->l_exp_refs_link);
1114 lock->l_exp_refs_target = NULL;
1116 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1117 lock, exp, lock->l_exp_refs_nr);
1118 spin_unlock(&exp->exp_locks_list_guard);
1120 EXPORT_SYMBOL(__class_export_del_lock_ref);
1123 /* A connection defines an export context in which preallocation can
1124 be managed. This releases the export pointer reference, and returns
1125 the export handle, so the export refcount is 1 when this function
1127 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1128 struct obd_uuid *cluuid)
1130 struct obd_export *export;
1131 LASSERT(conn != NULL);
1132 LASSERT(obd != NULL);
1133 LASSERT(cluuid != NULL);
1136 export = class_new_export(obd, cluuid);
1138 RETURN(PTR_ERR(export));
1140 conn->cookie = export->exp_handle.h_cookie;
1141 class_export_put(export);
1143 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1144 cluuid->uuid, conn->cookie);
1147 EXPORT_SYMBOL(class_connect);
1149 /* if export is involved in recovery then clean up related things */
1150 void class_export_recovery_cleanup(struct obd_export *exp)
1152 struct obd_device *obd = exp->exp_obd;
1154 spin_lock(&obd->obd_recovery_task_lock);
1155 if (exp->exp_delayed)
1156 obd->obd_delayed_clients--;
1157 if (obd->obd_recovering) {
1158 if (exp->exp_in_recovery) {
1159 spin_lock(&exp->exp_lock);
1160 exp->exp_in_recovery = 0;
1161 spin_unlock(&exp->exp_lock);
1162 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1163 cfs_atomic_dec(&obd->obd_connected_clients);
1166 /* if called during recovery then should update
1167 * obd_stale_clients counter,
1168 * lightweight exports are not counted */
1169 if (exp->exp_failed &&
1170 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1171 exp->exp_obd->obd_stale_clients++;
1173 spin_unlock(&obd->obd_recovery_task_lock);
1174 /** Cleanup req replay fields */
1175 if (exp->exp_req_replay_needed) {
1176 spin_lock(&exp->exp_lock);
1177 exp->exp_req_replay_needed = 0;
1178 spin_unlock(&exp->exp_lock);
1179 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1180 cfs_atomic_dec(&obd->obd_req_replay_clients);
1182 /** Cleanup lock replay data */
1183 if (exp->exp_lock_replay_needed) {
1184 spin_lock(&exp->exp_lock);
1185 exp->exp_lock_replay_needed = 0;
1186 spin_unlock(&exp->exp_lock);
1187 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1188 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1192 /* This function removes 1-3 references from the export:
1193 * 1 - for export pointer passed
1194 * and if disconnect really need
1195 * 2 - removing from hash
1196 * 3 - in client_unlink_export
1197 * The export pointer passed to this function can destroyed */
1198 int class_disconnect(struct obd_export *export)
1200 int already_disconnected;
1203 if (export == NULL) {
1204 CWARN("attempting to free NULL export %p\n", export);
1208 spin_lock(&export->exp_lock);
1209 already_disconnected = export->exp_disconnected;
1210 export->exp_disconnected = 1;
1211 spin_unlock(&export->exp_lock);
1213 /* class_cleanup(), abort_recovery(), and class_fail_export()
1214 * all end up in here, and if any of them race we shouldn't
1215 * call extra class_export_puts(). */
1216 if (already_disconnected) {
1217 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1218 GOTO(no_disconn, already_disconnected);
1221 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1222 export->exp_handle.h_cookie);
1224 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1225 cfs_hash_del(export->exp_obd->obd_nid_hash,
1226 &export->exp_connection->c_peer.nid,
1227 &export->exp_nid_hash);
1229 class_export_recovery_cleanup(export);
1230 class_unlink_export(export);
1232 class_export_put(export);
1235 EXPORT_SYMBOL(class_disconnect);
1237 /* Return non-zero for a fully connected export */
1238 int class_connected_export(struct obd_export *exp)
1243 spin_lock(&exp->exp_lock);
1244 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1245 spin_unlock(&exp->exp_lock);
1249 EXPORT_SYMBOL(class_connected_export);
1251 static void class_disconnect_export_list(cfs_list_t *list,
1252 enum obd_option flags)
1255 struct obd_export *exp;
1258 /* It's possible that an export may disconnect itself, but
1259 * nothing else will be added to this list. */
1260 while (!cfs_list_empty(list)) {
1261 exp = cfs_list_entry(list->next, struct obd_export,
1263 /* need for safe call CDEBUG after obd_disconnect */
1264 class_export_get(exp);
1266 spin_lock(&exp->exp_lock);
1267 exp->exp_flags = flags;
1268 spin_unlock(&exp->exp_lock);
1270 if (obd_uuid_equals(&exp->exp_client_uuid,
1271 &exp->exp_obd->obd_uuid)) {
1273 "exp %p export uuid == obd uuid, don't discon\n",
1275 /* Need to delete this now so we don't end up pointing
1276 * to work_list later when this export is cleaned up. */
1277 cfs_list_del_init(&exp->exp_obd_chain);
1278 class_export_put(exp);
1282 class_export_get(exp);
1283 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1284 "last request at "CFS_TIME_T"\n",
1285 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1286 exp, exp->exp_last_request_time);
1287 /* release one export reference anyway */
1288 rc = obd_disconnect(exp);
1290 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1291 obd_export_nid2str(exp), exp, rc);
1292 class_export_put(exp);
1297 void class_disconnect_exports(struct obd_device *obd)
1299 cfs_list_t work_list;
1302 /* Move all of the exports from obd_exports to a work list, en masse. */
1303 CFS_INIT_LIST_HEAD(&work_list);
1304 spin_lock(&obd->obd_dev_lock);
1305 cfs_list_splice_init(&obd->obd_exports, &work_list);
1306 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1307 spin_unlock(&obd->obd_dev_lock);
1309 if (!cfs_list_empty(&work_list)) {
1310 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1311 "disconnecting them\n", obd->obd_minor, obd);
1312 class_disconnect_export_list(&work_list,
1313 exp_flags_from_obd(obd));
1315 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1316 obd->obd_minor, obd);
1319 EXPORT_SYMBOL(class_disconnect_exports);
1321 /* Remove exports that have not completed recovery.
1323 void class_disconnect_stale_exports(struct obd_device *obd,
1324 int (*test_export)(struct obd_export *))
1326 cfs_list_t work_list;
1327 struct obd_export *exp, *n;
1331 CFS_INIT_LIST_HEAD(&work_list);
1332 spin_lock(&obd->obd_dev_lock);
1333 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1335 /* don't count self-export as client */
1336 if (obd_uuid_equals(&exp->exp_client_uuid,
1337 &exp->exp_obd->obd_uuid))
1340 /* don't evict clients which have no slot in last_rcvd
1341 * (e.g. lightweight connection) */
1342 if (exp->exp_target_data.ted_lr_idx == -1)
1345 spin_lock(&exp->exp_lock);
1346 if (exp->exp_failed || test_export(exp)) {
1347 spin_unlock(&exp->exp_lock);
1350 exp->exp_failed = 1;
1351 spin_unlock(&exp->exp_lock);
1353 cfs_list_move(&exp->exp_obd_chain, &work_list);
1355 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1356 obd->obd_name, exp->exp_client_uuid.uuid,
1357 exp->exp_connection == NULL ? "<unknown>" :
1358 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1359 print_export_data(exp, "EVICTING", 0);
1361 spin_unlock(&obd->obd_dev_lock);
1364 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1365 obd->obd_name, evicted);
1367 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1368 OBD_OPT_ABORT_RECOV);
1371 EXPORT_SYMBOL(class_disconnect_stale_exports);
1373 void class_fail_export(struct obd_export *exp)
1375 int rc, already_failed;
1377 spin_lock(&exp->exp_lock);
1378 already_failed = exp->exp_failed;
1379 exp->exp_failed = 1;
1380 spin_unlock(&exp->exp_lock);
1382 if (already_failed) {
1383 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1384 exp, exp->exp_client_uuid.uuid);
1388 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1389 exp, exp->exp_client_uuid.uuid);
1391 if (obd_dump_on_timeout)
1392 libcfs_debug_dumplog();
1394 /* need for safe call CDEBUG after obd_disconnect */
1395 class_export_get(exp);
1397 /* Most callers into obd_disconnect are removing their own reference
1398 * (request, for example) in addition to the one from the hash table.
1399 * We don't have such a reference here, so make one. */
1400 class_export_get(exp);
1401 rc = obd_disconnect(exp);
1403 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1405 CDEBUG(D_HA, "disconnected export %p/%s\n",
1406 exp, exp->exp_client_uuid.uuid);
1407 class_export_put(exp);
1409 EXPORT_SYMBOL(class_fail_export);
1411 char *obd_export_nid2str(struct obd_export *exp)
1413 if (exp->exp_connection != NULL)
1414 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1418 EXPORT_SYMBOL(obd_export_nid2str);
1420 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1422 cfs_hash_t *nid_hash;
1423 struct obd_export *doomed_exp = NULL;
1424 int exports_evicted = 0;
1426 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1428 spin_lock(&obd->obd_dev_lock);
1429 /* umount has run already, so evict thread should leave
1430 * its task to umount thread now */
1431 if (obd->obd_stopping) {
1432 spin_unlock(&obd->obd_dev_lock);
1433 return exports_evicted;
1435 nid_hash = obd->obd_nid_hash;
1436 cfs_hash_getref(nid_hash);
1437 spin_unlock(&obd->obd_dev_lock);
1440 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1441 if (doomed_exp == NULL)
1444 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1445 "nid %s found, wanted nid %s, requested nid %s\n",
1446 obd_export_nid2str(doomed_exp),
1447 libcfs_nid2str(nid_key), nid);
1448 LASSERTF(doomed_exp != obd->obd_self_export,
1449 "self-export is hashed by NID?\n");
1451 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1452 "request\n", obd->obd_name,
1453 obd_uuid2str(&doomed_exp->exp_client_uuid),
1454 obd_export_nid2str(doomed_exp));
1455 class_fail_export(doomed_exp);
1456 class_export_put(doomed_exp);
1459 cfs_hash_putref(nid_hash);
1461 if (!exports_evicted)
1462 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1463 obd->obd_name, nid);
1464 return exports_evicted;
1466 EXPORT_SYMBOL(obd_export_evict_by_nid);
1468 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1470 cfs_hash_t *uuid_hash;
1471 struct obd_export *doomed_exp = NULL;
1472 struct obd_uuid doomed_uuid;
1473 int exports_evicted = 0;
1475 spin_lock(&obd->obd_dev_lock);
1476 if (obd->obd_stopping) {
1477 spin_unlock(&obd->obd_dev_lock);
1478 return exports_evicted;
1480 uuid_hash = obd->obd_uuid_hash;
1481 cfs_hash_getref(uuid_hash);
1482 spin_unlock(&obd->obd_dev_lock);
1484 obd_str2uuid(&doomed_uuid, uuid);
1485 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1486 CERROR("%s: can't evict myself\n", obd->obd_name);
1487 cfs_hash_putref(uuid_hash);
1488 return exports_evicted;
1491 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1493 if (doomed_exp == NULL) {
1494 CERROR("%s: can't disconnect %s: no exports found\n",
1495 obd->obd_name, uuid);
1497 CWARN("%s: evicting %s at adminstrative request\n",
1498 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1499 class_fail_export(doomed_exp);
1500 class_export_put(doomed_exp);
1503 cfs_hash_putref(uuid_hash);
1505 return exports_evicted;
1507 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1509 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1510 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1511 EXPORT_SYMBOL(class_export_dump_hook);
1514 static void print_export_data(struct obd_export *exp, const char *status,
1517 struct ptlrpc_reply_state *rs;
1518 struct ptlrpc_reply_state *first_reply = NULL;
1521 spin_lock(&exp->exp_lock);
1522 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1528 spin_unlock(&exp->exp_lock);
1530 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1531 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1532 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1533 cfs_atomic_read(&exp->exp_rpc_count),
1534 cfs_atomic_read(&exp->exp_cb_count),
1535 cfs_atomic_read(&exp->exp_locks_count),
1536 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1537 nreplies, first_reply, nreplies > 3 ? "..." : "",
1538 exp->exp_last_committed);
1539 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1540 if (locks && class_export_dump_hook != NULL)
1541 class_export_dump_hook(exp);
1545 void dump_exports(struct obd_device *obd, int locks)
1547 struct obd_export *exp;
1549 spin_lock(&obd->obd_dev_lock);
1550 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1551 print_export_data(exp, "ACTIVE", locks);
1552 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1553 print_export_data(exp, "UNLINKED", locks);
1554 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1555 print_export_data(exp, "DELAYED", locks);
1556 spin_unlock(&obd->obd_dev_lock);
1557 spin_lock(&obd_zombie_impexp_lock);
1558 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1559 print_export_data(exp, "ZOMBIE", locks);
1560 spin_unlock(&obd_zombie_impexp_lock);
1562 EXPORT_SYMBOL(dump_exports);
1564 void obd_exports_barrier(struct obd_device *obd)
1567 LASSERT(cfs_list_empty(&obd->obd_exports));
1568 spin_lock(&obd->obd_dev_lock);
1569 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1570 spin_unlock(&obd->obd_dev_lock);
1571 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1572 cfs_time_seconds(waited));
1573 if (waited > 5 && IS_PO2(waited)) {
1574 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1575 "more than %d seconds. "
1576 "The obd refcount = %d. Is it stuck?\n",
1577 obd->obd_name, waited,
1578 cfs_atomic_read(&obd->obd_refcount));
1579 dump_exports(obd, 1);
1582 spin_lock(&obd->obd_dev_lock);
1584 spin_unlock(&obd->obd_dev_lock);
1586 EXPORT_SYMBOL(obd_exports_barrier);
1588 /* Total amount of zombies to be destroyed */
1589 static int zombies_count = 0;
1592 * kill zombie imports and exports
1594 void obd_zombie_impexp_cull(void)
1596 struct obd_import *import;
1597 struct obd_export *export;
1601 spin_lock(&obd_zombie_impexp_lock);
1604 if (!cfs_list_empty(&obd_zombie_imports)) {
1605 import = cfs_list_entry(obd_zombie_imports.next,
1608 cfs_list_del_init(&import->imp_zombie_chain);
1612 if (!cfs_list_empty(&obd_zombie_exports)) {
1613 export = cfs_list_entry(obd_zombie_exports.next,
1616 cfs_list_del_init(&export->exp_obd_chain);
1619 spin_unlock(&obd_zombie_impexp_lock);
1621 if (import != NULL) {
1622 class_import_destroy(import);
1623 spin_lock(&obd_zombie_impexp_lock);
1625 spin_unlock(&obd_zombie_impexp_lock);
1628 if (export != NULL) {
1629 class_export_destroy(export);
1630 spin_lock(&obd_zombie_impexp_lock);
1632 spin_unlock(&obd_zombie_impexp_lock);
1636 } while (import != NULL || export != NULL);
1640 static struct completion obd_zombie_start;
1641 static struct completion obd_zombie_stop;
1642 static unsigned long obd_zombie_flags;
1643 static wait_queue_head_t obd_zombie_waitq;
1644 static pid_t obd_zombie_pid;
1647 OBD_ZOMBIE_STOP = 0x0001,
1651 * check for work for kill zombie import/export thread.
1653 static int obd_zombie_impexp_check(void *arg)
1657 spin_lock(&obd_zombie_impexp_lock);
1658 rc = (zombies_count == 0) &&
1659 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1660 spin_unlock(&obd_zombie_impexp_lock);
1666 * Add export to the obd_zombe thread and notify it.
1668 static void obd_zombie_export_add(struct obd_export *exp) {
1669 spin_lock(&exp->exp_obd->obd_dev_lock);
1670 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1671 cfs_list_del_init(&exp->exp_obd_chain);
1672 spin_unlock(&exp->exp_obd->obd_dev_lock);
1673 spin_lock(&obd_zombie_impexp_lock);
1675 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1676 spin_unlock(&obd_zombie_impexp_lock);
1678 obd_zombie_impexp_notify();
1682 * Add import to the obd_zombe thread and notify it.
1684 static void obd_zombie_import_add(struct obd_import *imp) {
1685 LASSERT(imp->imp_sec == NULL);
1686 LASSERT(imp->imp_rq_pool == NULL);
1687 spin_lock(&obd_zombie_impexp_lock);
1688 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1690 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1691 spin_unlock(&obd_zombie_impexp_lock);
1693 obd_zombie_impexp_notify();
1697 * notify import/export destroy thread about new zombie.
1699 static void obd_zombie_impexp_notify(void)
1702 * Make sure obd_zomebie_impexp_thread get this notification.
1703 * It is possible this signal only get by obd_zombie_barrier, and
1704 * barrier gulps this notification and sleeps away and hangs ensues
1706 wake_up_all(&obd_zombie_waitq);
1710 * check whether obd_zombie is idle
1712 static int obd_zombie_is_idle(void)
1716 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1717 spin_lock(&obd_zombie_impexp_lock);
1718 rc = (zombies_count == 0);
1719 spin_unlock(&obd_zombie_impexp_lock);
1724 * wait when obd_zombie import/export queues become empty
1726 void obd_zombie_barrier(void)
1728 struct l_wait_info lwi = { 0 };
1730 if (obd_zombie_pid == current_pid())
1731 /* don't wait for myself */
1733 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1735 EXPORT_SYMBOL(obd_zombie_barrier);
1740 * destroy zombie export/import thread.
1742 static int obd_zombie_impexp_thread(void *unused)
1744 unshare_fs_struct();
1745 complete(&obd_zombie_start);
1747 obd_zombie_pid = current_pid();
1749 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1750 struct l_wait_info lwi = { 0 };
1752 l_wait_event(obd_zombie_waitq,
1753 !obd_zombie_impexp_check(NULL), &lwi);
1754 obd_zombie_impexp_cull();
1757 * Notify obd_zombie_barrier callers that queues
1760 wake_up(&obd_zombie_waitq);
1763 complete(&obd_zombie_stop);
1768 #else /* ! KERNEL */
1770 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1771 static void *obd_zombie_impexp_work_cb;
1772 static void *obd_zombie_impexp_idle_cb;
1774 int obd_zombie_impexp_kill(void *arg)
1778 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1779 obd_zombie_impexp_cull();
1782 cfs_atomic_dec(&zombie_recur);
1789 * start destroy zombie import/export thread
1791 int obd_zombie_impexp_init(void)
1794 struct task_struct *task;
1797 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1798 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1799 spin_lock_init(&obd_zombie_impexp_lock);
1800 init_completion(&obd_zombie_start);
1801 init_completion(&obd_zombie_stop);
1802 init_waitqueue_head(&obd_zombie_waitq);
1806 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1808 RETURN(PTR_ERR(task));
1810 wait_for_completion(&obd_zombie_start);
1813 obd_zombie_impexp_work_cb =
1814 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1815 &obd_zombie_impexp_kill, NULL);
1817 obd_zombie_impexp_idle_cb =
1818 liblustre_register_idle_callback("obd_zombi_impexp_check",
1819 &obd_zombie_impexp_check, NULL);
1824 * stop destroy zombie import/export thread
1826 void obd_zombie_impexp_stop(void)
1828 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1829 obd_zombie_impexp_notify();
1831 wait_for_completion(&obd_zombie_stop);
1833 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1834 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1838 /***** Kernel-userspace comm helpers *******/
1840 /* Get length of entire message, including header */
1841 int kuc_len(int payload_len)
1843 return sizeof(struct kuc_hdr) + payload_len;
1845 EXPORT_SYMBOL(kuc_len);
1847 /* Get a pointer to kuc header, given a ptr to the payload
1848 * @param p Pointer to payload area
1849 * @returns Pointer to kuc header
1851 struct kuc_hdr * kuc_ptr(void *p)
1853 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1854 LASSERT(lh->kuc_magic == KUC_MAGIC);
1857 EXPORT_SYMBOL(kuc_ptr);
1859 /* Test if payload is part of kuc message
1860 * @param p Pointer to payload area
1863 int kuc_ispayload(void *p)
1865 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1867 if (kh->kuc_magic == KUC_MAGIC)
1872 EXPORT_SYMBOL(kuc_ispayload);
1874 /* Alloc space for a message, and fill in header
1875 * @return Pointer to payload area
1877 void *kuc_alloc(int payload_len, int transport, int type)
1880 int len = kuc_len(payload_len);
1884 return ERR_PTR(-ENOMEM);
1886 lh->kuc_magic = KUC_MAGIC;
1887 lh->kuc_transport = transport;
1888 lh->kuc_msgtype = type;
1889 lh->kuc_msglen = len;
1891 return (void *)(lh + 1);
1893 EXPORT_SYMBOL(kuc_alloc);
1895 /* Takes pointer to payload area */
1896 inline void kuc_free(void *p, int payload_len)
1898 struct kuc_hdr *lh = kuc_ptr(p);
1899 OBD_FREE(lh, kuc_len(payload_len));
1901 EXPORT_SYMBOL(kuc_free);