4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
/* List of registered obd types (defined in another file); it is walked and
 * modified in this file while holding obd_types_lock. */
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
/* Slab caches; created by obd_init_caches() and destroyed by
 * obd_cleanup_caches(). */
53 struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 struct kmem_cache *import_cachep;
/* Zombie import/export lists and their lock.
 * NOTE(review): presumably these queue objects whose last reference was
 * dropped until they are destroyed -- confirm against the zombie code,
 * which is not part of this excerpt. */
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of static helpers (definitions not in this excerpt). */
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
/* Function pointer invoked on an export's or import's connection when the
 * export/import is destroyed (see class_export_destroy() and
 * class_import_destroy()). NOTE(review): presumably installed by ptlrpc --
 * confirm. */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
/*
 * support functions: we could use inter-module communication, but this
 * is more portable to other OS's
 */
/* Allocate an obd_device from obd_device_cachep (GFP flag __GFP_IO) and
 * stamp its obd_magic with OBD_DEVICE_MAGIC. */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/* Return an obd_device to obd_device_cachep.
 * Asserts that the magic is intact, logs an error when a namespace is
 * still attached, and finalizes obd_reference before freeing. */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* a leftover namespace means cleanup was incomplete */
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/* Find a registered obd type by exact name.
 * Walks obd_types under obd_types_lock and compares typ_name with
 * strcmp(); the lock is released both on a match and after an
 * unsuccessful scan. */
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
/* Look up the obd type \a name and try to take a reference on the module
 * that owns its dt_ops.
 * When HAVE_MODULE_LOADING_SUPPORT is defined a module may be requested and
 * the lookup repeated. */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
/* module to request: starts out as the type name, remapped below */
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
/* the LWP type requests the OSP module */
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
/* any name beginning with the MDS name requests the MDT module */
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
/* a zero return from cfs_request_module() is treated as success and the
 * type is searched for again under its original name */
