4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
53 struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 struct kmem_cache *import_cachep;
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
134 if (!request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
144 spin_lock(&type->obd_type_lock);
146 try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
151 EXPORT_SYMBOL(class_get_type);
153 void class_put_type(struct obd_type *type)
156 spin_lock(&type->obd_type_lock);
158 module_put(type->typ_dt_ops->o_owner);
159 spin_unlock(&type->obd_type_lock);
161 EXPORT_SYMBOL(class_put_type);
163 #define CLASS_MAX_NAME 1024
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166 struct lprocfs_vars *vars, const char *name,
167 struct lu_device_type *ldt)
169 struct obd_type *type;
174 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
176 if (class_search_type(name)) {
177 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
182 OBD_ALLOC(type, sizeof(*type));
186 OBD_ALLOC_PTR(type->typ_dt_ops);
187 OBD_ALLOC_PTR(type->typ_md_ops);
188 OBD_ALLOC(type->typ_name, strlen(name) + 1);
190 if (type->typ_dt_ops == NULL ||
191 type->typ_md_ops == NULL ||
192 type->typ_name == NULL)
195 *(type->typ_dt_ops) = *dt_ops;
196 /* md_ops is optional */
198 *(type->typ_md_ops) = *md_ops;
199 strcpy(type->typ_name, name);
200 spin_lock_init(&type->obd_type_lock);
203 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
205 if (IS_ERR(type->typ_procroot)) {
206 rc = PTR_ERR(type->typ_procroot);
207 type->typ_procroot = NULL;
213 rc = lu_device_type_init(ldt);
218 spin_lock(&obd_types_lock);
219 cfs_list_add(&type->typ_chain, &obd_types);
220 spin_unlock(&obd_types_lock);
225 if (type->typ_name != NULL)
226 OBD_FREE(type->typ_name, strlen(name) + 1);
227 if (type->typ_md_ops != NULL)
228 OBD_FREE_PTR(type->typ_md_ops);
229 if (type->typ_dt_ops != NULL)
230 OBD_FREE_PTR(type->typ_dt_ops);
231 OBD_FREE(type, sizeof(*type));
234 EXPORT_SYMBOL(class_register_type);
236 int class_unregister_type(const char *name)
238 struct obd_type *type = class_search_type(name);
242 CERROR("unknown obd type\n");
246 if (type->typ_refcnt) {
247 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
248 /* This is a bad situation, let's make the best of it */
249 /* Remove ops, but leave the name for debugging */
250 OBD_FREE_PTR(type->typ_dt_ops);
251 OBD_FREE_PTR(type->typ_md_ops);
255 /* we do not use type->typ_procroot as for compatibility purposes
256 * other modules can share names (i.e. lod can use lov entry). so
257 * we can't reference pointer as it can get invalided when another
258 * module removes the entry */
259 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
262 lu_device_type_fini(type->typ_lu);
264 spin_lock(&obd_types_lock);
265 cfs_list_del(&type->typ_chain);
266 spin_unlock(&obd_types_lock);
267 OBD_FREE(type->typ_name, strlen(name) + 1);
268 if (type->typ_dt_ops != NULL)
269 OBD_FREE_PTR(type->typ_dt_ops);
270 if (type->typ_md_ops != NULL)
271 OBD_FREE_PTR(type->typ_md_ops);
272 OBD_FREE(type, sizeof(*type));
274 } /* class_unregister_type */
275 EXPORT_SYMBOL(class_unregister_type);
278 * Create a new obd device.
280 * Find an empty slot in ::obd_devs[], create a new obd device in it.
282 * \param[in] type_name obd device type string.
283 * \param[in] name obd device name.
285 * \retval NULL if create fails, otherwise return the obd device
288 struct obd_device *class_newdev(const char *type_name, const char *name)
290 struct obd_device *result = NULL;
291 struct obd_device *newdev;
292 struct obd_type *type = NULL;
294 int new_obd_minor = 0;
297 if (strlen(name) >= MAX_OBD_NAME) {
298 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
299 RETURN(ERR_PTR(-EINVAL));
302 type = class_get_type(type_name);
304 CERROR("OBD: unknown type: %s\n", type_name);
305 RETURN(ERR_PTR(-ENODEV));
308 newdev = obd_device_alloc();
310 GOTO(out_type, result = ERR_PTR(-ENOMEM));
312 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
314 write_lock(&obd_dev_lock);
315 for (i = 0; i < class_devno_max(); i++) {
316 struct obd_device *obd = class_num2obd(i);
318 if (obd && (strcmp(name, obd->obd_name) == 0)) {
319 CERROR("Device %s already exists at %d, won't add\n",
322 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
323 "%p obd_magic %08x != %08x\n", result,
324 result->obd_magic, OBD_DEVICE_MAGIC);
325 LASSERTF(result->obd_minor == new_obd_minor,
326 "%p obd_minor %d != %d\n", result,
327 result->obd_minor, new_obd_minor);
329 obd_devs[result->obd_minor] = NULL;
330 result->obd_name[0]='\0';
332 result = ERR_PTR(-EEXIST);
335 if (!result && !obd) {
337 result->obd_minor = i;
339 result->obd_type = type;
340 strncpy(result->obd_name, name,
341 sizeof(result->obd_name) - 1);
342 obd_devs[i] = result;
345 write_unlock(&obd_dev_lock);
347 if (result == NULL && i >= class_devno_max()) {
348 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
350 GOTO(out, result = ERR_PTR(-EOVERFLOW));
356 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
357 result->obd_name, result);
361 obd_device_free(newdev);
363 class_put_type(type);
367 void class_release_dev(struct obd_device *obd)
369 struct obd_type *obd_type = obd->obd_type;
371 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
372 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
373 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
374 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
375 LASSERT(obd_type != NULL);
377 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
378 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
380 write_lock(&obd_dev_lock);
381 obd_devs[obd->obd_minor] = NULL;
382 write_unlock(&obd_dev_lock);
383 obd_device_free(obd);
385 class_put_type(obd_type);
388 int class_name2dev(const char *name)
395 read_lock(&obd_dev_lock);
396 for (i = 0; i < class_devno_max(); i++) {
397 struct obd_device *obd = class_num2obd(i);
399 if (obd && strcmp(name, obd->obd_name) == 0) {
400 /* Make sure we finished attaching before we give
401 out any references */
402 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
403 if (obd->obd_attached) {
404 read_unlock(&obd_dev_lock);
410 read_unlock(&obd_dev_lock);
414 EXPORT_SYMBOL(class_name2dev);
416 struct obd_device *class_name2obd(const char *name)
418 int dev = class_name2dev(name);
420 if (dev < 0 || dev > class_devno_max())
422 return class_num2obd(dev);
424 EXPORT_SYMBOL(class_name2obd);
426 int class_uuid2dev(struct obd_uuid *uuid)
430 read_lock(&obd_dev_lock);
431 for (i = 0; i < class_devno_max(); i++) {
432 struct obd_device *obd = class_num2obd(i);
434 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
435 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
436 read_unlock(&obd_dev_lock);
440 read_unlock(&obd_dev_lock);
444 EXPORT_SYMBOL(class_uuid2dev);
446 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
448 int dev = class_uuid2dev(uuid);
451 return class_num2obd(dev);
453 EXPORT_SYMBOL(class_uuid2obd);
456 * Get obd device from ::obd_devs[]
458 * \param num [in] array index
460 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
461 * otherwise return the obd device there.
463 struct obd_device *class_num2obd(int num)
465 struct obd_device *obd = NULL;
467 if (num < class_devno_max()) {
472 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
473 "%p obd_magic %08x != %08x\n",
474 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
475 LASSERTF(obd->obd_minor == num,
476 "%p obd_minor %0d != %0d\n",
477 obd, obd->obd_minor, num);
482 EXPORT_SYMBOL(class_num2obd);
485 * Get obd devices count. Device in any
487 * \retval obd device count
489 int get_devices_count(void)
491 int index, max_index = class_devno_max(), dev_count = 0;
493 read_lock(&obd_dev_lock);
494 for (index = 0; index <= max_index; index++) {
495 struct obd_device *obd = class_num2obd(index);
499 read_unlock(&obd_dev_lock);
503 EXPORT_SYMBOL(get_devices_count);
505 void class_obd_list(void)
510 read_lock(&obd_dev_lock);
511 for (i = 0; i < class_devno_max(); i++) {
512 struct obd_device *obd = class_num2obd(i);
516 if (obd->obd_stopping)
518 else if (obd->obd_set_up)
520 else if (obd->obd_attached)
524 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
525 i, status, obd->obd_type->typ_name,
526 obd->obd_name, obd->obd_uuid.uuid,
527 cfs_atomic_read(&obd->obd_refcount));
529 read_unlock(&obd_dev_lock);
533 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
534 specified, then only the client with that uuid is returned,
535 otherwise any client connected to the tgt is returned. */
536 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
537 const char * typ_name,
538 struct obd_uuid *grp_uuid)
542 read_lock(&obd_dev_lock);
543 for (i = 0; i < class_devno_max(); i++) {
544 struct obd_device *obd = class_num2obd(i);
548 if ((strncmp(obd->obd_type->typ_name, typ_name,
549 strlen(typ_name)) == 0)) {
550 if (obd_uuid_equals(tgt_uuid,
551 &obd->u.cli.cl_target_uuid) &&
552 ((grp_uuid)? obd_uuid_equals(grp_uuid,
553 &obd->obd_uuid) : 1)) {
554 read_unlock(&obd_dev_lock);
559 read_unlock(&obd_dev_lock);
563 EXPORT_SYMBOL(class_find_client_obd);
565 /* Iterate the obd_device list looking devices have grp_uuid. Start
566 searching at *next, and if a device is found, the next index to look
567 at is saved in *next. If next is NULL, then the first matching device
568 will always be returned. */
569 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
575 else if (*next >= 0 && *next < class_devno_max())
580 read_lock(&obd_dev_lock);
581 for (; i < class_devno_max(); i++) {
582 struct obd_device *obd = class_num2obd(i);
586 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
589 read_unlock(&obd_dev_lock);
593 read_unlock(&obd_dev_lock);
597 EXPORT_SYMBOL(class_devices_in_group);
600 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
601 * adjust sptlrpc settings accordingly.
603 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
605 struct obd_device *obd;
609 LASSERT(namelen > 0);
611 read_lock(&obd_dev_lock);
612 for (i = 0; i < class_devno_max(); i++) {
613 obd = class_num2obd(i);
615 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
618 /* only notify mdc, osc, mdt, ost */
619 type = obd->obd_type->typ_name;
620 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
621 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
622 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
623 strcmp(type, LUSTRE_OST_NAME) != 0)
626 if (strncmp(obd->obd_name, fsname, namelen))
629 class_incref(obd, __FUNCTION__, obd);
630 read_unlock(&obd_dev_lock);
631 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
632 sizeof(KEY_SPTLRPC_CONF),
633 KEY_SPTLRPC_CONF, 0, NULL, NULL);
635 class_decref(obd, __FUNCTION__, obd);
636 read_lock(&obd_dev_lock);
638 read_unlock(&obd_dev_lock);
641 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
643 void obd_cleanup_caches(void)
646 if (obd_device_cachep) {
647 kmem_cache_destroy(obd_device_cachep);
648 obd_device_cachep = NULL;
651 kmem_cache_destroy(obdo_cachep);
655 kmem_cache_destroy(import_cachep);
656 import_cachep = NULL;
659 kmem_cache_destroy(capa_cachep);
665 int obd_init_caches(void)
669 LASSERT(obd_device_cachep == NULL);
670 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
671 sizeof(struct obd_device),
673 if (!obd_device_cachep)
676 LASSERT(obdo_cachep == NULL);
677 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
682 LASSERT(import_cachep == NULL);
683 import_cachep = kmem_cache_create("ll_import_cache",
684 sizeof(struct obd_import),
689 LASSERT(capa_cachep == NULL);
690 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
697 obd_cleanup_caches();
702 /* map connection to client */
703 struct obd_export *class_conn2export(struct lustre_handle *conn)
705 struct obd_export *export;
709 CDEBUG(D_CACHE, "looking for null handle\n");
713 if (conn->cookie == -1) { /* this means assign a new connection */
714 CDEBUG(D_CACHE, "want a new connection\n");
718 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
719 export = class_handle2object(conn->cookie, NULL);
722 EXPORT_SYMBOL(class_conn2export);
724 struct obd_device *class_exp2obd(struct obd_export *exp)
730 EXPORT_SYMBOL(class_exp2obd);
732 struct obd_device *class_conn2obd(struct lustre_handle *conn)
734 struct obd_export *export;
735 export = class_conn2export(conn);
737 struct obd_device *obd = export->exp_obd;
738 class_export_put(export);
743 EXPORT_SYMBOL(class_conn2obd);
745 struct obd_import *class_exp2cliimp(struct obd_export *exp)
747 struct obd_device *obd = exp->exp_obd;
750 return obd->u.cli.cl_import;
752 EXPORT_SYMBOL(class_exp2cliimp);
754 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
756 struct obd_device *obd = class_conn2obd(conn);
759 return obd->u.cli.cl_import;
761 EXPORT_SYMBOL(class_conn2cliimp);
763 /* Export management functions */
764 static void class_export_destroy(struct obd_export *exp)
766 struct obd_device *obd = exp->exp_obd;
769 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
770 LASSERT(obd != NULL);
772 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
773 exp->exp_client_uuid.uuid, obd->obd_name);
775 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
776 if (exp->exp_connection)
777 ptlrpc_put_connection_superhack(exp->exp_connection);
779 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
780 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
781 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
782 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
783 obd_destroy_export(exp);
784 class_decref(obd, "export", exp);
786 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
790 static void export_handle_addref(void *export)
792 class_export_get(export);
795 static struct portals_handle_ops export_handle_ops = {
796 .hop_addref = export_handle_addref,
800 struct obd_export *class_export_get(struct obd_export *exp)
802 cfs_atomic_inc(&exp->exp_refcount);
803 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
804 cfs_atomic_read(&exp->exp_refcount));
807 EXPORT_SYMBOL(class_export_get);
809 void class_export_put(struct obd_export *exp)
811 LASSERT(exp != NULL);
812 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
813 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
814 cfs_atomic_read(&exp->exp_refcount) - 1);
816 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
817 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
818 CDEBUG(D_IOCTL, "final put %p/%s\n",
819 exp, exp->exp_client_uuid.uuid);
821 /* release nid stat refererence */
822 lprocfs_exp_cleanup(exp);
824 obd_zombie_export_add(exp);
827 EXPORT_SYMBOL(class_export_put);
829 /* Creates a new export, adds it to the hash table, and returns a
830 * pointer to it. The refcount is 2: one for the hash reference, and
831 * one for the pointer returned by this function. */
832 struct obd_export *class_new_export(struct obd_device *obd,
833 struct obd_uuid *cluuid)
835 struct obd_export *export;
836 cfs_hash_t *hash = NULL;
840 OBD_ALLOC_PTR(export);
842 return ERR_PTR(-ENOMEM);
844 export->exp_conn_cnt = 0;
845 export->exp_lock_hash = NULL;
846 export->exp_flock_hash = NULL;
847 cfs_atomic_set(&export->exp_refcount, 2);
848 cfs_atomic_set(&export->exp_rpc_count, 0);
849 cfs_atomic_set(&export->exp_cb_count, 0);
850 cfs_atomic_set(&export->exp_locks_count, 0);
851 #if LUSTRE_TRACKS_LOCK_EXP_REFS
852 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
853 spin_lock_init(&export->exp_locks_list_guard);
855 cfs_atomic_set(&export->exp_replay_count, 0);
856 export->exp_obd = obd;
857 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
858 spin_lock_init(&export->exp_uncommitted_replies_lock);
859 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
860 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
861 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
862 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
863 class_handle_hash(&export->exp_handle, &export_handle_ops);
864 export->exp_last_request_time = cfs_time_current_sec();
865 spin_lock_init(&export->exp_lock);
866 spin_lock_init(&export->exp_rpc_lock);
867 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
868 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
869 spin_lock_init(&export->exp_bl_list_lock);
870 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
872 export->exp_sp_peer = LUSTRE_SP_ANY;
873 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
874 export->exp_client_uuid = *cluuid;
875 obd_init_export(export);
877 spin_lock(&obd->obd_dev_lock);
878 /* shouldn't happen, but might race */
879 if (obd->obd_stopping)
880 GOTO(exit_unlock, rc = -ENODEV);
882 hash = cfs_hash_getref(obd->obd_uuid_hash);
884 GOTO(exit_unlock, rc = -ENODEV);
885 spin_unlock(&obd->obd_dev_lock);
887 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
888 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
890 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
891 obd->obd_name, cluuid->uuid, rc);
892 GOTO(exit_err, rc = -EALREADY);
896 spin_lock(&obd->obd_dev_lock);
897 if (obd->obd_stopping) {
898 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
899 GOTO(exit_unlock, rc = -ENODEV);
902 class_incref(obd, "export", export);
903 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
904 cfs_list_add_tail(&export->exp_obd_chain_timed,
905 &export->exp_obd->obd_exports_timed);
906 export->exp_obd->obd_num_exports++;
907 spin_unlock(&obd->obd_dev_lock);
908 cfs_hash_putref(hash);
912 spin_unlock(&obd->obd_dev_lock);
915 cfs_hash_putref(hash);
916 class_handle_unhash(&export->exp_handle);
917 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
918 obd_destroy_export(export);
919 OBD_FREE_PTR(export);
922 EXPORT_SYMBOL(class_new_export);
924 void class_unlink_export(struct obd_export *exp)
926 class_handle_unhash(&exp->exp_handle);
928 spin_lock(&exp->exp_obd->obd_dev_lock);
929 /* delete an uuid-export hashitem from hashtables */
930 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
931 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
932 &exp->exp_client_uuid,
933 &exp->exp_uuid_hash);
935 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
936 cfs_list_del_init(&exp->exp_obd_chain_timed);
937 exp->exp_obd->obd_num_exports--;
938 spin_unlock(&exp->exp_obd->obd_dev_lock);
939 class_export_put(exp);
941 EXPORT_SYMBOL(class_unlink_export);
943 /* Import management functions */
944 void class_import_destroy(struct obd_import *imp)
948 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
949 imp->imp_obd->obd_name);
951 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
953 ptlrpc_put_connection_superhack(imp->imp_connection);
955 while (!cfs_list_empty(&imp->imp_conn_list)) {
956 struct obd_import_conn *imp_conn;
958 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
959 struct obd_import_conn, oic_item);
960 cfs_list_del_init(&imp_conn->oic_item);
961 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
962 OBD_FREE(imp_conn, sizeof(*imp_conn));
965 LASSERT(imp->imp_sec == NULL);
966 class_decref(imp->imp_obd, "import", imp);
967 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
971 static void import_handle_addref(void *import)
973 class_import_get(import);
976 static struct portals_handle_ops import_handle_ops = {
977 .hop_addref = import_handle_addref,
981 struct obd_import *class_import_get(struct obd_import *import)
983 cfs_atomic_inc(&import->imp_refcount);
984 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
985 cfs_atomic_read(&import->imp_refcount),
986 import->imp_obd->obd_name);
989 EXPORT_SYMBOL(class_import_get);
991 void class_import_put(struct obd_import *imp)
995 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
996 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
998 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
999 cfs_atomic_read(&imp->imp_refcount) - 1,
1000 imp->imp_obd->obd_name);
1002 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1003 CDEBUG(D_INFO, "final put import %p\n", imp);
1004 obd_zombie_import_add(imp);
1007 /* catch possible import put race */
1008 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1011 EXPORT_SYMBOL(class_import_put);
1013 static void init_imp_at(struct imp_at *at) {
1015 at_init(&at->iat_net_latency, 0, 0);
1016 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1017 /* max service estimates are tracked on the server side, so
1018 don't use the AT history here, just use the last reported
1019 val. (But keep hist for proc histogram, worst_ever) */
1020 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1025 struct obd_import *class_new_import(struct obd_device *obd)
1027 struct obd_import *imp;
1029 OBD_ALLOC(imp, sizeof(*imp));
1033 CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1034 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1035 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1036 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1037 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1038 CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
1039 imp->imp_replay_cursor = &imp->imp_committed_list;
1040 spin_lock_init(&imp->imp_lock);
1041 imp->imp_last_success_conn = 0;
1042 imp->imp_state = LUSTRE_IMP_NEW;
1043 imp->imp_obd = class_incref(obd, "import", imp);
1044 mutex_init(&imp->imp_sec_mutex);
1045 init_waitqueue_head(&imp->imp_recovery_waitq);
1047 cfs_atomic_set(&imp->imp_refcount, 2);
1048 cfs_atomic_set(&imp->imp_unregistering, 0);
1049 cfs_atomic_set(&imp->imp_inflight, 0);
1050 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1051 cfs_atomic_set(&imp->imp_inval_count, 0);
1052 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1053 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1054 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1055 init_imp_at(&imp->imp_at);
1057 /* the default magic is V2, will be used in connect RPC, and
1058 * then adjusted according to the flags in request/reply. */
1059 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1063 EXPORT_SYMBOL(class_new_import);
1065 void class_destroy_import(struct obd_import *import)
1067 LASSERT(import != NULL);
1068 LASSERT(import != LP_POISON);
1070 class_handle_unhash(&import->imp_handle);
1072 spin_lock(&import->imp_lock);
1073 import->imp_generation++;
1074 spin_unlock(&import->imp_lock);
1075 class_import_put(import);
1077 EXPORT_SYMBOL(class_destroy_import);
1079 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1081 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1083 spin_lock(&exp->exp_locks_list_guard);
1085 LASSERT(lock->l_exp_refs_nr >= 0);
1087 if (lock->l_exp_refs_target != NULL &&
1088 lock->l_exp_refs_target != exp) {
1089 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1090 exp, lock, lock->l_exp_refs_target);
1092 if ((lock->l_exp_refs_nr ++) == 0) {
1093 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1094 lock->l_exp_refs_target = exp;
1096 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1097 lock, exp, lock->l_exp_refs_nr);
1098 spin_unlock(&exp->exp_locks_list_guard);
1100 EXPORT_SYMBOL(__class_export_add_lock_ref);
1102 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1104 spin_lock(&exp->exp_locks_list_guard);
1105 LASSERT(lock->l_exp_refs_nr > 0);
1106 if (lock->l_exp_refs_target != exp) {
1107 LCONSOLE_WARN("lock %p, "
1108 "mismatching export pointers: %p, %p\n",
1109 lock, lock->l_exp_refs_target, exp);
1111 if (-- lock->l_exp_refs_nr == 0) {
1112 cfs_list_del_init(&lock->l_exp_refs_link);
1113 lock->l_exp_refs_target = NULL;
1115 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1116 lock, exp, lock->l_exp_refs_nr);
1117 spin_unlock(&exp->exp_locks_list_guard);
1119 EXPORT_SYMBOL(__class_export_del_lock_ref);
1122 /* A connection defines an export context in which preallocation can
1123 be managed. This releases the export pointer reference, and returns
1124 the export handle, so the export refcount is 1 when this function
1126 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1127 struct obd_uuid *cluuid)
1129 struct obd_export *export;
1130 LASSERT(conn != NULL);
1131 LASSERT(obd != NULL);
1132 LASSERT(cluuid != NULL);
1135 export = class_new_export(obd, cluuid);
1137 RETURN(PTR_ERR(export));
1139 conn->cookie = export->exp_handle.h_cookie;
1140 class_export_put(export);
1142 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1143 cluuid->uuid, conn->cookie);
1146 EXPORT_SYMBOL(class_connect);
1148 /* if export is involved in recovery then clean up related things */
1149 void class_export_recovery_cleanup(struct obd_export *exp)
1151 struct obd_device *obd = exp->exp_obd;
1153 spin_lock(&obd->obd_recovery_task_lock);
1154 if (exp->exp_delayed)
1155 obd->obd_delayed_clients--;
1156 if (obd->obd_recovering) {
1157 if (exp->exp_in_recovery) {
1158 spin_lock(&exp->exp_lock);
1159 exp->exp_in_recovery = 0;
1160 spin_unlock(&exp->exp_lock);
1161 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1162 cfs_atomic_dec(&obd->obd_connected_clients);
1165 /* if called during recovery then should update
1166 * obd_stale_clients counter,
1167 * lightweight exports are not counted */
1168 if (exp->exp_failed &&
1169 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1170 exp->exp_obd->obd_stale_clients++;
1172 spin_unlock(&obd->obd_recovery_task_lock);
1173 /** Cleanup req replay fields */
1174 if (exp->exp_req_replay_needed) {
1175 spin_lock(&exp->exp_lock);
1176 exp->exp_req_replay_needed = 0;
1177 spin_unlock(&exp->exp_lock);
1178 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1179 cfs_atomic_dec(&obd->obd_req_replay_clients);
1181 /** Cleanup lock replay data */
1182 if (exp->exp_lock_replay_needed) {
1183 spin_lock(&exp->exp_lock);
1184 exp->exp_lock_replay_needed = 0;
1185 spin_unlock(&exp->exp_lock);
1186 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1187 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1191 /* This function removes 1-3 references from the export:
1192 * 1 - for export pointer passed
1193 * and if disconnect really need
1194 * 2 - removing from hash
1195 * 3 - in client_unlink_export
1196 * The export pointer passed to this function can destroyed */
1197 int class_disconnect(struct obd_export *export)
1199 int already_disconnected;
1202 if (export == NULL) {
1203 CWARN("attempting to free NULL export %p\n", export);
1207 spin_lock(&export->exp_lock);
1208 already_disconnected = export->exp_disconnected;
1209 export->exp_disconnected = 1;
1210 spin_unlock(&export->exp_lock);
1212 /* class_cleanup(), abort_recovery(), and class_fail_export()
1213 * all end up in here, and if any of them race we shouldn't
1214 * call extra class_export_puts(). */
1215 if (already_disconnected) {
1216 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1217 GOTO(no_disconn, already_disconnected);
1220 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1221 export->exp_handle.h_cookie);
1223 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1224 cfs_hash_del(export->exp_obd->obd_nid_hash,
1225 &export->exp_connection->c_peer.nid,
1226 &export->exp_nid_hash);
1228 class_export_recovery_cleanup(export);
1229 class_unlink_export(export);
1231 class_export_put(export);
1234 EXPORT_SYMBOL(class_disconnect);
1236 /* Return non-zero for a fully connected export */
1237 int class_connected_export(struct obd_export *exp)
1242 spin_lock(&exp->exp_lock);
1243 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1244 spin_unlock(&exp->exp_lock);
1248 EXPORT_SYMBOL(class_connected_export);
1250 static void class_disconnect_export_list(cfs_list_t *list,
1251 enum obd_option flags)
1254 struct obd_export *exp;
1257 /* It's possible that an export may disconnect itself, but
1258 * nothing else will be added to this list. */
1259 while (!cfs_list_empty(list)) {
1260 exp = cfs_list_entry(list->next, struct obd_export,
1262 /* need for safe call CDEBUG after obd_disconnect */
1263 class_export_get(exp);
1265 spin_lock(&exp->exp_lock);
1266 exp->exp_flags = flags;
1267 spin_unlock(&exp->exp_lock);
1269 if (obd_uuid_equals(&exp->exp_client_uuid,
1270 &exp->exp_obd->obd_uuid)) {
1272 "exp %p export uuid == obd uuid, don't discon\n",
1274 /* Need to delete this now so we don't end up pointing
1275 * to work_list later when this export is cleaned up. */
1276 cfs_list_del_init(&exp->exp_obd_chain);
1277 class_export_put(exp);
1281 class_export_get(exp);
1282 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1283 "last request at "CFS_TIME_T"\n",
1284 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1285 exp, exp->exp_last_request_time);
1286 /* release one export reference anyway */
1287 rc = obd_disconnect(exp);
1289 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1290 obd_export_nid2str(exp), exp, rc);
1291 class_export_put(exp);
1296 void class_disconnect_exports(struct obd_device *obd)
1298 cfs_list_t work_list;
1301 /* Move all of the exports from obd_exports to a work list, en masse. */
1302 CFS_INIT_LIST_HEAD(&work_list);
1303 spin_lock(&obd->obd_dev_lock);
1304 cfs_list_splice_init(&obd->obd_exports, &work_list);
1305 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1306 spin_unlock(&obd->obd_dev_lock);
1308 if (!cfs_list_empty(&work_list)) {
1309 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1310 "disconnecting them\n", obd->obd_minor, obd);
1311 class_disconnect_export_list(&work_list,
1312 exp_flags_from_obd(obd));
1314 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1315 obd->obd_minor, obd);
1318 EXPORT_SYMBOL(class_disconnect_exports);
1320 /* Remove exports that have not completed recovery.
1322 void class_disconnect_stale_exports(struct obd_device *obd,
1323 int (*test_export)(struct obd_export *))
1325 cfs_list_t work_list;
1326 struct obd_export *exp, *n;
1330 CFS_INIT_LIST_HEAD(&work_list);
1331 spin_lock(&obd->obd_dev_lock);
1332 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1334 /* don't count self-export as client */
1335 if (obd_uuid_equals(&exp->exp_client_uuid,
1336 &exp->exp_obd->obd_uuid))
1339 /* don't evict clients which have no slot in last_rcvd
1340 * (e.g. lightweight connection) */
1341 if (exp->exp_target_data.ted_lr_idx == -1)
1344 spin_lock(&exp->exp_lock);
1345 if (exp->exp_failed || test_export(exp)) {
1346 spin_unlock(&exp->exp_lock);
1349 exp->exp_failed = 1;
1350 spin_unlock(&exp->exp_lock);
1352 cfs_list_move(&exp->exp_obd_chain, &work_list);
1354 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1355 obd->obd_name, exp->exp_client_uuid.uuid,
1356 exp->exp_connection == NULL ? "<unknown>" :
1357 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1358 print_export_data(exp, "EVICTING", 0);
1360 spin_unlock(&obd->obd_dev_lock);
1363 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1364 obd->obd_name, evicted);
1366 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1367 OBD_OPT_ABORT_RECOV);
1370 EXPORT_SYMBOL(class_disconnect_stale_exports);
1372 void class_fail_export(struct obd_export *exp)
1374 int rc, already_failed;
1376 spin_lock(&exp->exp_lock);
1377 already_failed = exp->exp_failed;
1378 exp->exp_failed = 1;
1379 spin_unlock(&exp->exp_lock);
1381 if (already_failed) {
1382 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1383 exp, exp->exp_client_uuid.uuid);
1387 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1388 exp, exp->exp_client_uuid.uuid);
1390 if (obd_dump_on_timeout)
1391 libcfs_debug_dumplog();
1393 /* need for safe call CDEBUG after obd_disconnect */
1394 class_export_get(exp);
1396 /* Most callers into obd_disconnect are removing their own reference
1397 * (request, for example) in addition to the one from the hash table.
1398 * We don't have such a reference here, so make one. */
1399 class_export_get(exp);
1400 rc = obd_disconnect(exp);
1402 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1404 CDEBUG(D_HA, "disconnected export %p/%s\n",
1405 exp, exp->exp_client_uuid.uuid);
1406 class_export_put(exp);
1408 EXPORT_SYMBOL(class_fail_export);
1410 char *obd_export_nid2str(struct obd_export *exp)
1412 if (exp->exp_connection != NULL)
1413 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1417 EXPORT_SYMBOL(obd_export_nid2str);
1419 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1421 cfs_hash_t *nid_hash;
1422 struct obd_export *doomed_exp = NULL;
1423 int exports_evicted = 0;
1425 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1427 spin_lock(&obd->obd_dev_lock);
1428 /* umount has run already, so evict thread should leave
1429 * its task to umount thread now */
1430 if (obd->obd_stopping) {
1431 spin_unlock(&obd->obd_dev_lock);
1432 return exports_evicted;
1434 nid_hash = obd->obd_nid_hash;
1435 cfs_hash_getref(nid_hash);
1436 spin_unlock(&obd->obd_dev_lock);
1439 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1440 if (doomed_exp == NULL)
1443 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1444 "nid %s found, wanted nid %s, requested nid %s\n",
1445 obd_export_nid2str(doomed_exp),
1446 libcfs_nid2str(nid_key), nid);
1447 LASSERTF(doomed_exp != obd->obd_self_export,
1448 "self-export is hashed by NID?\n");
1450 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1451 "request\n", obd->obd_name,
1452 obd_uuid2str(&doomed_exp->exp_client_uuid),
1453 obd_export_nid2str(doomed_exp));
1454 class_fail_export(doomed_exp);
1455 class_export_put(doomed_exp);
1458 cfs_hash_putref(nid_hash);
1460 if (!exports_evicted)
1461 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1462 obd->obd_name, nid);
1463 return exports_evicted;
1465 EXPORT_SYMBOL(obd_export_evict_by_nid);
1467 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1469 cfs_hash_t *uuid_hash;
1470 struct obd_export *doomed_exp = NULL;
1471 struct obd_uuid doomed_uuid;
1472 int exports_evicted = 0;
1474 spin_lock(&obd->obd_dev_lock);
1475 if (obd->obd_stopping) {
1476 spin_unlock(&obd->obd_dev_lock);
1477 return exports_evicted;
1479 uuid_hash = obd->obd_uuid_hash;
1480 cfs_hash_getref(uuid_hash);
1481 spin_unlock(&obd->obd_dev_lock);
1483 obd_str2uuid(&doomed_uuid, uuid);
1484 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1485 CERROR("%s: can't evict myself\n", obd->obd_name);
1486 cfs_hash_putref(uuid_hash);
1487 return exports_evicted;
1490 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1492 if (doomed_exp == NULL) {
1493 CERROR("%s: can't disconnect %s: no exports found\n",
1494 obd->obd_name, uuid);
1496 CWARN("%s: evicting %s at adminstrative request\n",
1497 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1498 class_fail_export(doomed_exp);
1499 class_export_put(doomed_exp);
1502 cfs_hash_putref(uuid_hash);
1504 return exports_evicted;
1506 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1508 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1509 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1510 EXPORT_SYMBOL(class_export_dump_hook);
1513 static void print_export_data(struct obd_export *exp, const char *status,
1516 struct ptlrpc_reply_state *rs;
1517 struct ptlrpc_reply_state *first_reply = NULL;
1520 spin_lock(&exp->exp_lock);
1521 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1527 spin_unlock(&exp->exp_lock);
1529 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1530 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1531 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1532 cfs_atomic_read(&exp->exp_rpc_count),
1533 cfs_atomic_read(&exp->exp_cb_count),
1534 cfs_atomic_read(&exp->exp_locks_count),
1535 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1536 nreplies, first_reply, nreplies > 3 ? "..." : "",
1537 exp->exp_last_committed);
1538 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1539 if (locks && class_export_dump_hook != NULL)
1540 class_export_dump_hook(exp);
1544 void dump_exports(struct obd_device *obd, int locks)
1546 struct obd_export *exp;
1548 spin_lock(&obd->obd_dev_lock);
1549 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1550 print_export_data(exp, "ACTIVE", locks);
1551 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1552 print_export_data(exp, "UNLINKED", locks);
1553 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1554 print_export_data(exp, "DELAYED", locks);
1555 spin_unlock(&obd->obd_dev_lock);
1556 spin_lock(&obd_zombie_impexp_lock);
1557 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1558 print_export_data(exp, "ZOMBIE", locks);
1559 spin_unlock(&obd_zombie_impexp_lock);
1561 EXPORT_SYMBOL(dump_exports);
1563 void obd_exports_barrier(struct obd_device *obd)
1566 LASSERT(cfs_list_empty(&obd->obd_exports));
1567 spin_lock(&obd->obd_dev_lock);
1568 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1569 spin_unlock(&obd->obd_dev_lock);
1570 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1571 cfs_time_seconds(waited));
1572 if (waited > 5 && IS_PO2(waited)) {
1573 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1574 "more than %d seconds. "
1575 "The obd refcount = %d. Is it stuck?\n",
1576 obd->obd_name, waited,
1577 cfs_atomic_read(&obd->obd_refcount));
1578 dump_exports(obd, 1);
1581 spin_lock(&obd->obd_dev_lock);
1583 spin_unlock(&obd->obd_dev_lock);
1585 EXPORT_SYMBOL(obd_exports_barrier);
1587 /* Total amount of zombies to be destroyed */
1588 static int zombies_count = 0;
1591 * kill zombie imports and exports
1593 void obd_zombie_impexp_cull(void)
1595 struct obd_import *import;
1596 struct obd_export *export;
1600 spin_lock(&obd_zombie_impexp_lock);
1603 if (!cfs_list_empty(&obd_zombie_imports)) {
1604 import = cfs_list_entry(obd_zombie_imports.next,
1607 cfs_list_del_init(&import->imp_zombie_chain);
1611 if (!cfs_list_empty(&obd_zombie_exports)) {
1612 export = cfs_list_entry(obd_zombie_exports.next,
1615 cfs_list_del_init(&export->exp_obd_chain);
1618 spin_unlock(&obd_zombie_impexp_lock);
1620 if (import != NULL) {
1621 class_import_destroy(import);
1622 spin_lock(&obd_zombie_impexp_lock);
1624 spin_unlock(&obd_zombie_impexp_lock);
1627 if (export != NULL) {
1628 class_export_destroy(export);
1629 spin_lock(&obd_zombie_impexp_lock);
1631 spin_unlock(&obd_zombie_impexp_lock);
1635 } while (import != NULL || export != NULL);
1639 static struct completion obd_zombie_start;
1640 static struct completion obd_zombie_stop;
1641 static unsigned long obd_zombie_flags;
1642 static wait_queue_head_t obd_zombie_waitq;
1643 static pid_t obd_zombie_pid;
1646 OBD_ZOMBIE_STOP = 0x0001,
1650 * check for work for kill zombie import/export thread.
1652 static int obd_zombie_impexp_check(void *arg)
1656 spin_lock(&obd_zombie_impexp_lock);
1657 rc = (zombies_count == 0) &&
1658 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1659 spin_unlock(&obd_zombie_impexp_lock);
1665 * Add export to the obd_zombe thread and notify it.
1667 static void obd_zombie_export_add(struct obd_export *exp) {
1668 spin_lock(&exp->exp_obd->obd_dev_lock);
1669 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1670 cfs_list_del_init(&exp->exp_obd_chain);
1671 spin_unlock(&exp->exp_obd->obd_dev_lock);
1672 spin_lock(&obd_zombie_impexp_lock);
1674 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1675 spin_unlock(&obd_zombie_impexp_lock);
1677 obd_zombie_impexp_notify();
1681 * Add import to the obd_zombe thread and notify it.
1683 static void obd_zombie_import_add(struct obd_import *imp) {
1684 LASSERT(imp->imp_sec == NULL);
1685 LASSERT(imp->imp_rq_pool == NULL);
1686 spin_lock(&obd_zombie_impexp_lock);
1687 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1689 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1690 spin_unlock(&obd_zombie_impexp_lock);
1692 obd_zombie_impexp_notify();
1696 * notify import/export destroy thread about new zombie.
1698 static void obd_zombie_impexp_notify(void)
1701 * Make sure obd_zomebie_impexp_thread get this notification.
1702 * It is possible this signal only get by obd_zombie_barrier, and
1703 * barrier gulps this notification and sleeps away and hangs ensues
1705 wake_up_all(&obd_zombie_waitq);
1709 * check whether obd_zombie is idle
1711 static int obd_zombie_is_idle(void)
1715 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1716 spin_lock(&obd_zombie_impexp_lock);
1717 rc = (zombies_count == 0);
1718 spin_unlock(&obd_zombie_impexp_lock);
1723 * wait when obd_zombie import/export queues become empty
1725 void obd_zombie_barrier(void)
1727 struct l_wait_info lwi = { 0 };
1729 if (obd_zombie_pid == current_pid())
1730 /* don't wait for myself */
1732 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1734 EXPORT_SYMBOL(obd_zombie_barrier);
1739 * destroy zombie export/import thread.
1741 static int obd_zombie_impexp_thread(void *unused)
1743 unshare_fs_struct();
1744 complete(&obd_zombie_start);
1746 obd_zombie_pid = current_pid();
1748 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1749 struct l_wait_info lwi = { 0 };
1751 l_wait_event(obd_zombie_waitq,
1752 !obd_zombie_impexp_check(NULL), &lwi);
1753 obd_zombie_impexp_cull();
1756 * Notify obd_zombie_barrier callers that queues
1759 wake_up(&obd_zombie_waitq);
1762 complete(&obd_zombie_stop);
1767 #else /* ! KERNEL */
1769 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1770 static void *obd_zombie_impexp_work_cb;
1771 static void *obd_zombie_impexp_idle_cb;
1773 int obd_zombie_impexp_kill(void *arg)
1777 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1778 obd_zombie_impexp_cull();
1781 cfs_atomic_dec(&zombie_recur);
1788 * start destroy zombie import/export thread
1790 int obd_zombie_impexp_init(void)
1793 struct task_struct *task;
1796 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1797 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1798 spin_lock_init(&obd_zombie_impexp_lock);
1799 init_completion(&obd_zombie_start);
1800 init_completion(&obd_zombie_stop);
1801 init_waitqueue_head(&obd_zombie_waitq);
1805 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1807 RETURN(PTR_ERR(task));
1809 wait_for_completion(&obd_zombie_start);
1812 obd_zombie_impexp_work_cb =
1813 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1814 &obd_zombie_impexp_kill, NULL);
1816 obd_zombie_impexp_idle_cb =
1817 liblustre_register_idle_callback("obd_zombi_impexp_check",
1818 &obd_zombie_impexp_check, NULL);
1823 * stop destroy zombie import/export thread
1825 void obd_zombie_impexp_stop(void)
1827 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1828 obd_zombie_impexp_notify();
1830 wait_for_completion(&obd_zombie_stop);
1832 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1833 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1837 /***** Kernel-userspace comm helpers *******/
1839 /* Get length of entire message, including header */
1840 int kuc_len(int payload_len)
1842 return sizeof(struct kuc_hdr) + payload_len;
1844 EXPORT_SYMBOL(kuc_len);
1846 /* Get a pointer to kuc header, given a ptr to the payload
1847 * @param p Pointer to payload area
1848 * @returns Pointer to kuc header
1850 struct kuc_hdr * kuc_ptr(void *p)
1852 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1853 LASSERT(lh->kuc_magic == KUC_MAGIC);
1856 EXPORT_SYMBOL(kuc_ptr);
1858 /* Test if payload is part of kuc message
1859 * @param p Pointer to payload area
1862 int kuc_ispayload(void *p)
1864 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1866 if (kh->kuc_magic == KUC_MAGIC)
1871 EXPORT_SYMBOL(kuc_ispayload);
1873 /* Alloc space for a message, and fill in header
1874 * @return Pointer to payload area
1876 void *kuc_alloc(int payload_len, int transport, int type)
1879 int len = kuc_len(payload_len);
1883 return ERR_PTR(-ENOMEM);
1885 lh->kuc_magic = KUC_MAGIC;
1886 lh->kuc_transport = transport;
1887 lh->kuc_msgtype = type;
1888 lh->kuc_msglen = len;
1890 return (void *)(lh + 1);
1892 EXPORT_SYMBOL(kuc_alloc);
1894 /* Takes pointer to payload area */
1895 inline void kuc_free(void *p, int payload_len)
1897 struct kuc_hdr *lh = kuc_ptr(p);
1898 OBD_FREE(lh, kuc_len(payload_len));
1900 EXPORT_SYMBOL(kuc_free);