134 if (!cfs_request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/* module reference is taken while holding the per-type lock */
144 spin_lock(&type->obd_type_lock);
146 cfs_try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
151 EXPORT_SYMBOL(class_get_type);
/* Counterpart of class_get_type(): under type->obd_type_lock, drop the
 * module reference on the owner of the type's dt_ops. */
153 void class_put_type(struct obd_type *type)
156 spin_lock(&type->obd_type_lock);
158 cfs_module_put(type->typ_dt_ops->o_owner);
159 spin_unlock(&type->obd_type_lock);
161 EXPORT_SYMBOL(class_put_type);
/* Exclusive upper bound on the length of a type name accepted by
 * class_register_type(). */
163 #define CLASS_MAX_NAME 1024
/* Register a new obd type called \a name.
 * Copies \a dt_ops, the optional \a md_ops and the name into a freshly
 * allocated obd_type, registers a proc directory named after the type,
 * initializes \a ldt and links the type onto obd_types.
 * The trailing frees release whatever was allocated for the type
 * (presumably the failure path; its label is not visible here). */
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166 struct lprocfs_vars *vars, const char *name,
167 struct lu_device_type *ldt)
169 struct obd_type *type;
/* the name must be shorter than CLASS_MAX_NAME */
174 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
/* a type may only be registered once */
176 if (class_search_type(name)) {
177 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
182 OBD_ALLOC(type, sizeof(*type));
/* the type owns private copies of both ops tables and of the name */
186 OBD_ALLOC_PTR(type->typ_dt_ops);
187 OBD_ALLOC_PTR(type->typ_md_ops);
188 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* all three allocations are checked together */
190 if (type->typ_dt_ops == NULL ||
191 type->typ_md_ops == NULL ||
192 type->typ_name == NULL)
195 *(type->typ_dt_ops) = *dt_ops;
196 /* md_ops is optional */
198 *(type->typ_md_ops) = *md_ops;
199 strcpy(type->typ_name, name);
200 spin_lock_init(&type->obd_type_lock);
203 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
/* on error keep the errno in rc and do not leave an ERR_PTR behind */
205 if (IS_ERR(type->typ_procroot)) {
206 rc = PTR_ERR(type->typ_procroot);
207 type->typ_procroot = NULL;
213 rc = lu_device_type_init(ldt);
/* publish the fully initialized type */
218 spin_lock(&obd_types_lock);
219 cfs_list_add(&type->typ_chain, &obd_types);
220 spin_unlock(&obd_types_lock);
/* release each piece that was successfully allocated, then the type */
225 if (type->typ_name != NULL)
226 OBD_FREE(type->typ_name, strlen(name) + 1);
227 if (type->typ_md_ops != NULL)
228 OBD_FREE_PTR(type->typ_md_ops);
229 if (type->typ_dt_ops != NULL)
230 OBD_FREE_PTR(type->typ_dt_ops);
231 OBD_FREE(type, sizeof(*type));
234 EXPORT_SYMBOL(class_register_type);
/* Unregister the obd type called \a name.
 * If the type still has references an error is logged and only its ops
 * tables are freed. The remaining statements remove the proc entry by name,
 * finalize the lu device type, unlink the type from obd_types and free it. */
236 int class_unregister_type(const char *name)
238 struct obd_type *type = class_search_type(name);
242 CERROR("unknown obd type\n");
246 if (type->typ_refcnt) {
247 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
248 /* This is a bad situation, let's make the best of it */
249 /* Remove ops, but leave the name for debugging */
250 OBD_FREE_PTR(type->typ_dt_ops);
251 OBD_FREE_PTR(type->typ_md_ops);
255 /* we do not use type->typ_procroot as for compatibility purposes
256 * other modules can share names (i.e. lod can use lov entry). so
257 * we can't reference pointer as it can get invalidated when another
258 * module removes the entry */
259 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
262 lu_device_type_fini(type->typ_lu);
/* unlink under the list lock, then free outside it */
264 spin_lock(&obd_types_lock);
265 cfs_list_del(&type->typ_chain);
266 spin_unlock(&obd_types_lock);
267 OBD_FREE(type->typ_name, strlen(name) + 1);
268 if (type->typ_dt_ops != NULL)
269 OBD_FREE_PTR(type->typ_dt_ops);
270 if (type->typ_md_ops != NULL)
271 OBD_FREE_PTR(type->typ_md_ops);
272 OBD_FREE(type, sizeof(*type));
274 } /* class_unregister_type */
275 EXPORT_SYMBOL(class_unregister_type);
/**
 * Create a new obd device.
 *
 * Find an empty slot in ::obd_devs[], create a new obd device in it.
 *
 * \param[in] type_name obd device type string.
 * \param[in] name      obd device name.
 *
 * \retval ERR_PTR(errno) if create fails, otherwise return the obd device
 *         pointer created.
 */
/* Create an obd device called \a name of type \a type_name and store it in
 * an empty obd_devs[] slot.
 * Failures visible here are reported as ERR_PTR values: -EINVAL (name too
 * long), -ENODEV (unknown type), -ENOMEM (allocation), -EEXIST (duplicate
 * name) and -EOVERFLOW (no free slot). */
288 struct obd_device *class_newdev(const char *type_name, const char *name)
290 struct obd_device *result = NULL;
291 struct obd_device *newdev;
292 struct obd_type *type = NULL;
294 int new_obd_minor = 0;
/* the name must fit obd_name including the terminating NUL */
297 if (strlen(name) >= MAX_OBD_NAME) {
298 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
299 RETURN(ERR_PTR(-EINVAL));
/* takes a type/module reference; released again via class_put_type() */
302 type = class_get_type(type_name);
304 CERROR("OBD: unknown type: %s\n", type_name);
305 RETURN(ERR_PTR(-ENODEV));
308 newdev = obd_device_alloc();
310 GOTO(out_type, result = ERR_PTR(-ENOMEM));
312 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/* the whole table scan runs under the write lock */
314 write_lock(&obd_dev_lock);
315 for (i = 0; i < class_devno_max(); i++) {
316 struct obd_device *obd = class_num2obd(i);
/* duplicate name: the statements below undo a slot that was already
 * claimed earlier in this scan and report -EEXIST */
318 if (obd && (strcmp(name, obd->obd_name) == 0)) {
319 CERROR("Device %s already exists at %d, won't add\n",
322 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
323 "%p obd_magic %08x != %08x\n", result,
324 result->obd_magic, OBD_DEVICE_MAGIC);
325 LASSERTF(result->obd_minor == new_obd_minor,
326 "%p obd_minor %d != %d\n", result,
327 result->obd_minor, new_obd_minor);
329 obd_devs[result->obd_minor] = NULL;
330 result->obd_name[0]='\0';
332 result = ERR_PTR(-EEXIST);
/* empty slot and nothing chosen yet: claim it */
335 if (!result && !obd) {
337 result->obd_minor = i;
339 result->obd_type = type;
/* copies at most sizeof(obd_name) - 1 bytes of the name */
340 strncpy(result->obd_name, name,
341 sizeof(result->obd_name) - 1);
342 obd_devs[i] = result;
345 write_unlock(&obd_dev_lock);
/* scan finished without claiming a slot: the table is full */
347 if (result == NULL && i >= class_devno_max()) {
348 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
350 GOTO(out, result = ERR_PTR(-EOVERFLOW));
356 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
357 result->obd_name, result);
/* cleanup: free the unused device and drop the type reference */
361 obd_device_free(newdev);
363 class_put_type(type);
/* Remove \a obd from obd_devs[], free it and drop the reference on its
 * obd type. Asserts that the device is intact and is the one registered at
 * its own minor. */
367 void class_release_dev(struct obd_device *obd)
/* saved up front: obd is freed before the type reference is dropped */
369 struct obd_type *obd_type = obd->obd_type;
371 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
372 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
373 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
374 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
375 LASSERT(obd_type != NULL);
377 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
378 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
/* clear the table slot under the write lock */
380 write_lock(&obd_dev_lock);
381 obd_devs[obd->obd_minor] = NULL;
382 write_unlock(&obd_dev_lock);
383 obd_device_free(obd);
385 class_put_type(obd_type);
/* Scan obd_devs[] under the read lock for a device whose obd_name equals
 * \a name. A match is only acted upon when the device has obd_attached
 * set; the read lock is dropped on that path and after the scan. */
388 int class_name2dev(const char *name)
395 read_lock(&obd_dev_lock);
396 for (i = 0; i < class_devno_max(); i++) {
397 struct obd_device *obd = class_num2obd(i);
399 if (obd && strcmp(name, obd->obd_name) == 0) {
400 /* Make sure we finished attaching before we give
401 out any references */
402 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
403 if (obd->obd_attached) {
404 read_unlock(&obd_dev_lock);
410 read_unlock(&obd_dev_lock);
414 EXPORT_SYMBOL(class_name2dev);
416 struct obd_device *class_name2obd(const char *name)
418 int dev = class_name2dev(name);
420 if (dev < 0 || dev > class_devno_max())
422 return class_num2obd(dev);
424 EXPORT_SYMBOL(class_name2obd);
/* Scan obd_devs[] under the read lock for a device whose obd_uuid equals
 * \a uuid (compared with obd_uuid_equals()). The read lock is dropped on a
 * match and after an unsuccessful scan. */
426 int class_uuid2dev(struct obd_uuid *uuid)
430 read_lock(&obd_dev_lock);
431 for (i = 0; i < class_devno_max(); i++) {
432 struct obd_device *obd = class_num2obd(i);
434 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
435 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
436 read_unlock(&obd_dev_lock);
440 read_unlock(&obd_dev_lock);
444 EXPORT_SYMBOL(class_uuid2dev);
/* Resolve \a uuid to a device index with class_uuid2dev() and return the
 * device stored at that index via class_num2obd(). */
446 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
448 int dev = class_uuid2dev(uuid);
451 return class_num2obd(dev);
453 EXPORT_SYMBOL(class_uuid2obd);
456 * Get obd device from ::obd_devs[]
458 * \param num [in] array index
460 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
461 * otherwise return the obd device there.
463 struct obd_device *class_num2obd(int num)
465 struct obd_device *obd = NULL;
467 if (num < class_devno_max()) {
472 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
473 "%p obd_magic %08x != %08x\n",
474 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
475 LASSERTF(obd->obd_minor == num,
476 "%p obd_minor %0d != %0d\n",
477 obd, obd->obd_minor, num);
482 EXPORT_SYMBOL(class_num2obd);
485 * Get obd devices count. Device in any
487 * \retval obd device count
489 int get_devices_count(void)
491 int index, max_index = class_devno_max(), dev_count = 0;
493 read_lock(&obd_dev_lock);
494 for (index = 0; index <= max_index; index++) {
495 struct obd_device *obd = class_num2obd(index);
499 read_unlock(&obd_dev_lock);
503 EXPORT_SYMBOL(get_devices_count);
/* Print one console line per device under the obd_dev_lock read lock:
 * index, status string, type name, device name, uuid and reference count.
 * The status is chosen by testing obd_stopping, then obd_set_up, then
 * obd_attached. */
505 void class_obd_list(void)
510 read_lock(&obd_dev_lock);
511 for (i = 0; i < class_devno_max(); i++) {
512 struct obd_device *obd = class_num2obd(i);
516 if (obd->obd_stopping)
518 else if (obd->obd_set_up)
520 else if (obd->obd_attached)
524 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
525 i, status, obd->obd_type->typ_name,
526 obd->obd_name, obd->obd_uuid.uuid,
527 cfs_atomic_read(&obd->obd_refcount));
529 read_unlock(&obd_dev_lock);
533 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
534 specified, then only the client with that uuid is returned,
535 otherwise any client connected to the tgt is returned. */
536 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
537 const char * typ_name,
538 struct obd_uuid *grp_uuid)
542 read_lock(&obd_dev_lock);
543 for (i = 0; i < class_devno_max(); i++) {
544 struct obd_device *obd = class_num2obd(i);
/* the comparison covers only strlen(typ_name) bytes, so typ_name merely
 * has to be a prefix of the device's type name */
548 if ((strncmp(obd->obd_type->typ_name, typ_name,
549 strlen(typ_name)) == 0)) {
/* target uuid must match; the device uuid is checked only when grp_uuid
 * was supplied */
550 if (obd_uuid_equals(tgt_uuid,
551 &obd->u.cli.cl_target_uuid) &&
552 ((grp_uuid)? obd_uuid_equals(grp_uuid,
553 &obd->obd_uuid) : 1)) {
554 read_unlock(&obd_dev_lock);
559 read_unlock(&obd_dev_lock);
563 EXPORT_SYMBOL(class_find_client_obd);
565 /* Iterate the obd_device list looking devices have grp_uuid. Start
566 searching at *next, and if a device is found, the next index to look
567 at is saved in *next. If next is NULL, then the first matching device
568 will always be returned. */
569 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* *next is only honoured when it lies inside [0, class_devno_max()) */
575 else if (*next >= 0 && *next < class_devno_max())
/* scan from the starting index under the read lock; the lock is dropped
 * on a match and after an unsuccessful scan */
580 read_lock(&obd_dev_lock);
581 for (; i < class_devno_max(); i++) {
582 struct obd_device *obd = class_num2obd(i);
586 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
589 read_unlock(&obd_dev_lock);
593 read_unlock(&obd_dev_lock);
597 EXPORT_SYMBOL(class_devices_in_group);
/**
 * Notify every relevant OBD that the sptlrpc log for \a fsname has changed,
 * so that it can adjust its sptlrpc settings accordingly.
 */
/* Send KEY_SPTLRPC_CONF through obd_set_info_async() to every set-up,
 * non-stopping mdc/osc/mdt/ost device whose name starts with the first
 * \a namelen bytes of \a fsname. */
603 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
605 struct obd_device *obd;
609 LASSERT(namelen > 0);
611 read_lock(&obd_dev_lock);
612 for (i = 0; i < class_devno_max(); i++) {
613 obd = class_num2obd(i);
/* only devices that are set up and not stopping are considered */
615 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
618 /* only notify mdc, osc, mdt, ost */
619 type = obd->obd_type->typ_name;
620 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
621 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
622 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
623 strcmp(type, LUSTRE_OST_NAME) != 0)
/* device name must begin with the file system name */
626 if (strncmp(obd->obd_name, fsname, namelen))
/* hold a device reference so obd_dev_lock can be dropped around the
 * call; the lock is re-taken afterwards to continue the scan */
629 class_incref(obd, __FUNCTION__, obd);
630 read_unlock(&obd_dev_lock);
631 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
632 sizeof(KEY_SPTLRPC_CONF),
633 KEY_SPTLRPC_CONF, 0, NULL, NULL);
635 class_decref(obd, __FUNCTION__, obd);
636 read_lock(&obd_dev_lock);
638 read_unlock(&obd_dev_lock);
641 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/* Destroy the slab caches set up by obd_init_caches() (device, obdo,
 * import and capa caches). The visible code resets obd_device_cachep and
 * import_cachep to NULL after destroying them. */
643 void obd_cleanup_caches(void)
646 if (obd_device_cachep) {
647 kmem_cache_destroy(obd_device_cachep);
648 obd_device_cachep = NULL;
651 kmem_cache_destroy(obdo_cachep);
655 kmem_cache_destroy(import_cachep);
656 import_cachep = NULL;
659 kmem_cache_destroy(capa_cachep);
/* Create the slab caches for struct obd_device, struct obdo,
 * struct obd_import and struct obd_capa. Each cache pointer is asserted to
 * be NULL before creation; obd_cleanup_caches() is called from the tail of
 * the function (presumably the error path -- its label is not visible). */
665 int obd_init_caches(void)
669 LASSERT(obd_device_cachep == NULL);
670 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
671 sizeof(struct obd_device),
673 if (!obd_device_cachep)
676 LASSERT(obdo_cachep == NULL);
677 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
682 LASSERT(import_cachep == NULL);
683 import_cachep = kmem_cache_create("ll_import_cache",
684 sizeof(struct obd_import),
689 LASSERT(capa_cachep == NULL);
690 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
697 obd_cleanup_caches();
702 /* map connection to client */
/* Two special cases are logged before the lookup: a "null handle" and a
 * cookie of -1, which means a new connection is to be assigned. Otherwise
 * the export is looked up by cookie with class_handle2object(). */
703 struct obd_export *class_conn2export(struct lustre_handle *conn)
705 struct obd_export *export;
709 CDEBUG(D_CACHE, "looking for null handle\n");
713 if (conn->cookie == -1) { /* this means assign a new connection */
714 CDEBUG(D_CACHE, "want a new connection\n");
718 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
719 export = class_handle2object(conn->cookie);
722 EXPORT_SYMBOL(class_conn2export);
/* Map an export to an obd device.
 * NOTE(review): the function body is not visible in this listing;
 * presumably it returns exp->exp_obd -- confirm. */
724 struct obd_device *class_exp2obd(struct obd_export *exp)
730 EXPORT_SYMBOL(class_exp2obd);
/* Resolve a handle to the obd device of its export. The export obtained
 * from class_conn2export() is put again once exp_obd has been read. */
732 struct obd_device *class_conn2obd(struct lustre_handle *conn)
734 struct obd_export *export;
735 export = class_conn2export(conn);
737 struct obd_device *obd = export->exp_obd;
738 class_export_put(export);
743 EXPORT_SYMBOL(class_conn2obd);
/* Return the client import (u.cli.cl_import) of the obd device that
 * \a exp belongs to. */
745 struct obd_import *class_exp2cliimp(struct obd_export *exp)
747 struct obd_device *obd = exp->exp_obd;
750 return obd->u.cli.cl_import;
752 EXPORT_SYMBOL(class_exp2cliimp);
/* Return the client import (u.cli.cl_import) of the obd device that the
 * handle \a conn resolves to via class_conn2obd(). */
754 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
756 struct obd_device *obd = class_conn2obd(conn);
759 return obd->u.cli.cl_import;
761 EXPORT_SYMBOL(class_conn2cliimp);
763 /* Export management functions */
/* Tear down an export whose reference count has reached zero: release its
 * connection (if any), assert that its request/reply lists are empty, let
 * the obd destroy its per-export state, drop the export's reference on the
 * device and free the export through OBD_FREE_RCU(). */
764 static void class_export_destroy(struct obd_export *exp)
766 struct obd_device *obd = exp->exp_obd;
769 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
770 LASSERT(obd != NULL);
772 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
773 exp->exp_client_uuid.uuid, obd->obd_name);
775 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
776 if (exp->exp_connection)
777 ptlrpc_put_connection_superhack(exp->exp_connection);
779 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
780 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
781 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
782 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
783 obd_destroy_export(exp);
/* pairs with class_incref(obd, "export", ...) in class_new_export() */
784 class_decref(obd, "export", exp);
786 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle callback: take a reference on the export behind a handle. */
790 static void export_handle_addref(void *export)
792 class_export_get(export);
/* Handle operations passed to class_handle_hash() for exports. */
795 static struct portals_handle_ops export_handle_ops = {
796 .hop_addref = export_handle_addref,
/* Take one reference on \a exp (atomic increment of exp_refcount) and log
 * the new count. */
800 struct obd_export *class_export_get(struct obd_export *exp)
802 cfs_atomic_inc(&exp->exp_refcount);
803 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
804 cfs_atomic_read(&exp->exp_refcount));
807 EXPORT_SYMBOL(class_export_get);
/* Drop one reference on \a exp. When the last reference goes away the
 * export's nid statistics are cleaned up and the export is handed to
 * obd_zombie_export_add(). */
809 void class_export_put(struct obd_export *exp)
811 LASSERT(exp != NULL);
/* the count must be positive and must not look like poisoned memory */
812 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
813 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
814 cfs_atomic_read(&exp->exp_refcount) - 1);
816 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
817 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
818 CDEBUG(D_IOCTL, "final put %p/%s\n",
819 exp, exp->exp_client_uuid.uuid);
821 /* release nid stat reference */
822 lprocfs_exp_cleanup(exp);
824 obd_zombie_export_add(exp);
827 EXPORT_SYMBOL(class_export_put);
829 /* Creates a new export, adds it to the hash table, and returns a
830 * pointer to it. The refcount is 2: one for the hash reference, and
831 * one for the pointer returned by this function. */
/* Errors visible here: ERR_PTR(-ENOMEM) when the allocation fails, and rc
 * values of -ENODEV (device stopping or no uuid hash) and -EALREADY
 * (duplicate client uuid) on the unwinding paths. */
832 struct obd_export *class_new_export(struct obd_device *obd,
833 struct obd_uuid *cluuid)
835 struct obd_export *export;
836 cfs_hash_t *hash = NULL;
840 OBD_ALLOC_PTR(export);
842 return ERR_PTR(-ENOMEM);
844 export->exp_conn_cnt = 0;
845 export->exp_lock_hash = NULL;
846 export->exp_flock_hash = NULL;
847 cfs_atomic_set(&export->exp_refcount, 2);
848 cfs_atomic_set(&export->exp_rpc_count, 0);
849 cfs_atomic_set(&export->exp_cb_count, 0);
850 cfs_atomic_set(&export->exp_locks_count, 0);
851 #if LUSTRE_TRACKS_LOCK_EXP_REFS
852 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
853 spin_lock_init(&export->exp_locks_list_guard);
855 cfs_atomic_set(&export->exp_replay_count, 0);
856 export->exp_obd = obd;
857 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
858 spin_lock_init(&export->exp_uncommitted_replies_lock);
859 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
860 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
861 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
862 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
/* register the export's handle with export_handle_ops */
863 class_handle_hash(&export->exp_handle, &export_handle_ops);
864 export->exp_last_request_time = cfs_time_current_sec();
865 spin_lock_init(&export->exp_lock);
866 spin_lock_init(&export->exp_rpc_lock);
867 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
868 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
869 spin_lock_init(&export->exp_bl_list_lock);
870 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
872 export->exp_sp_peer = LUSTRE_SP_ANY;
873 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
874 export->exp_client_uuid = *cluuid;
875 obd_init_export(export);
/* first check under obd_dev_lock; also takes a reference on the uuid hash */
877 spin_lock(&obd->obd_dev_lock);
878 /* shouldn't happen, but might race */
879 if (obd->obd_stopping)
880 GOTO(exit_unlock, rc = -ENODEV);
882 hash = cfs_hash_getref(obd->obd_uuid_hash);
884 GOTO(exit_unlock, rc = -ENODEV);
885 spin_unlock(&obd->obd_dev_lock);
/* an export whose client uuid equals the device's own uuid is not added
 * to the uuid hash */
887 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
888 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
890 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
891 obd->obd_name, cluuid->uuid, rc);
892 GOTO(exit_err, rc = -EALREADY);
/* obd_stopping is checked again after re-taking the lock; the hash
 * insertion is undone if the device started stopping meanwhile */
896 spin_lock(&obd->obd_dev_lock);
897 if (obd->obd_stopping) {
898 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
899 GOTO(exit_unlock, rc = -ENODEV);
/* success: pin the device, link the export into its lists, count it */
902 class_incref(obd, "export", export);
903 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
904 cfs_list_add_tail(&export->exp_obd_chain_timed,
905 &export->exp_obd->obd_exports_timed);
906 export->exp_obd->obd_num_exports++;
907 spin_unlock(&obd->obd_dev_lock);
908 cfs_hash_putref(hash);
/* unwinding: drop the lock, release the hash reference, unhash the
 * handle, then destroy and free the export */
912 spin_unlock(&obd->obd_dev_lock);
915 cfs_hash_putref(hash);
916 class_handle_unhash(&export->exp_handle);
917 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
918 obd_destroy_export(export);
919 OBD_FREE_PTR(export);
922 EXPORT_SYMBOL(class_new_export);
/* Detach \a exp from its device: unhash its handle, remove it from the
 * uuid hash if present, move it to obd_unlinked_exports, take it off the
 * timed list, decrement obd_num_exports and finally drop one export
 * reference. */
924 void class_unlink_export(struct obd_export *exp)
926 class_handle_unhash(&exp->exp_handle);
928 spin_lock(&exp->exp_obd->obd_dev_lock);
929 /* delete an uuid-export hashitem from hashtables */
930 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
931 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
932 &exp->exp_client_uuid,
933 &exp->exp_uuid_hash);
935 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
936 cfs_list_del_init(&exp->exp_obd_chain_timed);
937 exp->exp_obd->obd_num_exports--;
938 spin_unlock(&exp->exp_obd->obd_dev_lock);
939 class_export_put(exp);
941 EXPORT_SYMBOL(class_unlink_export);
943 /* Import management functions */
/* Tear down an import whose reference count has reached zero: release its
 * current connection, free every entry on imp_conn_list (releasing each
 * entry's connection), drop the import's reference on the device and free
 * the import through OBD_FREE_RCU(). */
944 void class_import_destroy(struct obd_import *imp)
948 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
949 imp->imp_obd->obd_name);
951 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
953 ptlrpc_put_connection_superhack(imp->imp_connection);
/* always take the first entry until the list is empty */
955 while (!cfs_list_empty(&imp->imp_conn_list)) {
956 struct obd_import_conn *imp_conn;
958 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
959 struct obd_import_conn, oic_item);
960 cfs_list_del_init(&imp_conn->oic_item);
961 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
962 OBD_FREE(imp_conn, sizeof(*imp_conn));
965 LASSERT(imp->imp_sec == NULL);
/* pairs with class_incref(obd, "import", ...) in class_new_import() */
966 class_decref(imp->imp_obd, "import", imp);
967 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle callback: take a reference on the import behind a handle. */
971 static void import_handle_addref(void *import)
973 class_import_get(import);
/* Handle operations passed to class_handle_hash() for imports. */
976 static struct portals_handle_ops import_handle_ops = {
977 .hop_addref = import_handle_addref,
/* Take one reference on \a import (atomic increment of imp_refcount) and
 * log the new count together with the owning device's name. */
981 struct obd_import *class_import_get(struct obd_import *import)
983 cfs_atomic_inc(&import->imp_refcount);
984 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
985 cfs_atomic_read(&import->imp_refcount),
986 import->imp_obd->obd_name);
989 EXPORT_SYMBOL(class_import_get);
/* Drop one reference on \a imp. When the last reference goes away the
 * import is handed to obd_zombie_import_add(). */
991 void class_import_put(struct obd_import *imp)
/* the import must not already be on a zombie list, and its count must be
 * positive and must not look like poisoned memory */
995 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
996 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
998 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
999 cfs_atomic_read(&imp->imp_refcount) - 1,
1000 imp->imp_obd->obd_name);
1002 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1003 CDEBUG(D_INFO, "final put import %p\n", imp);
1004 obd_zombie_import_add(imp);
1007 /* catch possible import put race */
1008 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1011 EXPORT_SYMBOL(class_import_put);
/* Initialize the adaptive-timeout state of an import: the network latency
 * estimate with at_init(.., 0, 0) and one service estimate per portal
 * (IMP_AT_MAX_PORTALS of them) starting from INITIAL_CONNECT_TIMEOUT. */
1013 static void init_imp_at(struct imp_at *at) {
1015 at_init(&at->iat_net_latency, 0, 0);
1016 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1017 /* max service estimates are tracked on the server side, so
1018 don't use the AT history here, just use the last reported
1019 val. (But keep hist for proc histogram, worst_ever) */
1020 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/* Allocate and initialize a new import for \a obd.
 * The import starts in state LUSTRE_IMP_NEW with a reference count of 2,
 * holds a reference on the device, and has its handle registered with
 * import_handle_ops. */
1025 struct obd_import *class_new_import(struct obd_device *obd)
1027 struct obd_import *imp;
1029 OBD_ALLOC(imp, sizeof(*imp));
1033 CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1034 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1035 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1036 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1037 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1038 spin_lock_init(&imp->imp_lock);
1039 imp->imp_last_success_conn = 0;
1040 imp->imp_state = LUSTRE_IMP_NEW;
/* device reference; released in class_import_destroy() */
1041 imp->imp_obd = class_incref(obd, "import", imp);
1042 mutex_init(&imp->imp_sec_mutex);
1043 cfs_waitq_init(&imp->imp_recovery_waitq);
1045 cfs_atomic_set(&imp->imp_refcount, 2);
1046 cfs_atomic_set(&imp->imp_unregistering, 0);
1047 cfs_atomic_set(&imp->imp_inflight, 0);
1048 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1049 cfs_atomic_set(&imp->imp_inval_count, 0);
1050 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1051 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1052 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1053 init_imp_at(&imp->imp_at);
1055 /* the default magic is V2, will be used in connect RPC, and
1056 * then adjusted according to the flags in request/reply. */
1057 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1061 EXPORT_SYMBOL(class_new_import);
/* Unhash the import's handle, bump imp_generation under imp_lock and drop
 * one import reference. */
1063 void class_destroy_import(struct obd_import *import)
1065 LASSERT(import != NULL);
1066 LASSERT(import != LP_POISON);
1068 class_handle_unhash(&import->imp_handle);
1070 spin_lock(&import->imp_lock);
1071 import->imp_generation++;
1072 spin_unlock(&import->imp_lock);
1073 class_import_put(import);
1075 EXPORT_SYMBOL(class_destroy_import);
/* Lock/export reference tracking, compiled only when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is non-zero. */
1077 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Count one more reference from \a lock to \a exp, under
 * exp_locks_list_guard. The first reference links the lock onto
 * exp->exp_locks_list and records \a exp as the lock's target; a warning
 * is printed when the lock already points at a different export. */
1079 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1081 spin_lock(&exp->exp_locks_list_guard);
1083 LASSERT(lock->l_exp_refs_nr >= 0);
1085 if (lock->l_exp_refs_target != NULL &&
1086 lock->l_exp_refs_target != exp) {
1087 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1088 exp, lock, lock->l_exp_refs_target);
/* post-increment: true only for the very first reference */
1090 if ((lock->l_exp_refs_nr ++) == 0) {
1091 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1092 lock->l_exp_refs_target = exp;
1094 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1095 lock, exp, lock->l_exp_refs_nr);
1096 spin_unlock(&exp->exp_locks_list_guard);
1098 EXPORT_SYMBOL(__class_export_add_lock_ref);
/* Drop one reference from \a lock to \a exp, under exp_locks_list_guard.
 * When the count reaches zero the lock is unlinked from the export's list
 * and its target is cleared; a warning is printed when the lock's recorded
 * target is not \a exp. */
1100 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1102 spin_lock(&exp->exp_locks_list_guard);
1103 LASSERT(lock->l_exp_refs_nr > 0);
1104 if (lock->l_exp_refs_target != exp) {
1105 LCONSOLE_WARN("lock %p, "
1106 "mismatching export pointers: %p, %p\n",
1107 lock, lock->l_exp_refs_target, exp);
/* pre-decrement: true only when the last reference is removed */
1109 if (-- lock->l_exp_refs_nr == 0) {
1110 cfs_list_del_init(&lock->l_exp_refs_link);
1111 lock->l_exp_refs_target = NULL;
1113 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1114 lock, exp, lock->l_exp_refs_nr);
1115 spin_unlock(&exp->exp_locks_list_guard);
1117 EXPORT_SYMBOL(__class_export_del_lock_ref);
/* A connection defines an export context in which preallocation can
 * be managed. This releases the export pointer reference, and returns
 * the export handle, so the export refcount is 1 when this function
 * returns. */
/* Create a new export on \a obd for client \a cluuid and hand its handle
 * cookie back through \a conn. The pointer reference returned by
 * class_new_export() is dropped again before returning; a failure of
 * class_new_export() is returned as PTR_ERR(export). */
1124 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1125 struct obd_uuid *cluuid)
1127 struct obd_export *export;
1128 LASSERT(conn != NULL);
1129 LASSERT(obd != NULL);
1130 LASSERT(cluuid != NULL);
1133 export = class_new_export(obd, cluuid);
1135 RETURN(PTR_ERR(export));
/* the cookie is what the caller uses to find the export again */
1137 conn->cookie = export->exp_handle.h_cookie;
1138 class_export_put(export);
1140 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1141 cluuid->uuid, conn->cookie);
1144 EXPORT_SYMBOL(class_connect);
1146 /* if export is involved in recovery then clean up related things */
/* Adjusts the device's recovery counters for \a exp: the delayed, connected
 * and stale client counts are updated under obd_recovery_task_lock; the
 * request-replay and lock-replay client counts are updated afterwards.
 * Per-export flags are cleared under exp_lock. */
1147 void class_export_recovery_cleanup(struct obd_export *exp)
1149 struct obd_device *obd = exp->exp_obd;
1151 spin_lock(&obd->obd_recovery_task_lock);
1152 if (exp->exp_delayed)
1153 obd->obd_delayed_clients--;
1154 if (obd->obd_recovering) {
1155 if (exp->exp_in_recovery) {
1156 spin_lock(&exp->exp_lock);
1157 exp->exp_in_recovery = 0;
1158 spin_unlock(&exp->exp_lock);
1159 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1160 cfs_atomic_dec(&obd->obd_connected_clients);
1163 /* if called during recovery then should update
1164 * obd_stale_clients counter,
1165 * lightweight exports are not counted */
1166 if (exp->exp_failed &&
1167 (exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1168 exp->exp_obd->obd_stale_clients++;
1170 spin_unlock(&obd->obd_recovery_task_lock);
1171 /** Cleanup req replay fields */
1172 if (exp->exp_req_replay_needed) {
1173 spin_lock(&exp->exp_lock);
1174 exp->exp_req_replay_needed = 0;
1175 spin_unlock(&exp->exp_lock);
1176 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1177 cfs_atomic_dec(&obd->obd_req_replay_clients);
1179 /** Cleanup lock replay data */
1180 if (exp->exp_lock_replay_needed) {
1181 spin_lock(&exp->exp_lock);
1182 exp->exp_lock_replay_needed = 0;
1183 spin_unlock(&exp->exp_lock);
1184 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1185 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1189 /* This function removes 1-3 references from the export:
1190 * 1 - for export pointer passed
1191 * and if disconnect is really needed
1192 * 2 - removing from hash
1193 * 3 - in class_unlink_export
1194 * The export pointer passed to this function can be destroyed */
1195 int class_disconnect(struct obd_export *export)
1197 int already_disconnected;
1200 if (export == NULL) {
1201 CWARN("attempting to free NULL export %p\n", export);
/* test-and-set of exp_disconnected under exp_lock: only the first caller
 * sees already_disconnected == 0 */
1205 spin_lock(&export->exp_lock);
1206 already_disconnected = export->exp_disconnected;
1207 export->exp_disconnected = 1;
1208 spin_unlock(&export->exp_lock);
1210 /* class_cleanup(), abort_recovery(), and class_fail_export()
1211 * all end up in here, and if any of them race we shouldn't
1212 * call extra class_export_puts(). */
1213 if (already_disconnected) {
1214 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1215 GOTO(no_disconn, already_disconnected);
1218 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1219 export->exp_handle.h_cookie);
/* remove the export from the device's nid hash if it is hashed there */
1221 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1222 cfs_hash_del(export->exp_obd->obd_nid_hash,
1223 &export->exp_connection->c_peer.nid,
1224 &export->exp_nid_hash);
1226 class_export_recovery_cleanup(export);
1227 class_unlink_export(export);
/* drops the reference for the pointer passed in */
1229 class_export_put(export);
1232 EXPORT_SYMBOL(class_disconnect);
1234 /* Return non-zero for a fully connected export */
/* "Connected" is evaluated as exp_conn_cnt > 0, read under exp_lock. */
1235 int class_connected_export(struct obd_export *exp)
1239 spin_lock(&exp->exp_lock);
1240 connected = (exp->exp_conn_cnt > 0);
1241 spin_unlock(&exp->exp_lock);
1246 EXPORT_SYMBOL(class_connected_export);
/* Disconnect every export on \a list, storing \a flags in each export's
 * exp_flags first. An export whose client uuid equals its device's uuid is
 * not disconnected; it is only taken off the list. All other exports are
 * passed to obd_disconnect(). */
1248 static void class_disconnect_export_list(cfs_list_t *list,
1249 enum obd_option flags)
1252 struct obd_export *exp;
1255 /* It's possible that an export may disconnect itself, but
1256 * nothing else will be added to this list. */
1257 while (!cfs_list_empty(list)) {
1258 exp = cfs_list_entry(list->next, struct obd_export,
1260 /* need for safe call CDEBUG after obd_disconnect */
1261 class_export_get(exp);
1263 spin_lock(&exp->exp_lock);
1264 exp->exp_flags = flags;
1265 spin_unlock(&exp->exp_lock);
1267 if (obd_uuid_equals(&exp->exp_client_uuid,
1268 &exp->exp_obd->obd_uuid)) {
1270 "exp %p export uuid == obd uuid, don't discon\n",
1272 /* Need to delete this now so we don't end up pointing
1273 * to work_list later when this export is cleaned up. */
1274 cfs_list_del_init(&exp->exp_obd_chain);
1275 class_export_put(exp);
/* second reference, taken just before the obd_disconnect() call below */
1279 class_export_get(exp);
1280 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1281 "last request at "CFS_TIME_T"\n",
1282 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1283 exp, exp->exp_last_request_time);
1284 /* release one export reference anyway */
1285 rc = obd_disconnect(exp);
1287 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1288 obd_export_nid2str(exp), exp, rc);
/* drops the reference taken at the top of the loop body */
1289 class_export_put(exp);
/*
 * Disconnect all exports of @obd.
 *
 * obd_exports and obd_delayed_exports are spliced onto a local work list
 * while holding obd_dev_lock; the disconnects themselves run after the
 * lock has been dropped, using the flags from exp_flags_from_obd().
 */
1294 void class_disconnect_exports(struct obd_device *obd)
1296 cfs_list_t work_list;
1299 /* Move all of the exports from obd_exports to a work list, en masse. */
1300 CFS_INIT_LIST_HEAD(&work_list);
1301 spin_lock(&obd->obd_dev_lock);
1302 cfs_list_splice_init(&obd->obd_exports, &work_list);
1303 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1304 spin_unlock(&obd->obd_dev_lock);
/* only the log message differs between the empty and non-empty cases */
1306 if (!cfs_list_empty(&work_list)) {
1307 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1308 "disconnecting them\n", obd->obd_minor, obd);
1309 class_disconnect_export_list(&work_list,
1310 exp_flags_from_obd(obd));
1312 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1313 obd->obd_minor, obd);
1316 EXPORT_SYMBOL(class_disconnect_exports);
/*
 * Evict exports of @obd that are considered stale.
 *
 * When it is called, @test_export runs with both obd_dev_lock and the
 * export's exp_lock held.  Exports that get past the checks below are
 * flagged exp_failed and moved to a local work list; after obd_dev_lock
 * is released the work list is disconnected with OBD_OPT_ABORT_RECOV
 * added to the flags from exp_flags_from_obd().
 */
1318 /* Remove exports that have not completed recovery.
1320 void class_disconnect_stale_exports(struct obd_device *obd,
1321 int (*test_export)(struct obd_export *))
1323 cfs_list_t work_list;
1324 struct obd_export *exp, *n;
1328 CFS_INIT_LIST_HEAD(&work_list);
1329 spin_lock(&obd->obd_dev_lock);
1330 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1332 /* don't count self-export as client */
1333 if (obd_uuid_equals(&exp->exp_client_uuid,
1334 &exp->exp_obd->obd_uuid))
1337 /* don't evict clients which have no slot in last_rcvd
1338 * (e.g. lightweight connection) */
1339 if (exp->exp_target_data.ted_lr_idx == -1)
/* exp_failed is tested and set inside one exp_lock critical section;
 * test_export() is only consulted when exp_failed is not already set */
1342 spin_lock(&exp->exp_lock);
1343 if (exp->exp_failed || test_export(exp)) {
1344 spin_unlock(&exp->exp_lock);
1347 exp->exp_failed = 1;
1348 spin_unlock(&exp->exp_lock);
/* take the export off obd_exports; it is disconnected further down */
1350 cfs_list_move(&exp->exp_obd_chain, &work_list);
1352 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1353 obd->obd_name, exp->exp_client_uuid.uuid,
1354 exp->exp_connection == NULL ? "<unknown>" :
1355 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1356 print_export_data(exp, "EVICTING", 0);
1358 spin_unlock(&obd->obd_dev_lock);
1361 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1362 obd->obd_name, evicted);
1364 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1365 OBD_OPT_ABORT_RECOV);
1368 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Fail @exp and disconnect it.
 *
 * exp_failed is read and set in a single exp_lock critical section, so
 * the caller can tell whether the export had already been failed; in
 * that case the "skipping" debug message below is emitted.
 */
1370 void class_fail_export(struct obd_export *exp)
1372 int rc, already_failed;
1374 spin_lock(&exp->exp_lock);
1375 already_failed = exp->exp_failed;
1376 exp->exp_failed = 1;
1377 spin_unlock(&exp->exp_lock);
1379 if (already_failed) {
1380 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1381 exp, exp->exp_client_uuid.uuid);
1385 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1386 exp, exp->exp_client_uuid.uuid);
/* dump the debug log first when obd_dump_on_timeout is set */
1388 if (obd_dump_on_timeout)
1389 libcfs_debug_dumplog();
1391 /* hold a reference so exp can still be used by the CDEBUG that
 * follows obd_disconnect() */
1392 class_export_get(exp);
1394 /* Most callers into obd_disconnect are removing their own reference
1395 * (request, for example) in addition to the one from the hash table.
1396 * We don't have such a reference here, so make one. */
1397 class_export_get(exp);
1398 rc = obd_disconnect(exp);
1400 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1402 CDEBUG(D_HA, "disconnected export %p/%s\n",
1403 exp, exp->exp_client_uuid.uuid);
/* drop the reference that kept exp usable for the messages above */
1404 class_export_put(exp);
1406 EXPORT_SYMBOL(class_fail_export);
/*
 * Return the string form, as produced by libcfs_nid2str(), of the peer
 * NID of @exp's connection when the export has a connection.
 * NOTE(review): the value returned for a NULL exp_connection is not
 * visible in this excerpt.
 */
1408 char *obd_export_nid2str(struct obd_export *exp)
1410 if (exp->exp_connection != NULL)
1411 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1415 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict exports of @obd that are hashed under the NID given by the
 * string @nid.
 *
 * Returns exports_evicted.  When @obd is already stopping nothing is
 * looked up and 0 is returned.  A reference on obd_nid_hash is taken
 * while obd_dev_lock is held and dropped once the lookups are finished.
 */
1417 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1419 cfs_hash_t *nid_hash;
1420 struct obd_export *doomed_exp = NULL;
1421 int exports_evicted = 0;
/* the cast only removes const for the libcfs_str2nid() call */
1423 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1425 spin_lock(&obd->obd_dev_lock);
1426 /* umount has run already, so evict thread should leave
1427 * its task to umount thread now */
1428 if (obd->obd_stopping) {
1429 spin_unlock(&obd->obd_dev_lock);
1430 return exports_evicted;
1432 nid_hash = obd->obd_nid_hash;
1433 cfs_hash_getref(nid_hash);
1434 spin_unlock(&obd->obd_dev_lock);
1437 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1438 if (doomed_exp == NULL)
/* sanity checks: the hit must carry the requested NID and must not be
 * the obd's self-export */
1441 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1442 "nid %s found, wanted nid %s, requested nid %s\n",
1443 obd_export_nid2str(doomed_exp),
1444 libcfs_nid2str(nid_key), nid);
1445 LASSERTF(doomed_exp != obd->obd_self_export,
1446 "self-export is hashed by NID?\n");
1448 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1449 "request\n", obd->obd_name,
1450 obd_uuid2str(&doomed_exp->exp_client_uuid),
1451 obd_export_nid2str(doomed_exp));
1452 class_fail_export(doomed_exp);
1453 class_export_put(doomed_exp);
1456 cfs_hash_putref(nid_hash);
1458 if (!exports_evicted)
1459 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1460 obd->obd_name, nid);
1461 return exports_evicted;
1463 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of @obd that is hashed under the client uuid @uuid.
 *
 * Returns exports_evicted.  0 is returned early, without a lookup, when
 * @obd is stopping or when @uuid equals the obd's own uuid.  A reference
 * on obd_uuid_hash is taken under obd_dev_lock and dropped on every path
 * that follows.
 */
1465 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1467 cfs_hash_t *uuid_hash;
1468 struct obd_export *doomed_exp = NULL;
1469 struct obd_uuid doomed_uuid;
1470 int exports_evicted = 0;
1472 spin_lock(&obd->obd_dev_lock);
1473 if (obd->obd_stopping) {
1474 spin_unlock(&obd->obd_dev_lock);
1475 return exports_evicted;
1477 uuid_hash = obd->obd_uuid_hash;
1478 cfs_hash_getref(uuid_hash);
1479 spin_unlock(&obd->obd_dev_lock);
1481 obd_str2uuid(&doomed_uuid, uuid);
/* refuse to evict the obd's own uuid */
1482 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1483 CERROR("%s: can't evict myself\n", obd->obd_name);
1484 cfs_hash_putref(uuid_hash);
1485 return exports_evicted;
1488 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1490 if (doomed_exp == NULL) {
1491 CERROR("%s: can't disconnect %s: no exports found\n",
1492 obd->obd_name, uuid);
/* NOTE(review): "adminstrative" in the message below is misspelled; it
 * is a runtime string and is left untouched here */
1494 CWARN("%s: evicting %s at adminstrative request\n",
1495 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1496 class_fail_export(doomed_exp);
1497 class_export_put(doomed_exp);
1500 cfs_hash_putref(uuid_hash);
1502 return exports_evicted;
1504 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/* Optional hook, NULL by default, that print_export_data() calls to dump
 * extra per-export state; only compiled when LUSTRE_TRACKS_LOCK_EXP_REFS
 * is set. */
1506 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1507 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1508 EXPORT_SYMBOL(class_export_dump_hook);
/*
 * Log a one-line D_HA summary of @exp, tagged with @status: obd name,
 * client uuid, NID, the refcount/rpc/callback/lock counters, the
 * disconnected/delayed/failed bits, outstanding-reply information and
 * exp_last_committed.  exp_outstanding_replies is walked under exp_lock.
 * When LUSTRE_TRACKS_LOCK_EXP_REFS is set and locks is non-zero, a
 * non-NULL class_export_dump_hook is called as well.
 */
1511 static void print_export_data(struct obd_export *exp, const char *status,
1514 struct ptlrpc_reply_state *rs;
1515 struct ptlrpc_reply_state *first_reply = NULL;
1518 spin_lock(&exp->exp_lock);
1519 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1525 spin_unlock(&exp->exp_lock);
/* the counters are read without exp_lock, after the reply walk */
1527 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1528 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1529 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1530 cfs_atomic_read(&exp->exp_rpc_count),
1531 cfs_atomic_read(&exp->exp_cb_count),
1532 cfs_atomic_read(&exp->exp_locks_count),
1533 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1534 nreplies, first_reply, nreplies > 3 ? "..." : "",
1535 exp->exp_last_committed);
1536 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1537 if (locks && class_export_dump_hook != NULL)
1538 class_export_dump_hook(exp);
/*
 * Print every export associated with @obd via print_export_data().
 *
 * The obd's active, unlinked and delayed export lists are walked under
 * obd_dev_lock; the global obd_zombie_exports list is then walked under
 * obd_zombie_impexp_lock.  @locks is passed through unchanged.
 */
1542 void dump_exports(struct obd_device *obd, int locks)
1544 struct obd_export *exp;
1546 spin_lock(&obd->obd_dev_lock);
1547 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1548 print_export_data(exp, "ACTIVE", locks);
1549 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1550 print_export_data(exp, "UNLINKED", locks);
1551 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1552 print_export_data(exp, "DELAYED", locks);
1553 spin_unlock(&obd->obd_dev_lock);
/* the zombie list is global, so every zombie export is printed here, not
 * only those that belonged to @obd */
1554 spin_lock(&obd_zombie_impexp_lock);
1555 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1556 print_export_data(exp, "ZOMBIE", locks);
1557 spin_unlock(&obd_zombie_impexp_lock);
1559 EXPORT_SYMBOL(dump_exports);
/*
 * Wait until @obd has nothing left on obd_unlinked_exports.
 *
 * obd_exports must already be empty (asserted).  The unlinked list is
 * tested with obd_dev_lock held; between tests the lock is dropped and
 * the caller sleeps in CFS_TASK_UNINT state.  Whenever the waited value
 * is above 5 and a power of two, a console warning is printed and the
 * exports are dumped with dump_exports().
 */
1561 void obd_exports_barrier(struct obd_device *obd)
1564 LASSERT(cfs_list_empty(&obd->obd_exports));
1565 spin_lock(&obd->obd_dev_lock);
1566 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
/* do not sleep while holding the spinlock */
1567 spin_unlock(&obd->obd_dev_lock);
1568 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1569 cfs_time_seconds(waited));
1570 if (waited > 5 && IS_PO2(waited)) {
1571 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1572 "more than %d seconds. "
1573 "The obd refcount = %d. Is it stuck?\n",
1574 obd->obd_name, waited,
1575 cfs_atomic_read(&obd->obd_refcount));
1576 dump_exports(obd, 1);
/* re-take the lock before the loop condition is evaluated again */
1579 spin_lock(&obd->obd_dev_lock);
1581 spin_unlock(&obd->obd_dev_lock);
1583 EXPORT_SYMBOL(obd_exports_barrier);
1585 /* Total number of zombies still to be destroyed; read under
 * obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle() */
1586 static int zombies_count = 0;
1589 * kill zombie imports and exports
/*
 * Destroy the imports and exports queued on the zombie lists.
 *
 * Each pass detaches at most one import and one export from the heads of
 * obd_zombie_imports / obd_zombie_exports while holding
 * obd_zombie_impexp_lock, then calls class_import_destroy() /
 * class_export_destroy() with the lock dropped.  The loop repeats until a
 * pass finds neither an import nor an export.
 */
1591 void obd_zombie_impexp_cull(void)
1593 struct obd_import *import;
1594 struct obd_export *export;
1598 spin_lock(&obd_zombie_impexp_lock);
1601 if (!cfs_list_empty(&obd_zombie_imports)) {
1602 import = cfs_list_entry(obd_zombie_imports.next,
1605 cfs_list_del_init(&import->imp_zombie_chain);
1609 if (!cfs_list_empty(&obd_zombie_exports)) {
1610 export = cfs_list_entry(obd_zombie_exports.next,
1613 cfs_list_del_init(&export->exp_obd_chain);
1616 spin_unlock(&obd_zombie_impexp_lock);
/* destruction runs unlocked; the lock is re-taken briefly afterwards */
1618 if (import != NULL) {
1619 class_import_destroy(import);
1620 spin_lock(&obd_zombie_impexp_lock);
1622 spin_unlock(&obd_zombie_impexp_lock);
1625 if (export != NULL) {
1626 class_export_destroy(export);
1627 spin_lock(&obd_zombie_impexp_lock);
1629 spin_unlock(&obd_zombie_impexp_lock);
1633 } while (import != NULL || export != NULL);
/* State shared by the zombie thread and the init/stop/barrier helpers:
 * start/stop completions, a flag word, the wait queue both the thread
 * and obd_zombie_barrier() sleep on, and the pid recorded by the thread. */
1637 static struct completion obd_zombie_start;
1638 static struct completion obd_zombie_stop;
1639 static unsigned long obd_zombie_flags;
1640 static cfs_waitq_t obd_zombie_waitq;
1641 static pid_t obd_zombie_pid;
/* passed as the bit number to set_bit()/test_bit() on obd_zombie_flags */
1644 OBD_ZOMBIE_STOP = 0x0001,
1648 * Check whether the zombie import/export kill thread has work to do.
/*
 * Compute, under obd_zombie_impexp_lock, whether the zombie machinery has
 * nothing to do: rc is non-zero when zombies_count is 0 and
 * OBD_ZOMBIE_STOP is not set.  The zombie thread sleeps until this
 * becomes false.  @arg is not used by the visible code.
 */
1650 static int obd_zombie_impexp_check(void *arg)
1654 spin_lock(&obd_zombie_impexp_lock);
1655 rc = (zombies_count == 0) &&
1656 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1657 spin_unlock(&obd_zombie_impexp_lock);
1663 * Add export to the obd_zombie thread and notify it.
/*
 * Move @exp from the list it is on in its obd onto the global
 * obd_zombie_exports list and notify the zombie thread.
 *
 * The two list operations use different locks: the removal is done under
 * the obd's obd_dev_lock, the insertion under obd_zombie_impexp_lock.
 * @exp must currently be linked through exp_obd_chain (asserted).
 */
1665 static void obd_zombie_export_add(struct obd_export *exp) {
1666 spin_lock(&exp->exp_obd->obd_dev_lock);
1667 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1668 cfs_list_del_init(&exp->exp_obd_chain);
1669 spin_unlock(&exp->exp_obd->obd_dev_lock);
1670 spin_lock(&obd_zombie_impexp_lock);
1672 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1673 spin_unlock(&obd_zombie_impexp_lock);
/* notify only after the lock is released */
1675 obd_zombie_impexp_notify();
1679 * Add import to the obd_zombie thread and notify it.
/*
 * Queue @imp on the global obd_zombie_imports list and notify the zombie
 * thread.
 *
 * Preconditions (asserted): imp_sec and imp_rq_pool are already NULL and
 * the import is not yet linked through imp_zombie_chain.
 */
1681 static void obd_zombie_import_add(struct obd_import *imp) {
1682 LASSERT(imp->imp_sec == NULL);
1683 LASSERT(imp->imp_rq_pool == NULL);
1684 spin_lock(&obd_zombie_impexp_lock);
1685 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1687 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1688 spin_unlock(&obd_zombie_impexp_lock);
/* notify only after the lock is released */
1690 obd_zombie_impexp_notify();
1694 * notify import/export destroy thread about new zombie.
/*
 * Broadcast on obd_zombie_waitq, the queue that both the zombie thread
 * and obd_zombie_barrier() callers wait on.
 */
1696 static void obd_zombie_impexp_notify(void)
1699 * Make sure obd_zomebie_impexp_thread get this notification.
1700 * It is possible this signal only get by obd_zombie_barrier, and
1701 * barrier gulps this notification and sleeps away and hangs ensues
1703 cfs_waitq_broadcast(&obd_zombie_waitq);
1707 * check whether obd_zombie is idle
/*
 * Compute, under obd_zombie_impexp_lock, whether zombies_count is 0.
 * Must not be called once OBD_ZOMBIE_STOP has been set (asserted).
 */
1709 static int obd_zombie_is_idle(void)
1713 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1714 spin_lock(&obd_zombie_impexp_lock);
1715 rc = (zombies_count == 0);
1716 spin_unlock(&obd_zombie_impexp_lock);
1721 * Wait until the obd_zombie import/export queues become empty.
/*
 * Sleep on obd_zombie_waitq until obd_zombie_is_idle() holds.
 * The current pid is compared with obd_zombie_pid so that the zombie
 * thread does not wait for itself.
 */
1723 void obd_zombie_barrier(void)
1725 struct l_wait_info lwi = { 0 };
1727 if (obd_zombie_pid == cfs_curproc_pid())
1728 /* don't wait for myself */
1730 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1732 EXPORT_SYMBOL(obd_zombie_barrier);
1737 * Thread that destroys zombie exports/imports.
/*
 * Main function of the zombie thread.
 *
 * Completes obd_zombie_start, records its own pid in obd_zombie_pid, and
 * then, until OBD_ZOMBIE_STOP is set, repeatedly: sleeps until
 * obd_zombie_impexp_check() turns false, culls the zombie lists, and
 * signals obd_zombie_waitq.  obd_zombie_stop is completed on the way out.
 * @unused is not referenced by the visible code.
 */
1739 static int obd_zombie_impexp_thread(void *unused)
1741 unshare_fs_struct();
1742 complete(&obd_zombie_start);
/* recorded after the start completion has been signalled */
1744 obd_zombie_pid = cfs_curproc_pid();
1746 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1747 struct l_wait_info lwi = { 0 };
1749 l_wait_event(obd_zombie_waitq,
1750 !obd_zombie_impexp_check(NULL), &lwi);
1751 obd_zombie_impexp_cull();
1754 * Notify obd_zombie_barrier callers that queues
1757 cfs_waitq_signal(&obd_zombie_waitq);
1760 complete(&obd_zombie_stop);
1765 #else /* ! KERNEL */
/* Non-kernel build state: an entry counter used by
 * obd_zombie_impexp_kill(), and the handles returned by the liblustre
 * wait/idle callback registrations made in obd_zombie_impexp_init(). */
1767 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1768 static void *obd_zombie_impexp_work_cb;
1769 static void *obd_zombie_impexp_idle_cb;
/*
 * Callback that culls the zombie lists.  obd_zombie_impexp_cull() is
 * only invoked when incrementing zombie_recur yields 1, i.e. when no
 * other invocation is already counted; the counter is decremented again
 * afterwards.  @arg is not used by the visible code.
 */
1771 int obd_zombie_impexp_kill(void *arg)
1775 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1776 obd_zombie_impexp_cull();
1779 cfs_atomic_dec(&zombie_recur);
1786 * Start the thread that destroys zombie imports/exports.
/*
 * Initialise the zombie lists, their lock, the start/stop completions
 * and the wait queue, then start zombie processing.
 *
 * Two start-up variants are visible: one launches the "obd_zombid"
 * kthread and waits for obd_zombie_start (returning PTR_ERR(task) on the
 * error path), the other registers obd_zombie_impexp_kill() and
 * obd_zombie_impexp_check() as liblustre wait/idle callbacks.
 * NOTE(review): the preprocessor conditionals that select between the two
 * variants are not visible in this excerpt.
 */
1788 int obd_zombie_impexp_init(void)
1794 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1795 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1796 spin_lock_init(&obd_zombie_impexp_lock);
1797 init_completion(&obd_zombie_start);
1798 init_completion(&obd_zombie_stop);
1799 cfs_waitq_init(&obd_zombie_waitq);
1803 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1805 RETURN(PTR_ERR(task));
/* do not return before the thread has signalled that it is running */
1807 wait_for_completion(&obd_zombie_start);
1810 obd_zombie_impexp_work_cb =
1811 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1812 &obd_zombie_impexp_kill, NULL);
1814 obd_zombie_impexp_idle_cb =
1815 liblustre_register_idle_callback("obd_zombi_impexp_check",
1816 &obd_zombie_impexp_check, NULL);
1821 * Stop the thread that destroys zombie imports/exports.
/*
 * Shut zombie processing down.
 *
 * Two variants are visible: one sets OBD_ZOMBIE_STOP, notifies the wait
 * queue and waits for obd_zombie_stop to be completed by the thread; the
 * other deregisters the liblustre wait/idle callbacks.
 * NOTE(review): the preprocessor conditionals that select between the two
 * variants are not visible in this excerpt.
 */
1823 void obd_zombie_impexp_stop(void)
1825 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
/* wake the thread so that it re-tests the stop bit */
1826 obd_zombie_impexp_notify();
1828 wait_for_completion(&obd_zombie_stop);
1830 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1831 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1835 /***** Kernel-userspace comm helpers *******/
1837 /* Get length of entire message, including header: the size of
 * struct kuc_hdr plus @payload_len.
 * NOTE(review): the sum is returned as a plain int with no range check
 * on @payload_len. */
1838 int kuc_len(int payload_len)
1840 return sizeof(struct kuc_hdr) + payload_len;
1842 EXPORT_SYMBOL(kuc_len);
/* The header is taken to be the struct kuc_hdr immediately preceding the
 * payload; its kuc_magic is asserted to equal KUC_MAGIC. */
1844 /* Get a pointer to kuc header, given a ptr to the payload
1845 * @param p Pointer to payload area
1846 * @returns Pointer to kuc header
1848 struct kuc_hdr * kuc_ptr(void *p)
1850 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1851 LASSERT(lh->kuc_magic == KUC_MAGIC);
1854 EXPORT_SYMBOL(kuc_ptr);
/* Looks at the struct kuc_hdr immediately preceding @p and compares its
 * kuc_magic with KUC_MAGIC.
 * NOTE(review): this reads memory in front of @p, so @p needs at least
 * sizeof(struct kuc_hdr) readable bytes before it. */
1856 /* Test if payload is part of kuc message
1857 * @param p Pointer to payload area
1860 int kuc_ispayload(void *p)
1862 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1864 if (kh->kuc_magic == KUC_MAGIC)
1869 EXPORT_SYMBOL(kuc_ispayload);
/* The header is filled with KUC_MAGIC, @transport, @type and the total
 * length kuc_len(@payload_len); the returned pointer is the address just
 * past the header.  The failure path returns ERR_PTR(-ENOMEM) rather
 * than NULL. */
1871 /* Alloc space for a message, and fill in header
1872 * @return Pointer to payload area
1874 void *kuc_alloc(int payload_len, int transport, int type)
1877 int len = kuc_len(payload_len);
1881 return ERR_PTR(-ENOMEM);
1883 lh->kuc_magic = KUC_MAGIC;
1884 lh->kuc_transport = transport;
1885 lh->kuc_msgtype = type;
1886 lh->kuc_msglen = len;
1888 return (void *)(lh + 1);
1890 EXPORT_SYMBOL(kuc_alloc);
1892 /* Takes pointer to payload area */
/* Free a kuc message given its payload pointer @p.  The header is located
 * with kuc_ptr(), which asserts the magic, and kuc_len(@payload_len)
 * bytes are released starting at the header, so @payload_len should be
 * the value the message was allocated with. */
1893 inline void kuc_free(void *p, int payload_len)
1895 struct kuc_hdr *lh = kuc_ptr(p);
1896 OBD_FREE(lh, kuc_len(payload_len));
1898 EXPORT_SYMBOL(kuc_free);