4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2013, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
/* List of registered obd types; walked and modified under obd_types_lock. */
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
/* Slab caches: created in obd_init_caches(), destroyed in obd_cleanup_caches(). */
53 struct kmem_cache *obd_device_cachep;
54 struct kmem_cache *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 struct kmem_cache *import_cachep;
/*
 * Zombie import/export lists and their lock. Their users are not visible
 * in this part of the file -- presumably the obd_zombie_*() helpers
 * forward-declared below; confirm.
 */
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 spinlock_t obd_zombie_impexp_lock;
/* Forward declarations of helpers defined later in the file. */
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
/*
 * Hook used by class_export_destroy() and class_import_destroy() to drop a
 * ptlrpc connection reference. It is only declared (and exported) here, so
 * it is presumably assigned by another module -- confirm.
 */
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep and stamp it with
 * OBD_DEVICE_MAGIC.
 * NOTE(review): this listing omits some original lines (braces and
 * presumably the NULL check / return) -- confirm against the full file.
 */
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, __GFP_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
/*
 * Release an obd_device back to obd_device_cachep.
 * Asserts the magic, reports a namespace that is still attached, and
 * calls lu_ref_fini() on obd_reference before freeing.
 */
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A namespace still set at free time is reported as an error. */
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Look up a registered obd type by exact name.
 * Walks obd_types under obd_types_lock and compares typ_name with strcmp().
 * NOTE(review): the return statements are not visible in this listing;
 * callers treat the result as "matching type or NULL".
 */
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
/* Match found: drop the lock on this path too. */
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
/*
 * Find the obd type called \a name and take a module reference on the
 * owner of its dt ops.
 * With HAVE_MODULE_LOADING_SUPPORT the function can also request the
 * module that provides the type and search again.
 * NOTE(review): several guarding conditions are not visible in this listing.
 */
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
/* Some type names are provided by a module with a different name. */
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
/* The LWP type is served by the OSP module. */
128 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
129 modname = LUSTRE_OSP_NAME;
/* Any name starting with LUSTRE_MDS_NAME is mapped to the MDT module. */
131 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
132 modname = LUSTRE_MDT_NAME;
/* request_module() returns 0 on success; then retry the lookup. */
134 if (!request_module("%s", modname)) {
135 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
136 type = class_search_type(name);
138 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
/*
 * Pin the owning module under the per-type lock.
 * NOTE(review): the result of try_module_get() is not checked here.
 */
144 spin_lock(&type->obd_type_lock);
146 try_module_get(type->typ_dt_ops->o_owner);
147 spin_unlock(&type->obd_type_lock);
151 EXPORT_SYMBOL(class_get_type);
/*
 * Counterpart of class_get_type(): drops the module reference on the owner
 * of the type's dt ops while holding the per-type lock.
 */
153 void class_put_type(struct obd_type *type)
156 spin_lock(&type->obd_type_lock);
158 module_put(type->typ_dt_ops->o_owner);
159 spin_unlock(&type->obd_type_lock);
161 EXPORT_SYMBOL(class_put_type);
/* Upper bound on a type name length; asserted in class_register_type(). */
163 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type called \a name.
 *
 * Rejects a name that class_search_type() already finds, allocates the
 * obd_type with private copies of \a dt_ops, \a md_ops and the name,
 * registers a proc directory for the type, calls lu_device_type_init() on
 * \a ldt and links the type onto obd_types under obd_types_lock.
 * NOTE(review): labels, return statements and some guards are not visible
 * in this listing.
 */
165 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
166 struct lprocfs_seq_vars *module_vars,
167 #ifndef HAVE_ONLY_PROCFS_SEQ
168 struct lprocfs_vars *vars,
170 const char *name, struct lu_device_type *ldt)
172 struct obd_type *type;
/* The name must be NUL-terminated within CLASS_MAX_NAME bytes. */
177 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
179 if (class_search_type(name)) {
180 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
185 OBD_ALLOC(type, sizeof(*type));
/* Allocate all three members first, then check them together. */
189 OBD_ALLOC_PTR(type->typ_dt_ops);
190 OBD_ALLOC_PTR(type->typ_md_ops);
191 OBD_ALLOC(type->typ_name, strlen(name) + 1);
193 if (type->typ_dt_ops == NULL ||
194 type->typ_md_ops == NULL ||
195 type->typ_name == NULL)
/* The ops tables are copied by value into the type's own storage. */
198 *(type->typ_dt_ops) = *dt_ops;
199 /* md_ops is optional */
201 *(type->typ_md_ops) = *md_ops;
/* typ_name was allocated with strlen(name) + 1 bytes above. */
202 strcpy(type->typ_name, name);
203 spin_lock_init(&type->obd_type_lock);
206 #ifndef HAVE_ONLY_PROCFS_SEQ
208 type->typ_procroot = lprocfs_register(type->typ_name,
214 type->typ_procroot = lprocfs_seq_register(type->typ_name,
/* Registration failure: keep the error code and clear the pointer. */
218 if (IS_ERR(type->typ_procroot)) {
219 rc = PTR_ERR(type->typ_procroot);
220 type->typ_procroot = NULL;
226 rc = lu_device_type_init(ldt);
/* Publish the fully initialized type. */
231 spin_lock(&obd_types_lock);
232 cfs_list_add(&type->typ_chain, &obd_types);
233 spin_unlock(&obd_types_lock);
/*
 * Presumably the failure path (its label is not visible in this listing):
 * release whatever was allocated above.
 */
238 if (type->typ_name != NULL)
239 OBD_FREE(type->typ_name, strlen(name) + 1);
240 if (type->typ_md_ops != NULL)
241 OBD_FREE_PTR(type->typ_md_ops);
242 if (type->typ_dt_ops != NULL)
243 OBD_FREE_PTR(type->typ_dt_ops);
245 #ifndef HAVE_ONLY_PROCFS_SEQ
246 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
248 remove_proc_subtree(type->typ_name, proc_lustre_root);
251 OBD_FREE(type, sizeof(*type));
254 EXPORT_SYMBOL(class_register_type);
/*
 * Unregister the obd type called \a name.
 *
 * A type that still has references only has its ops tables freed. Otherwise
 * the proc entry is removed by name, lu_device_type_fini() is called, the
 * type is unlinked from obd_types and all of its memory is freed.
 * NOTE(review): return statements are not visible in this listing.
 */
256 int class_unregister_type(const char *name)
258 struct obd_type *type = class_search_type(name);
262 CERROR("unknown obd type\n");
266 if (type->typ_refcnt) {
267 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
268 /* This is a bad situation, let's make the best of it */
269 /* Remove ops, but leave the name for debugging */
270 OBD_FREE_PTR(type->typ_dt_ops);
271 OBD_FREE_PTR(type->typ_md_ops);
275 /* we do not use type->typ_procroot as for compatibility purposes
276 * other modules can share names (i.e. lod can use lov entry). so
277 * we can't reference pointer as it can get invalidated when another
278 * module removes the entry */
280 #ifndef HAVE_ONLY_PROCFS_SEQ
281 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
283 remove_proc_subtree(type->typ_name, proc_lustre_root);
287 lu_device_type_fini(type->typ_lu);
289 spin_lock(&obd_types_lock);
290 cfs_list_del(&type->typ_chain);
291 spin_unlock(&obd_types_lock);
/*
 * The size is computed from \a name; class_search_type() matched it with
 * strcmp(), so it has the same length as typ_name.
 */
292 OBD_FREE(type->typ_name, strlen(name) + 1);
293 if (type->typ_dt_ops != NULL)
294 OBD_FREE_PTR(type->typ_dt_ops);
295 if (type->typ_md_ops != NULL)
296 OBD_FREE_PTR(type->typ_md_ops);
297 OBD_FREE(type, sizeof(*type));
299 } /* class_unregister_type */
300 EXPORT_SYMBOL(class_unregister_type);
303 * Create a new obd device.
305 * Find an empty slot in ::obd_devs[], create a new obd device in it.
307 * \param[in] type_name obd device type string.
308 * \param[in] name obd device name.
310 * \retval NULL if create fails, otherwise return the obd device
/*
 * Allocate a new obd device of type \a type_name and install it under
 * \a name in the first free obd_devs[] slot.
 *
 * Visible failure results: ERR_PTR(-EINVAL) for a name that is too long,
 * ERR_PTR(-ENODEV) for an unknown type, ERR_PTR(-ENOMEM) when allocation
 * fails, ERR_PTR(-EEXIST) for a duplicate name and ERR_PTR(-EOVERFLOW)
 * when no slot is free.
 * NOTE(review): labels and several statements are not visible in this
 * listing.
 */
313 struct obd_device *class_newdev(const char *type_name, const char *name)
315 struct obd_device *result = NULL;
316 struct obd_device *newdev;
317 struct obd_type *type = NULL;
319 int new_obd_minor = 0;
322 if (strlen(name) >= MAX_OBD_NAME) {
323 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
324 RETURN(ERR_PTR(-EINVAL));
/* Takes a reference on the type; dropped again on the error path below. */
327 type = class_get_type(type_name);
329 CERROR("OBD: unknown type: %s\n", type_name);
330 RETURN(ERR_PTR(-ENODEV));
333 newdev = obd_device_alloc();
335 GOTO(out_type, result = ERR_PTR(-ENOMEM));
337 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
/*
 * One pass over every slot under the write lock: a free slot is claimed
 * as soon as one is seen, and the scan continues to detect a duplicate
 * name in a later slot.
 */
339 write_lock(&obd_dev_lock);
340 for (i = 0; i < class_devno_max(); i++) {
341 struct obd_device *obd = class_num2obd(i);
/* Duplicate name: undo any slot already claimed and fail with -EEXIST. */
343 if (obd && (strcmp(name, obd->obd_name) == 0)) {
344 CERROR("Device %s already exists at %d, won't add\n",
347 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
348 "%p obd_magic %08x != %08x\n", result,
349 result->obd_magic, OBD_DEVICE_MAGIC);
350 LASSERTF(result->obd_minor == new_obd_minor,
351 "%p obd_minor %d != %d\n", result,
352 result->obd_minor, new_obd_minor);
354 obd_devs[result->obd_minor] = NULL;
355 result->obd_name[0]='\0';
357 result = ERR_PTR(-EEXIST);
/* First empty slot seen while nothing has been claimed yet. */
360 if (!result && !obd) {
362 result->obd_minor = i;
364 result->obd_type = type;
/*
 * Copies at most sizeof - 1 bytes; termination relies on the last byte
 * already being zero -- presumably the allocation is zeroed, confirm.
 */
365 strncpy(result->obd_name, name,
366 sizeof(result->obd_name) - 1);
367 obd_devs[i] = result;
370 write_unlock(&obd_dev_lock);
/* The scan ended without claiming a slot: the table is full. */
372 if (result == NULL && i >= class_devno_max()) {
373 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
375 GOTO(out, result = ERR_PTR(-EOVERFLOW));
381 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
382 result->obd_name, result);
/* Error unwinding: free the device, then drop the type reference. */
386 obd_device_free(newdev);
388 class_put_type(type);
/*
 * Remove \a obd from obd_devs[] and free it, then drop the reference on
 * its type.
 */
392 void class_release_dev(struct obd_device *obd)
/* Saved up front because obd is freed before the type is put. */
394 struct obd_type *obd_type = obd->obd_type;
396 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
397 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
398 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
399 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
400 LASSERT(obd_type != NULL);
402 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
403 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
/* Clear the slot under the write lock before freeing the device. */
405 write_lock(&obd_dev_lock);
406 obd_devs[obd->obd_minor] = NULL;
407 write_unlock(&obd_dev_lock);
408 obd_device_free(obd);
410 class_put_type(obd_type);
/*
 * Find the obd_devs[] index of the device called \a name.
 * Scans every slot under the obd_dev_lock read lock; a name match is only
 * accepted once obd_attached is set.
 * NOTE(review): the return statements are not visible in this listing;
 * class_name2obd() treats a negative result as "not found".
 */
413 int class_name2dev(const char *name)
420 read_lock(&obd_dev_lock);
421 for (i = 0; i < class_devno_max(); i++) {
422 struct obd_device *obd = class_num2obd(i);
424 if (obd && strcmp(name, obd->obd_name) == 0) {
425 /* Make sure we finished attaching before we give
426 out any references */
427 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
428 if (obd->obd_attached) {
429 read_unlock(&obd_dev_lock);
435 read_unlock(&obd_dev_lock);
439 EXPORT_SYMBOL(class_name2dev);
/*
 * Resolve a device name to its obd_device: class_name2dev() followed by
 * class_num2obd().
 * NOTE(review): the range test uses "dev > class_devno_max()", which lets
 * dev == class_devno_max() through, while the loops in this file and
 * class_num2obd() use "< class_devno_max()". It looks like ">=" was
 * intended -- confirm.
 */
441 struct obd_device *class_name2obd(const char *name)
443 int dev = class_name2dev(name);
445 if (dev < 0 || dev > class_devno_max())
447 return class_num2obd(dev);
449 EXPORT_SYMBOL(class_name2obd);
/*
 * Find the obd_devs[] index of the device whose obd_uuid equals \a uuid.
 * Scans every slot under the obd_dev_lock read lock.
 * NOTE(review): the return statements are not visible in this listing.
 */
451 int class_uuid2dev(struct obd_uuid *uuid)
455 read_lock(&obd_dev_lock);
456 for (i = 0; i < class_devno_max(); i++) {
457 struct obd_device *obd = class_num2obd(i);
459 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
460 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
461 read_unlock(&obd_dev_lock);
465 read_unlock(&obd_dev_lock);
469 EXPORT_SYMBOL(class_uuid2dev);
/*
 * Resolve a uuid to its obd_device: class_uuid2dev() followed by
 * class_num2obd().
 * NOTE(review): any check of dev between the two calls is not visible in
 * this listing.
 */
471 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
473 int dev = class_uuid2dev(uuid);
476 return class_num2obd(dev);
478 EXPORT_SYMBOL(class_uuid2obd);
481 * Get obd device from ::obd_devs[]
483 * \param num [in] array index
485 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
486 * otherwise return the obd device there.
/*
 * Return the device stored at obd_devs[num], after asserting that its
 * magic is valid and that its obd_minor equals \a num.
 * NOTE(review): only the upper bound of \a num is checked in the visible
 * code; a lower-bound check is not visible -- confirm that callers never
 * pass a negative index.
 */
488 struct obd_device *class_num2obd(int num)
490 struct obd_device *obd = NULL;
492 if (num < class_devno_max()) {
497 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
498 "%p obd_magic %08x != %08x\n",
499 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
500 LASSERTF(obd->obd_minor == num,
501 "%p obd_minor %0d != %0d\n",
502 obd, obd->obd_minor, num);
507 EXPORT_SYMBOL(class_num2obd);
510 * Get obd devices count. Devices in any state are counted.
512 * \retval obd device count
/*
 * Count the devices present in obd_devs[] under the obd_dev_lock read lock.
 * NOTE(review): the loop bound is "index <= max_index", one more iteration
 * than the "< class_devno_max()" loops elsewhere in this file. The extra
 * index is rejected by the range check in class_num2obd(), so this looks
 * harmless but inconsistent -- confirm.
 */
514 int get_devices_count(void)
516 int index, max_index = class_devno_max(), dev_count = 0;
518 read_lock(&obd_dev_lock);
519 for (index = 0; index <= max_index; index++) {
520 struct obd_device *obd = class_num2obd(index);
524 read_unlock(&obd_dev_lock);
528 EXPORT_SYMBOL(get_devices_count);
/*
 * Print one console line per device: index, status, type name, device
 * name, uuid and reference count. Runs under the obd_dev_lock read lock.
 */
530 void class_obd_list(void)
535 read_lock(&obd_dev_lock);
536 for (i = 0; i < class_devno_max(); i++) {
537 struct obd_device *obd = class_num2obd(i);
/*
 * The status is chosen by the first flag that is set, in this order:
 * stopping, set up, attached. The status strings themselves are not
 * visible in this listing.
 */
541 if (obd->obd_stopping)
543 else if (obd->obd_set_up)
545 else if (obd->obd_attached)
549 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
550 i, status, obd->obd_type->typ_name,
551 obd->obd_name, obd->obd_uuid.uuid,
552 atomic_read(&obd->obd_refcount));
554 read_unlock(&obd_dev_lock);
558 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
559 specified, then only the client with that uuid is returned,
560 otherwise any client connected to the tgt is returned. */
/*
 * Scan obd_devs[] under the obd_dev_lock read lock for a device of type
 * \a typ_name whose u.cli.cl_target_uuid equals \a tgt_uuid. When
 * \a grp_uuid is non-NULL the device's own uuid must equal it as well.
 * NOTE(review): the return statements are not visible in this listing.
 */
561 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
562 const char * typ_name,
563 struct obd_uuid *grp_uuid)
567 read_lock(&obd_dev_lock);
568 for (i = 0; i < class_devno_max(); i++) {
569 struct obd_device *obd = class_num2obd(i);
/*
 * Only the first strlen(typ_name) characters are compared, so a longer
 * type name that starts with typ_name matches as well.
 */
573 if ((strncmp(obd->obd_type->typ_name, typ_name,
574 strlen(typ_name)) == 0)) {
575 if (obd_uuid_equals(tgt_uuid,
576 &obd->u.cli.cl_target_uuid) &&
577 ((grp_uuid)? obd_uuid_equals(grp_uuid,
578 &obd->obd_uuid) : 1)) {
579 read_unlock(&obd_dev_lock);
584 read_unlock(&obd_dev_lock);
588 EXPORT_SYMBOL(class_find_client_obd);
590 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
591 searching at *next, and if a device is found, the next index to look
592 at is saved in *next. If next is NULL, then the first matching device
593 will always be returned. */
/*
 * Scan obd_devs[] from a starting index, under the obd_dev_lock read lock,
 * for a device whose obd_uuid equals \a grp_uuid.
 * NOTE(review): the handling of a NULL or out-of-range \a next and the
 * return statements are not visible in this listing.
 */
594 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
/* A caller-supplied start index is accepted when it is within [0, class_devno_max()). */
600 else if (*next >= 0 && *next < class_devno_max())
605 read_lock(&obd_dev_lock);
606 for (; i < class_devno_max(); i++) {
607 struct obd_device *obd = class_num2obd(i);
611 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
614 read_unlock(&obd_dev_lock);
618 read_unlock(&obd_dev_lock);
622 EXPORT_SYMBOL(class_devices_in_group);
625 * Notify that the sptlrpc log for \a fsname has changed, so that every
626 * relevant OBD adjusts its sptlrpc settings accordingly.
/*
 * Send KEY_SPTLRPC_CONF through obd_set_info_async() to every set-up,
 * non-stopping mdc/osc/mdt/ost device whose name starts with the first
 * \a namelen bytes of \a fsname.
 * NOTE(review): how rc2 is folded into the return value is not visible in
 * this listing.
 */
628 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
630 struct obd_device *obd;
634 LASSERT(namelen > 0);
636 read_lock(&obd_dev_lock);
637 for (i = 0; i < class_devno_max(); i++) {
638 obd = class_num2obd(i);
640 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
643 /* only notify mdc, osc, mdt, ost */
644 type = obd->obd_type->typ_name;
645 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
646 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
647 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
648 strcmp(type, LUSTRE_OST_NAME) != 0)
/* Prefix comparison against the device name, not an exact match. */
651 if (strncmp(obd->obd_name, fsname, namelen))
/*
 * Take a device reference and drop the read lock around the set_info
 * call; the lock is re-taken afterwards to continue the scan.
 */
654 class_incref(obd, __FUNCTION__, obd);
655 read_unlock(&obd_dev_lock);
656 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
657 sizeof(KEY_SPTLRPC_CONF),
658 KEY_SPTLRPC_CONF, 0, NULL, NULL);
660 class_decref(obd, __FUNCTION__, obd);
661 read_lock(&obd_dev_lock);
663 read_unlock(&obd_dev_lock);
666 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
/*
 * Destroy the slab caches created by obd_init_caches(): obd_device_cachep,
 * obdo_cachep, import_cachep and capa_cachep.
 * NOTE(review): the guards for the last three caches are not visible in
 * this listing.
 */
668 void obd_cleanup_caches(void)
671 if (obd_device_cachep) {
672 kmem_cache_destroy(obd_device_cachep);
/* Reset the pointer so obd_init_caches() can assert that it is NULL. */
673 obd_device_cachep = NULL;
676 kmem_cache_destroy(obdo_cachep);
680 kmem_cache_destroy(import_cachep);
681 import_cachep = NULL;
684 kmem_cache_destroy(capa_cachep);
/*
 * Create the slab caches used by this file: "ll_obd_dev_cache",
 * "ll_obdo_cache", "ll_import_cache" and "capa_cache". Each creation is
 * preceded by an assertion that the cache pointer is still NULL, and a
 * failure jumps to "out" with rc = -ENOMEM.
 * NOTE(review): the remaining kmem_cache_create() arguments and the labels
 * are not visible in this listing.
 */
690 int obd_init_caches(void)
695 LASSERT(obd_device_cachep == NULL);
696 obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
697 sizeof(struct obd_device),
699 if (!obd_device_cachep)
700 GOTO(out, rc = -ENOMEM);
702 LASSERT(obdo_cachep == NULL);
703 obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
706 GOTO(out, rc = -ENOMEM);
708 LASSERT(import_cachep == NULL);
709 import_cachep = kmem_cache_create("ll_import_cache",
710 sizeof(struct obd_import),
713 GOTO(out, rc = -ENOMEM);
715 LASSERT(capa_cachep == NULL);
716 capa_cachep = kmem_cache_create("capa_cache", sizeof(struct obd_capa),
719 GOTO(out, rc = -ENOMEM);
/* Presumably the "out" error path: tear down whatever was created. */
723 obd_cleanup_caches();
727 /* map connection to client */
/*
 * Translate a connection handle into its export by looking up the handle
 * cookie with class_handle2object().
 * NOTE(review): the guards and return statements around the two early
 * debug messages are not visible in this listing.
 */
728 struct obd_export *class_conn2export(struct lustre_handle *conn)
730 struct obd_export *export;
734 CDEBUG(D_CACHE, "looking for null handle\n");
738 if (conn->cookie == -1) { /* this means assign a new connection */
739 CDEBUG(D_CACHE, "want a new connection\n");
743 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
744 export = class_handle2object(conn->cookie, NULL);
747 EXPORT_SYMBOL(class_conn2export);
/*
 * Exported accessor from an export to an obd_device.
 * NOTE(review): the function body is not visible in this listing.
 */
749 struct obd_device *class_exp2obd(struct obd_export *exp)
755 EXPORT_SYMBOL(class_exp2obd);
/*
 * Translate a connection handle into the obd_device of its export.
 * The export obtained from class_conn2export() is only used to read
 * exp_obd and is put again straight away.
 */
757 struct obd_device *class_conn2obd(struct lustre_handle *conn)
759 struct obd_export *export;
760 export = class_conn2export(conn);
762 struct obd_device *obd = export->exp_obd;
763 class_export_put(export);
768 EXPORT_SYMBOL(class_conn2obd);
/*
 * Return the client import (u.cli.cl_import) of the device behind \a exp.
 * NOTE(review): any NULL check on obd is not visible in this listing.
 */
770 struct obd_import *class_exp2cliimp(struct obd_export *exp)
772 struct obd_device *obd = exp->exp_obd;
775 return obd->u.cli.cl_import;
777 EXPORT_SYMBOL(class_exp2cliimp);
/*
 * Return the client import (u.cli.cl_import) of the device that the
 * connection handle \a conn resolves to through class_conn2obd().
 * NOTE(review): any NULL check on obd is not visible in this listing.
 */
779 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
781 struct obd_device *obd = class_conn2obd(conn);
784 return obd->u.cli.cl_import;
786 EXPORT_SYMBOL(class_conn2cliimp);
788 /* Export management functions */
/*
 * Final teardown of an export whose refcount has reached zero: drop its
 * connection (if any), assert that its request/reply lists are empty,
 * call obd_destroy_export(), drop the "export" reference on the device
 * and free the export with OBD_FREE_RCU().
 */
789 static void class_export_destroy(struct obd_export *exp)
791 struct obd_device *obd = exp->exp_obd;
794 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
795 LASSERT(obd != NULL);
797 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
798 exp->exp_client_uuid.uuid, obd->obd_name);
800 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
801 if (exp->exp_connection)
802 ptlrpc_put_connection_superhack(exp->exp_connection);
804 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
805 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
806 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
807 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
808 obd_destroy_export(exp);
/* Matches the class_incref(obd, "export", ...) taken in class_new_export(). */
809 class_decref(obd, "export", exp);
811 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
/* Handle-table callback: take an export reference via class_export_get(). */
815 static void export_handle_addref(void *export)
817 class_export_get(export);
/* Handle ops passed to class_handle_hash() in class_new_export(). */
820 static struct portals_handle_ops export_handle_ops = {
821 .hop_addref = export_handle_addref,
/*
 * Take a reference on \a exp: increments exp_refcount and logs the new
 * count.
 * NOTE(review): the return statement is not visible in this listing.
 */
825 struct obd_export *class_export_get(struct obd_export *exp)
827 atomic_inc(&exp->exp_refcount);
828 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
829 atomic_read(&exp->exp_refcount));
832 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp. When the last reference goes away, the
 * export's proc statistics are cleaned up and the export is handed to
 * obd_zombie_export_add().
 */
834 void class_export_put(struct obd_export *exp)
836 LASSERT(exp != NULL);
837 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
/* The count is logged before the decrement, hence the "- 1". */
838 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
839 atomic_read(&exp->exp_refcount) - 1);
841 if (atomic_dec_and_test(&exp->exp_refcount)) {
842 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
843 CDEBUG(D_IOCTL, "final put %p/%s\n",
844 exp, exp->exp_client_uuid.uuid);
846 /* release nid stat reference */
847 lprocfs_exp_cleanup(exp);
849 obd_zombie_export_add(exp);
852 EXPORT_SYMBOL(class_export_put);
854 /* Creates a new export, adds it to the hash table, and returns a
855 * pointer to it. The refcount is 2: one for the hash reference, and
856 * one for the pointer returned by this function. */
/*
 * Allocate and initialize an export of \a obd for client \a cluuid.
 *
 * The export starts with a refcount of 2, is registered in the handle
 * table, is added to the device's uuid hash (unless \a cluuid is the
 * device's own uuid) and is linked onto obd_exports and
 * obd_exports_timed. Visible failure codes: -ENOMEM, -ENODEV (device
 * stopping or no uuid hash) and -EALREADY (duplicate uuid).
 * NOTE(review): labels and return statements are not visible in this
 * listing.
 */
857 struct obd_export *class_new_export(struct obd_device *obd,
858 struct obd_uuid *cluuid)
860 struct obd_export *export;
861 cfs_hash_t *hash = NULL;
865 OBD_ALLOC_PTR(export);
867 return ERR_PTR(-ENOMEM);
869 export->exp_conn_cnt = 0;
870 export->exp_lock_hash = NULL;
871 export->exp_flock_hash = NULL;
872 atomic_set(&export->exp_refcount, 2);
873 atomic_set(&export->exp_rpc_count, 0);
874 atomic_set(&export->exp_cb_count, 0);
875 atomic_set(&export->exp_locks_count, 0);
876 #if LUSTRE_TRACKS_LOCK_EXP_REFS
877 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
878 spin_lock_init(&export->exp_locks_list_guard);
880 atomic_set(&export->exp_replay_count, 0);
881 export->exp_obd = obd;
882 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
883 spin_lock_init(&export->exp_uncommitted_replies_lock);
884 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
885 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
886 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
887 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
888 CFS_INIT_LIST_HEAD(&export->exp_reg_rpcs);
/* Register the handle; class_connect() hands out its h_cookie. */
889 class_handle_hash(&export->exp_handle, &export_handle_ops);
890 export->exp_last_request_time = cfs_time_current_sec();
891 spin_lock_init(&export->exp_lock);
892 spin_lock_init(&export->exp_rpc_lock);
893 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
894 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
895 spin_lock_init(&export->exp_bl_list_lock);
896 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
898 export->exp_sp_peer = LUSTRE_SP_ANY;
899 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
900 export->exp_client_uuid = *cluuid;
901 obd_init_export(export);
/* First check under the device lock: refuse new exports while stopping. */
903 spin_lock(&obd->obd_dev_lock);
904 /* shouldn't happen, but might race */
905 if (obd->obd_stopping)
906 GOTO(exit_unlock, rc = -ENODEV);
/* Take a reference on the uuid hash so it can be used after unlocking. */
908 hash = cfs_hash_getref(obd->obd_uuid_hash);
910 GOTO(exit_unlock, rc = -ENODEV);
911 spin_unlock(&obd->obd_dev_lock);
/* An export carrying the device's own uuid is not added to the uuid hash. */
913 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
914 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
916 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
917 obd->obd_name, cluuid->uuid, rc);
918 GOTO(exit_err, rc = -EALREADY);
/* Re-check obd_stopping: the lock was dropped around the hash insertion. */
922 spin_lock(&obd->obd_dev_lock);
923 if (obd->obd_stopping) {
924 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
925 GOTO(exit_unlock, rc = -ENODEV);
/* Matching class_decref(obd, "export", ...) is in class_export_destroy(). */
928 class_incref(obd, "export", export);
929 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
930 cfs_list_add_tail(&export->exp_obd_chain_timed,
931 &export->exp_obd->obd_exports_timed);
932 export->exp_obd->obd_num_exports++;
933 spin_unlock(&obd->obd_dev_lock);
934 cfs_hash_putref(hash);
/*
 * Presumably the exit_unlock / exit_err paths (labels not visible):
 * unlock, drop the hash reference, unhash the handle and free the export.
 */
938 spin_unlock(&obd->obd_dev_lock);
941 cfs_hash_putref(hash);
942 class_handle_unhash(&export->exp_handle);
943 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
944 obd_destroy_export(export);
945 OBD_FREE_PTR(export);
948 EXPORT_SYMBOL(class_new_export);
/*
 * Detach \a exp from its device: unhash the handle, remove the export from
 * the uuid hash if it is hashed, move it to obd_unlinked_exports, remove
 * it from the timed chain, decrement obd_num_exports, and finally drop one
 * export reference.
 */
950 void class_unlink_export(struct obd_export *exp)
952 class_handle_unhash(&exp->exp_handle);
954 spin_lock(&exp->exp_obd->obd_dev_lock);
955 /* delete an uuid-export hashitem from hashtables */
956 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
957 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
958 &exp->exp_client_uuid,
959 &exp->exp_uuid_hash);
961 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
962 cfs_list_del_init(&exp->exp_obd_chain_timed);
963 exp->exp_obd->obd_num_exports--;
964 spin_unlock(&exp->exp_obd->obd_dev_lock);
965 class_export_put(exp);
967 EXPORT_SYMBOL(class_unlink_export);
969 /* Import management functions */
/*
 * Final teardown of an import whose refcount has reached zero: drop
 * imp_connection, drain imp_conn_list (putting each connection and freeing
 * each entry), drop the "import" reference on the device and free the
 * import with OBD_FREE_RCU().
 */
970 void class_import_destroy(struct obd_import *imp)
974 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
975 imp->imp_obd->obd_name);
977 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
979 ptlrpc_put_connection_superhack(imp->imp_connection);
/* Pop entries from the head until the connection list is empty. */
981 while (!cfs_list_empty(&imp->imp_conn_list)) {
982 struct obd_import_conn *imp_conn;
984 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
985 struct obd_import_conn, oic_item);
986 cfs_list_del_init(&imp_conn->oic_item);
987 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
988 OBD_FREE(imp_conn, sizeof(*imp_conn));
991 LASSERT(imp->imp_sec == NULL);
/* Matches the class_incref(obd, "import", ...) taken in class_new_import(). */
992 class_decref(imp->imp_obd, "import", imp);
993 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
/* Handle-table callback: take an import reference via class_import_get(). */
997 static void import_handle_addref(void *import)
999 class_import_get(import);
/* Handle ops passed to class_handle_hash() in class_new_import(). */
1002 static struct portals_handle_ops import_handle_ops = {
1003 .hop_addref = import_handle_addref,
/*
 * Take a reference on \a import: increments imp_refcount and logs the new
 * count together with the device name.
 * NOTE(review): the return statement is not visible in this listing.
 */
1007 struct obd_import *class_import_get(struct obd_import *import)
1009 atomic_inc(&import->imp_refcount);
1010 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1011 atomic_read(&import->imp_refcount),
1012 import->imp_obd->obd_name);
1015 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a imp. When the last reference goes away the import
 * is handed to obd_zombie_import_add().
 */
1017 void class_import_put(struct obd_import *imp)
/* An import that is already on a zombie chain must not be put again. */
1021 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1022 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
/* The count is logged before the decrement, hence the "- 1". */
1024 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1025 atomic_read(&imp->imp_refcount) - 1,
1026 imp->imp_obd->obd_name);
1028 if (atomic_dec_and_test(&imp->imp_refcount)) {
1029 CDEBUG(D_INFO, "final put import %p\n", imp);
1030 obd_zombie_import_add(imp);
1033 /* catch possible import put race */
1034 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1037 EXPORT_SYMBOL(class_import_put);
/*
 * Initialize the adaptive-timeout state of an import: the network latency
 * estimate and one service estimate per portal (IMP_AT_MAX_PORTALS).
 * NOTE(review): the last at_init() argument for the service estimates is
 * not visible in this listing.
 */
1039 static void init_imp_at(struct imp_at *at) {
1041 at_init(&at->iat_net_latency, 0, 0);
1042 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1043 /* max service estimates are tracked on the server side, so
1044 don't use the AT history here, just use the last reported
1045 val. (But keep hist for proc histogram, worst_ever) */
1046 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize an import for \a obd.
 * The import starts in state LUSTRE_IMP_NEW with a refcount of 2, holds an
 * "import" reference on the device and is registered in the handle table.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing.
 */
1051 struct obd_import *class_new_import(struct obd_device *obd)
1053 struct obd_import *imp;
1055 OBD_ALLOC(imp, sizeof(*imp));
1059 CFS_INIT_LIST_HEAD(&imp->imp_pinger_chain);
1060 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1061 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1062 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1063 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1064 CFS_INIT_LIST_HEAD(&imp->imp_committed_list);
/* The replay cursor starts at the head of the (empty) committed list. */
1065 imp->imp_replay_cursor = &imp->imp_committed_list;
1066 spin_lock_init(&imp->imp_lock);
1067 imp->imp_last_success_conn = 0;
1068 imp->imp_state = LUSTRE_IMP_NEW;
/* Matching class_decref(..., "import", ...) is in class_import_destroy(). */
1069 imp->imp_obd = class_incref(obd, "import", imp);
1070 mutex_init(&imp->imp_sec_mutex);
1071 init_waitqueue_head(&imp->imp_recovery_waitq);
1073 atomic_set(&imp->imp_refcount, 2);
1074 atomic_set(&imp->imp_unregistering, 0);
1075 atomic_set(&imp->imp_inflight, 0);
1076 atomic_set(&imp->imp_replay_inflight, 0);
1077 atomic_set(&imp->imp_inval_count, 0);
1078 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1079 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1080 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1081 init_imp_at(&imp->imp_at);
1083 /* the default magic is V2, will be used in connect RPC, and
1084 * then adjusted according to the flags in request/reply. */
1085 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1089 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down \a import: remove it from the handle table, bump
 * imp_generation under imp_lock and drop one import reference.
 */
1091 void class_destroy_import(struct obd_import *import)
1093 LASSERT(import != NULL);
1094 LASSERT(import != LP_POISON);
1096 class_handle_unhash(&import->imp_handle);
1098 spin_lock(&import->imp_lock);
1099 import->imp_generation++;
1100 spin_unlock(&import->imp_lock);
1101 class_import_put(import);
1103 EXPORT_SYMBOL(class_destroy_import);
1105 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Debug tracking (LUSTRE_TRACKS_LOCK_EXP_REFS): record one more reference
 * from \a lock to \a exp. On the first reference the lock is linked onto
 * exp_locks_list and \a exp becomes its l_exp_refs_target. Everything runs
 * under exp_locks_list_guard.
 */
1107 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1109 spin_lock(&exp->exp_locks_list_guard);
1111 LASSERT(lock->l_exp_refs_nr >= 0);
/* A lock already tracked against a different export only triggers a warning. */
1113 if (lock->l_exp_refs_target != NULL &&
1114 lock->l_exp_refs_target != exp) {
1115 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1116 exp, lock, lock->l_exp_refs_target);
/* Post-increment: the test sees the count before this reference was added. */
1118 if ((lock->l_exp_refs_nr ++) == 0) {
1119 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1120 lock->l_exp_refs_target = exp;
1122 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1123 lock, exp, lock->l_exp_refs_nr);
1124 spin_unlock(&exp->exp_locks_list_guard);
1126 EXPORT_SYMBOL(__class_export_add_lock_ref);
/*
 * Debug tracking: drop one reference from \a lock to \a exp. When the
 * count reaches zero the lock is unlinked from the export's list and its
 * l_exp_refs_target is cleared. Everything runs under
 * exp_locks_list_guard.
 */
1128 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1130 spin_lock(&exp->exp_locks_list_guard);
1131 LASSERT(lock->l_exp_refs_nr > 0);
/* A target mismatch only triggers a warning; the count is still dropped. */
1132 if (lock->l_exp_refs_target != exp) {
1133 LCONSOLE_WARN("lock %p, "
1134 "mismatching export pointers: %p, %p\n",
1135 lock, lock->l_exp_refs_target, exp);
1137 if (-- lock->l_exp_refs_nr == 0) {
1138 cfs_list_del_init(&lock->l_exp_refs_link);
1139 lock->l_exp_refs_target = NULL;
1141 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1142 lock, exp, lock->l_exp_refs_nr);
1143 spin_unlock(&exp->exp_locks_list_guard);
1145 EXPORT_SYMBOL(__class_export_del_lock_ref);
1148 /* A connection defines an export context in which preallocation can
1149 be managed. This releases the export pointer reference, and returns
1150 the export handle, so the export refcount is 1 when this function returns. */
/*
 * Create an export of \a obd for \a cluuid and return its handle cookie in
 * \a conn. The pointer reference returned by class_new_export() is put
 * before returning, so the caller only gets the cookie.
 * NOTE(review): the IS_ERR() guard and the final return are not visible in
 * this listing.
 */
1152 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1153 struct obd_uuid *cluuid)
1155 struct obd_export *export;
1156 LASSERT(conn != NULL);
1157 LASSERT(obd != NULL);
1158 LASSERT(cluuid != NULL);
1161 export = class_new_export(obd, cluuid);
1163 RETURN(PTR_ERR(export));
1165 conn->cookie = export->exp_handle.h_cookie;
1166 class_export_put(export);
1168 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1169 cluuid->uuid, conn->cookie);
1172 EXPORT_SYMBOL(class_connect);
1174 /* if export is involved in recovery then clean up related things */
/*
 * Undo the recovery accounting of \a exp on its device: the delayed,
 * connected and stale client counters (under obd_recovery_task_lock), then
 * the request-replay and lock-replay client counters.
 */
1175 void class_export_recovery_cleanup(struct obd_export *exp)
1177 struct obd_device *obd = exp->exp_obd;
1179 spin_lock(&obd->obd_recovery_task_lock);
1180 if (exp->exp_delayed)
1181 obd->obd_delayed_clients--;
1182 if (obd->obd_recovering) {
1183 if (exp->exp_in_recovery) {
/* The per-export flag is cleared under exp_lock, nested inside the task lock. */
1184 spin_lock(&exp->exp_lock);
1185 exp->exp_in_recovery = 0;
1186 spin_unlock(&exp->exp_lock);
1187 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1188 atomic_dec(&obd->obd_connected_clients);
1191 /* if called during recovery then should update
1192 * obd_stale_clients counter,
1193 * lightweight exports are not counted */
1194 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1195 exp->exp_obd->obd_stale_clients++;
1197 spin_unlock(&obd->obd_recovery_task_lock);
1198 /** Cleanup req replay fields */
1199 if (exp->exp_req_replay_needed) {
1200 spin_lock(&exp->exp_lock);
1201 exp->exp_req_replay_needed = 0;
1202 spin_unlock(&exp->exp_lock);
1203 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1204 atomic_dec(&obd->obd_req_replay_clients);
1206 /** Cleanup lock replay data */
1207 if (exp->exp_lock_replay_needed) {
1208 spin_lock(&exp->exp_lock);
1209 exp->exp_lock_replay_needed = 0;
1210 spin_unlock(&exp->exp_lock);
1211 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1212 atomic_dec(&obd->obd_lock_replay_clients);
1216 /* This function removes 1-3 references from the export:
1217 * 1 - for the export pointer passed in
1218 * and, if a disconnect is really needed:
1219 * 2 - removing from hash
1220 * 3 - in class_unlink_export
1221 * The export pointer passed to this function can be destroyed */
/*
 * Disconnect \a export. The export is marked disconnected under exp_lock;
 * only the first caller to do so removes it from the nid hash, runs
 * class_export_recovery_cleanup() and class_unlink_export(). Every caller
 * drops one export reference at the end.
 * NOTE(review): the "no_disconn" label and the return statements are not
 * visible in this listing.
 */
1222 int class_disconnect(struct obd_export *export)
1224 int already_disconnected;
1227 if (export == NULL) {
1228 CWARN("attempting to free NULL export %p\n", export);
/* Test-and-set of exp_disconnected under the export lock. */
1232 spin_lock(&export->exp_lock);
1233 already_disconnected = export->exp_disconnected;
1234 export->exp_disconnected = 1;
1235 spin_unlock(&export->exp_lock);
1237 /* class_cleanup(), abort_recovery(), and class_fail_export()
1238 * all end up in here, and if any of them race we shouldn't
1239 * call extra class_export_puts(). */
1240 if (already_disconnected) {
1241 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1242 GOTO(no_disconn, already_disconnected);
1245 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1246 export->exp_handle.h_cookie);
1248 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1249 cfs_hash_del(export->exp_obd->obd_nid_hash,
1250 &export->exp_connection->c_peer.nid,
1251 &export->exp_nid_hash);
1253 class_export_recovery_cleanup(export);
1254 class_unlink_export(export);
/* Drops the reference that belongs to the pointer passed in. */
1256 class_export_put(export);
1259 EXPORT_SYMBOL(class_disconnect);
1261 /* Return non-zero for a fully connected export */
/*
 * Report whether \a exp is fully connected: exp_conn_cnt is positive and
 * exp_failed is not set, both sampled under exp_lock.
 * NOTE(review): any NULL guard and the return statement are not visible in
 * this listing.
 */
1262 int class_connected_export(struct obd_export *exp)
1267 spin_lock(&exp->exp_lock);
1268 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1269 spin_unlock(&exp->exp_lock);
1273 EXPORT_SYMBOL(class_connected_export);
/*
 * Disconnect every export on \a list, storing \a flags in exp_flags first.
 * An export whose client uuid equals its device's uuid is only removed
 * from the list, not disconnected.
 */
1275 static void class_disconnect_export_list(cfs_list_t *list,
1276 enum obd_option flags)
1279 struct obd_export *exp;
1282 /* It's possible that an export may disconnect itself, but
1283 * nothing else will be added to this list. */
1284 while (!cfs_list_empty(list)) {
1285 exp = cfs_list_entry(list->next, struct obd_export,
1287 /* hold a reference so the CDEBUG after obd_disconnect() is safe */
1288 class_export_get(exp);
1290 spin_lock(&exp->exp_lock);
1291 exp->exp_flags = flags;
1292 spin_unlock(&exp->exp_lock);
1294 if (obd_uuid_equals(&exp->exp_client_uuid,
1295 &exp->exp_obd->obd_uuid)) {
1297 "exp %p export uuid == obd uuid, don't discon\n",
1299 /* Need to delete this now so we don't end up pointing
1300 * to work_list later when this export is cleaned up. */
1301 cfs_list_del_init(&exp->exp_obd_chain);
1302 class_export_put(exp);
/* Second reference: this is the one that obd_disconnect() releases. */
1306 class_export_get(exp);
1307 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1308 "last request at "CFS_TIME_T"\n",
1309 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1310 exp, exp->exp_last_request_time);
1311 /* release one export reference anyway */
1312 rc = obd_disconnect(exp);
1314 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1315 obd_export_nid2str(exp), exp, rc);
/* Drop the reference taken at the top of the loop body. */
1316 class_export_put(exp);
/* Disconnect all exports of @obd.
 * Both obd_exports and obd_delayed_exports are spliced onto a private work
 * list under obd_dev_lock; the work list is then disconnected outside the
 * lock with the flags returned by exp_flags_from_obd(). */
1321 void class_disconnect_exports(struct obd_device *obd)
1323 cfs_list_t work_list;
1326 /* Move all of the exports from obd_exports to a work list, en masse. */
1327 CFS_INIT_LIST_HEAD(&work_list);
1328 spin_lock(&obd->obd_dev_lock);
1329 cfs_list_splice_init(&obd->obd_exports, &work_list);
1330 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1331 spin_unlock(&obd->obd_dev_lock);
/* Only call into the disconnect loop when something was collected. */
1333 if (!cfs_list_empty(&work_list)) {
1334 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1335 "disconnecting them\n", obd->obd_minor, obd);
1336 class_disconnect_export_list(&work_list,
1337 exp_flags_from_obd(obd));
1339 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1340 obd->obd_minor, obd);
1343 EXPORT_SYMBOL(class_disconnect_exports);
1345 /* Remove exports that have not completed recovery. */
/* Evict the clients of @obd selected by @test_export.
 * Walks obd_exports under obd_dev_lock.  The self-export (client UUID equal
 * to the device UUID) and exports with ted_lr_idx == -1 are not candidates.
 * A candidate that is not already failed and for which test_export() returns
 * zero is marked exp_failed and moved to a private work list, which is then
 * disconnected with OBD_OPT_ABORT_RECOV added to the device's flags.
 * NOTE(review): the skip statements and the `evicted` bookkeeping are not
 * visible in this excerpt -- confirm against the full source. */
1347 void class_disconnect_stale_exports(struct obd_device *obd,
1348 int (*test_export)(struct obd_export *))
1350 cfs_list_t work_list;
1351 struct obd_export *exp, *n;
1355 CFS_INIT_LIST_HEAD(&work_list);
1356 spin_lock(&obd->obd_dev_lock);
1357 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1359 /* don't count self-export as client */
1360 if (obd_uuid_equals(&exp->exp_client_uuid,
1361 &exp->exp_obd->obd_uuid))
1364 /* don't evict clients which have no slot in last_rcvd
1365 * (e.g. lightweight connection) */
1366 if (exp->exp_target_data.ted_lr_idx == -1)
/* exp_failed is tested and set under exp_lock; test_export() is also
 * called with exp_lock held. */
1369 spin_lock(&exp->exp_lock);
1370 if (exp->exp_failed || test_export(exp)) {
1371 spin_unlock(&exp->exp_lock);
1374 exp->exp_failed = 1;
1375 spin_unlock(&exp->exp_lock);
/* Still under obd_dev_lock: take the export off obd_exports. */
1377 cfs_list_move(&exp->exp_obd_chain, &work_list);
1379 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1380 obd->obd_name, exp->exp_client_uuid.uuid,
1381 exp->exp_connection == NULL ? "<unknown>" :
1382 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1383 print_export_data(exp, "EVICTING", 0);
1385 spin_unlock(&obd->obd_dev_lock);
1388 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1389 obd->obd_name, evicted);
/* The collected exports are disconnected outside obd_dev_lock. */
1391 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1392 OBD_OPT_ABORT_RECOV);
1395 EXPORT_SYMBOL(class_disconnect_stale_exports);
/* Mark @exp as failed and disconnect it.
 * exp_failed is read and set under exp_lock.  If it was already set, the
 * "dead export" debug message is logged (the early return that presumably
 * follows is not visible in this excerpt).  Otherwise the debug log is
 * dumped when obd_dump_on_timeout is set, and the export is passed to
 * obd_disconnect() with two extra references held (see comments below).
 * A disconnect failure is only logged; the function returns void. */
1397 void class_fail_export(struct obd_export *exp)
1399 int rc, already_failed;
1401 spin_lock(&exp->exp_lock);
1402 already_failed = exp->exp_failed;
1403 exp->exp_failed = 1;
1404 spin_unlock(&exp->exp_lock);
1406 if (already_failed) {
1407 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1408 exp, exp->exp_client_uuid.uuid);
1412 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1413 exp, exp->exp_client_uuid.uuid);
1415 if (obd_dump_on_timeout)
1416 libcfs_debug_dumplog();
1418 /* need for safe call CDEBUG after obd_disconnect */
1419 class_export_get(exp);
1421 /* Most callers into obd_disconnect are removing their own reference
1422 * (request, for example) in addition to the one from the hash table.
1423 * We don't have such a reference here, so make one. */
1424 class_export_get(exp);
1425 rc = obd_disconnect(exp);
1427 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1429 CDEBUG(D_HA, "disconnected export %p/%s\n",
1430 exp, exp->exp_client_uuid.uuid);
/* Drop the first extra reference, which kept exp valid for the logging. */
1431 class_export_put(exp);
1433 EXPORT_SYMBOL(class_fail_export);
/* Return the peer NID of @exp's connection as a string, produced by
 * libcfs_nid2str().  The value returned when exp_connection is NULL is on a
 * line not visible in this excerpt. */
1435 char *obd_export_nid2str(struct obd_export *exp)
1437 if (exp->exp_connection != NULL)
1438 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1442 EXPORT_SYMBOL(obd_export_nid2str);
/* Evict the client(s) of @obd connected from the NID given as string @nid.
 * @nid is parsed with libcfs_str2nid() and looked up in obd->obd_nid_hash;
 * a matching export is failed via class_fail_export().
 * Returns exports_evicted, which is 0 when the device is already stopping
 * or when no export was found.
 * NOTE(review): the loop construct and the counter increment are not
 * visible in this excerpt -- confirm against the full source. */
1444 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1446 cfs_hash_t *nid_hash;
1447 struct obd_export *doomed_exp = NULL;
1448 int exports_evicted = 0;
1450 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1452 spin_lock(&obd->obd_dev_lock);
1453 /* umount has run already, so evict thread should leave
1454 * its task to umount thread now */
1455 if (obd->obd_stopping) {
1456 spin_unlock(&obd->obd_dev_lock);
1457 return exports_evicted;
/* Take a hash reference while still under obd_dev_lock; it is released by
 * cfs_hash_putref() at the end. */
1459 nid_hash = obd->obd_nid_hash;
1460 cfs_hash_getref(nid_hash);
1461 spin_unlock(&obd->obd_dev_lock);
1464 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1465 if (doomed_exp == NULL)
/* Sanity checks: the hit must carry the requested NID and must not be the
 * device's self-export. */
1468 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1469 "nid %s found, wanted nid %s, requested nid %s\n",
1470 obd_export_nid2str(doomed_exp),
1471 libcfs_nid2str(nid_key), nid);
1472 LASSERTF(doomed_exp != obd->obd_self_export,
1473 "self-export is hashed by NID?\n");
1475 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1476 "request\n", obd->obd_name,
1477 obd_uuid2str(&doomed_exp->exp_client_uuid),
1478 obd_export_nid2str(doomed_exp));
1479 class_fail_export(doomed_exp);
/* Presumably drops the reference returned by cfs_hash_lookup() -- TODO
 * confirm. */
1480 class_export_put(doomed_exp);
1483 cfs_hash_putref(nid_hash);
1485 if (!exports_evicted)
1486 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1487 obd->obd_name, nid);
1488 return exports_evicted;
1490 EXPORT_SYMBOL(obd_export_evict_by_nid);
/* Evict the client of @obd whose client UUID is the string @uuid.
 * The string is converted with obd_str2uuid() and looked up in
 * obd->obd_uuid_hash; a match is failed via class_fail_export().  A request
 * naming the device's own UUID is refused with an error message.
 * Returns exports_evicted, which is 0 when the device is stopping or when
 * the device's own UUID was requested.
 * NOTE(review): the counter increment is not visible in this excerpt. */
1492 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1494 cfs_hash_t *uuid_hash;
1495 struct obd_export *doomed_exp = NULL;
1496 struct obd_uuid doomed_uuid;
1497 int exports_evicted = 0;
1499 spin_lock(&obd->obd_dev_lock);
1500 if (obd->obd_stopping) {
1501 spin_unlock(&obd->obd_dev_lock);
1502 return exports_evicted;
/* Take a hash reference while still under obd_dev_lock; every exit path
 * below releases it with cfs_hash_putref(). */
1504 uuid_hash = obd->obd_uuid_hash;
1505 cfs_hash_getref(uuid_hash);
1506 spin_unlock(&obd->obd_dev_lock);
1508 obd_str2uuid(&doomed_uuid, uuid);
1509 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1510 CERROR("%s: can't evict myself\n", obd->obd_name);
1511 cfs_hash_putref(uuid_hash);
1512 return exports_evicted;
1515 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1517 if (doomed_exp == NULL) {
1518 CERROR("%s: can't disconnect %s: no exports found\n",
1519 obd->obd_name, uuid);
1521 CWARN("%s: evicting %s at adminstrative request\n",
1522 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1523 class_fail_export(doomed_exp);
/* Presumably drops the reference returned by cfs_hash_lookup() -- TODO
 * confirm. */
1524 class_export_put(doomed_exp);
1527 cfs_hash_putref(uuid_hash);
1529 return exports_evicted;
1531 EXPORT_SYMBOL(obd_export_evict_by_uuid);
/* Optional hook: print_export_data() calls it for an export when its
 * `locks` argument is non-zero and the hook is non-NULL.  Only built when
 * LUSTRE_TRACKS_LOCK_EXP_REFS is enabled; initialised to NULL. */
1533 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1534 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1535 EXPORT_SYMBOL(class_export_dump_hook);
/* Log one D_HA debug line describing @exp, tagged with @status.
 * The line contains the device name, the export pointer, client UUID and
 * NID, the refcount and the rpc/callback/lock counters, the
 * disconnected/delayed/failed flags, the outstanding-reply summary
 * (nreplies, first_reply) and exp_last_committed.
 * When built with LUSTRE_TRACKS_LOCK_EXP_REFS and `locks` is non-zero,
 * class_export_dump_hook is additionally called if it is set.
 * NOTE(review): the body of the reply-list walk is not visible in this
 * excerpt. */
1538 static void print_export_data(struct obd_export *exp, const char *status,
1541 struct ptlrpc_reply_state *rs;
1542 struct ptlrpc_reply_state *first_reply = NULL;
/* The outstanding-replies list is walked under exp_lock. */
1545 spin_lock(&exp->exp_lock);
1546 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1552 spin_unlock(&exp->exp_lock);
1554 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1555 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1556 obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1557 atomic_read(&exp->exp_rpc_count),
1558 atomic_read(&exp->exp_cb_count),
1559 atomic_read(&exp->exp_locks_count),
1560 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1561 nreplies, first_reply, nreplies > 3 ? "..." : "",
1562 exp->exp_last_committed);
1563 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1564 if (locks && class_export_dump_hook != NULL)
1565 class_export_dump_hook(exp);
/* Print every export associated with @obd through print_export_data().
 * Under obd_dev_lock: obd_exports ("ACTIVE"), obd_unlinked_exports
 * ("UNLINKED") and obd_delayed_exports ("DELAYED").  Then, under
 * obd_zombie_impexp_lock: the global obd_zombie_exports list ("ZOMBIE"),
 * which is not filtered by @obd.  @locks is passed through unchanged. */
1569 void dump_exports(struct obd_device *obd, int locks)
1571 struct obd_export *exp;
1573 spin_lock(&obd->obd_dev_lock);
1574 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1575 print_export_data(exp, "ACTIVE", locks);
1576 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1577 print_export_data(exp, "UNLINKED", locks);
1578 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1579 print_export_data(exp, "DELAYED", locks);
1580 spin_unlock(&obd->obd_dev_lock);
1581 spin_lock(&obd_zombie_impexp_lock);
1582 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1583 print_export_data(exp, "ZOMBIE", locks);
1584 spin_unlock(&obd_zombie_impexp_lock);
1586 EXPORT_SYMBOL(dump_exports);
/* Wait until @obd's obd_unlinked_exports list has drained.
 * Asserts that obd_exports is already empty.  The list is tested under
 * obd_dev_lock, but the lock is dropped around each uninterruptible sleep of
 * `waited` seconds.  Once `waited` exceeds 5 and is a power of two, a
 * console warning is printed and all exports are dumped.
 * NOTE(review): the declaration and update of `waited` are not visible in
 * this excerpt. */
1588 void obd_exports_barrier(struct obd_device *obd)
1591 LASSERT(cfs_list_empty(&obd->obd_exports));
1592 spin_lock(&obd->obd_dev_lock);
1593 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
/* Sleep with the lock released. */
1594 spin_unlock(&obd->obd_dev_lock);
1595 schedule_timeout_and_set_state(TASK_UNINTERRUPTIBLE,
1596 cfs_time_seconds(waited));
1597 if (waited > 5 && IS_PO2(waited)) {
1598 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1599 "more than %d seconds. "
1600 "The obd refcount = %d. Is it stuck?\n",
1601 obd->obd_name, waited,
1602 atomic_read(&obd->obd_refcount));
1603 dump_exports(obd, 1);
/* Re-take the lock before the loop condition is evaluated again. */
1606 spin_lock(&obd->obd_dev_lock);
1608 spin_unlock(&obd->obd_dev_lock);
1610 EXPORT_SYMBOL(obd_exports_barrier);
1612 /* Total number of zombies still to be destroyed.  Compared against 0
 * under obd_zombie_impexp_lock by obd_zombie_impexp_check() and
 * obd_zombie_is_idle(). */
1613 static int zombies_count = 0;
1616 /* Kill zombie imports and exports. */
/* Destroy queued zombie imports and exports.
 * Each pass detaches at most one import from the head of obd_zombie_imports
 * and at most one export from the head of obd_zombie_exports under
 * obd_zombie_impexp_lock, then destroys them with the lock released.  The
 * loop repeats until a pass finds neither.
 * NOTE(review): the statements executed after the lock is re-taken
 * following each destroy are not visible in this excerpt (presumably the
 * zombies_count bookkeeping -- TODO confirm). */
1618 void obd_zombie_impexp_cull(void)
1620 struct obd_import *import;
1621 struct obd_export *export;
1625 spin_lock(&obd_zombie_impexp_lock);
1628 if (!cfs_list_empty(&obd_zombie_imports)) {
1629 import = cfs_list_entry(obd_zombie_imports.next,
1632 cfs_list_del_init(&import->imp_zombie_chain);
1636 if (!cfs_list_empty(&obd_zombie_exports)) {
1637 export = cfs_list_entry(obd_zombie_exports.next,
1640 cfs_list_del_init(&export->exp_obd_chain);
1643 spin_unlock(&obd_zombie_impexp_lock);
/* Destruction happens outside obd_zombie_impexp_lock. */
1645 if (import != NULL) {
1646 class_import_destroy(import);
1647 spin_lock(&obd_zombie_impexp_lock);
1649 spin_unlock(&obd_zombie_impexp_lock);
1652 if (export != NULL) {
1653 class_export_destroy(export);
1654 spin_lock(&obd_zombie_impexp_lock);
1656 spin_unlock(&obd_zombie_impexp_lock);
1660 } while (import != NULL || export != NULL);
/* Completed by obd_zombie_impexp_thread() at startup; awaited by
 * obd_zombie_impexp_init(). */
1664 static struct completion obd_zombie_start;
/* Completed by obd_zombie_impexp_thread() on exit; awaited by
 * obd_zombie_impexp_stop(). */
1665 static struct completion obd_zombie_stop;
/* Bit field accessed with set_bit()/test_bit(). */
1666 static unsigned long obd_zombie_flags;
/* Wait queue shared by the zombie thread and obd_zombie_barrier() callers. */
1667 static wait_queue_head_t obd_zombie_waitq;
/* PID of the zombie thread, recorded by the thread itself. */
1668 static pid_t obd_zombie_pid;
/* Passed as the bit-number argument to set_bit()/test_bit() on
 * obd_zombie_flags. */
1671 OBD_ZOMBIE_STOP = 0x0001,
1675 /* Check for work for the zombie import/export kill thread. */
/* Compute, under obd_zombie_impexp_lock, whether the zombie thread has
 * nothing to do: rc is non-zero when zombies_count is 0 and OBD_ZOMBIE_STOP
 * is not set.  The thread sleeps until the negation of this value is true.
 * @arg is not used by the visible code.
 * NOTE(review): the declaration and return of rc are not visible in this
 * excerpt. */
1677 static int obd_zombie_impexp_check(void *arg)
1681 spin_lock(&obd_zombie_impexp_lock);
1682 rc = (zombies_count == 0) &&
1683 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1684 spin_unlock(&obd_zombie_impexp_lock);
1690 /* Add export to the obd_zombie thread and notify it. */
/* Queue @exp for destruction by the zombie thread.
 * The export must currently be linked through exp_obd_chain (asserted); it
 * is unlinked under its device's obd_dev_lock, then added to
 * obd_zombie_exports under obd_zombie_impexp_lock, and finally the waiters
 * on the zombie wait queue are notified.
 * NOTE(review): one statement inside the second critical section is not
 * visible in this excerpt. */
1692 static void obd_zombie_export_add(struct obd_export *exp) {
1693 spin_lock(&exp->exp_obd->obd_dev_lock);
1694 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1695 cfs_list_del_init(&exp->exp_obd_chain);
1696 spin_unlock(&exp->exp_obd->obd_dev_lock);
/* exp_obd_chain is reused as the link on the global zombie list. */
1697 spin_lock(&obd_zombie_impexp_lock);
1699 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1700 spin_unlock(&obd_zombie_impexp_lock);
1702 obd_zombie_impexp_notify();
1706 /* Add import to the obd_zombie thread and notify it. */
/* Queue @imp for destruction by the zombie thread.
 * Asserts that imp_sec and imp_rq_pool are already NULL and that the import
 * is not yet on a zombie chain, then links it onto obd_zombie_imports under
 * obd_zombie_impexp_lock and notifies the waiters on the zombie wait queue.
 * NOTE(review): one statement inside the critical section is not visible in
 * this excerpt. */
1708 static void obd_zombie_import_add(struct obd_import *imp) {
1709 LASSERT(imp->imp_sec == NULL);
1710 LASSERT(imp->imp_rq_pool == NULL);
1711 spin_lock(&obd_zombie_impexp_lock);
1712 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1714 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1715 spin_unlock(&obd_zombie_impexp_lock);
1717 obd_zombie_impexp_notify();
1721 /* Notify the import/export destroy thread about a new zombie. */
/* Wake every sleeper on obd_zombie_waitq.  wake_up_all() is used so that
 * both the zombie thread and any obd_zombie_barrier() callers waiting on
 * the same queue are woken (see the note in the body). */
1723 static void obd_zombie_impexp_notify(void)
1726 * Make sure obd_zomebie_impexp_thread get this notification.
1727 * It is possible this signal only get by obd_zombie_barrier, and
1728 * barrier gulps this notification and sleeps away and hangs ensues
1730 wake_up_all(&obd_zombie_waitq);
1734 /* Check whether obd_zombie is idle. */
/* Compute whether no zombies remain: rc is non-zero when zombies_count is
 * 0, sampled under obd_zombie_impexp_lock.  Asserts that OBD_ZOMBIE_STOP
 * has not been set.  Used as the wait condition of obd_zombie_barrier().
 * NOTE(review): the declaration and return of rc are not visible in this
 * excerpt. */
1736 static int obd_zombie_is_idle(void)
1740 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1741 spin_lock(&obd_zombie_impexp_lock);
1742 rc = (zombies_count == 0);
1743 spin_unlock(&obd_zombie_impexp_lock);
1748 /* Wait until the obd_zombie import/export queues become empty. */
/* Block on obd_zombie_waitq until obd_zombie_is_idle() reports true.
 * A call made from the zombie thread itself (current_pid() equal to
 * obd_zombie_pid) is special-cased so the thread does not wait for itself;
 * the statement taken in that case is not visible in this excerpt. */
1750 void obd_zombie_barrier(void)
1752 struct l_wait_info lwi = { 0 };
1754 if (obd_zombie_pid == current_pid())
1755 /* don't wait for myself */
1757 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1759 EXPORT_SYMBOL(obd_zombie_barrier);
1764 /* Thread that destroys zombie exports/imports. */
/* Main function of the zombie reaper thread (started with kthread_run() by
 * obd_zombie_impexp_init()).  Until OBD_ZOMBIE_STOP is set it sleeps on
 * obd_zombie_waitq, culls the zombie lists, and wakes the wait queue again
 * for barrier callers.  @unused is ignored. */
1766 static int obd_zombie_impexp_thread(void *unused)
1768 unshare_fs_struct();
/* Signal obd_zombie_impexp_init(), which waits on obd_zombie_start. */
1769 complete(&obd_zombie_start);
/* Recorded so obd_zombie_barrier() can detect a call from this thread. */
1771 obd_zombie_pid = current_pid();
1773 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1774 struct l_wait_info lwi = { 0 };
/* Sleep until there are zombies to destroy or a stop was requested. */
1776 l_wait_event(obd_zombie_waitq,
1777 !obd_zombie_impexp_check(NULL), &lwi);
1778 obd_zombie_impexp_cull();
1781 * Notify obd_zombie_barrier callers that queues
1784 wake_up(&obd_zombie_waitq);
/* Signal obd_zombie_impexp_stop(), which waits on obd_zombie_stop. */
1787 complete(&obd_zombie_stop);
1792 #else /* ! KERNEL */
/* Non-kernel build only (this is the `! KERNEL` branch). */
/* Entry counter used by obd_zombie_impexp_kill() to avoid nested culls. */
1794 static atomic_t zombie_recur = ATOMIC_INIT(0);
/* Handle from liblustre_register_wait_callback(); passed back to
 * liblustre_deregister_wait_callback() on stop. */
1795 static void *obd_zombie_impexp_work_cb;
/* Handle from liblustre_register_idle_callback(); passed back to
 * liblustre_deregister_idle_callback() on stop. */
1796 static void *obd_zombie_impexp_idle_cb;
/* liblustre wait callback (registered by obd_zombie_impexp_init()).
 * zombie_recur is incremented on entry and decremented on exit; only the
 * caller that raises it to exactly 1 runs obd_zombie_impexp_cull(), so a
 * call arriving while a cull is in progress does not start another one.
 * @arg is not used by the visible code.
 * NOTE(review): the return statement is not visible in this excerpt. */
1798 int obd_zombie_impexp_kill(void *arg)
1802 if (atomic_inc_return(&zombie_recur) == 1) {
1803 obd_zombie_impexp_cull();
1806 atomic_dec(&zombie_recur);
1813 /* Start the thread that destroys zombie imports/exports. */
/* Initialise the zombie machinery and start the reaper.
 * Sets up both zombie lists, the lock, the two completions and the wait
 * queue.  The visible code then contains two start-up paths: one starts
 * obd_zombie_impexp_thread() with kthread_run() (returning PTR_ERR(task) on
 * a failure path) and waits for obd_zombie_start; the other registers
 * obd_zombie_impexp_kill() and obd_zombie_impexp_check() as liblustre wait
 * and idle callbacks.
 * NOTE(review): the preprocessor conditionals selecting between the two
 * paths, the error test and the final return are not visible in this
 * excerpt. */
1815 int obd_zombie_impexp_init(void)
1818 struct task_struct *task;
1821 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1822 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1823 spin_lock_init(&obd_zombie_impexp_lock);
1824 init_completion(&obd_zombie_start);
1825 init_completion(&obd_zombie_stop);
1826 init_waitqueue_head(&obd_zombie_waitq);
1830 task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1832 RETURN(PTR_ERR(task));
/* Wait until the new thread has signalled that it is running. */
1834 wait_for_completion(&obd_zombie_start);
1837 obd_zombie_impexp_work_cb =
1838 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1839 &obd_zombie_impexp_kill, NULL);
1841 obd_zombie_impexp_idle_cb =
1842 liblustre_register_idle_callback("obd_zombi_impexp_check",
1843 &obd_zombie_impexp_check, NULL);
1848 /* Stop the thread that destroys zombie imports/exports. */
/* Shut the zombie reaper down.
 * Sets OBD_ZOMBIE_STOP and wakes the wait queue so the thread notices it.
 * The visible code then contains two tear-down paths: waiting for the
 * thread to complete obd_zombie_stop, and deregistering the liblustre wait
 * and idle callbacks.
 * NOTE(review): the preprocessor conditionals selecting between the two
 * paths are not visible in this excerpt. */
1850 void obd_zombie_impexp_stop(void)
1852 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1853 obd_zombie_impexp_notify();
1855 wait_for_completion(&obd_zombie_stop);
1857 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1858 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1862 /***** Kernel-userspace comm helpers *******/
1864 /* Get length of entire message, including header */
/* @param payload_len  Payload size in bytes
 * @returns sizeof(struct kuc_hdr) + payload_len */
1865 int kuc_len(int payload_len)
1867 return sizeof(struct kuc_hdr) + payload_len;
1869 EXPORT_SYMBOL(kuc_len);
1871 /* Get a pointer to kuc header, given a ptr to the payload
1872 * @param p Pointer to payload area
1873 * @returns Pointer to kuc header */
/* Locate the kuc header that sits immediately before payload pointer @p
 * (one struct kuc_hdr back) and assert that its kuc_magic is KUC_MAGIC.
 * NOTE(review): the return statement is not visible in this excerpt;
 * presumably it returns lh. */
1875 struct kuc_hdr * kuc_ptr(void *p)
1877 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1878 LASSERT(lh->kuc_magic == KUC_MAGIC);
1881 EXPORT_SYMBOL(kuc_ptr);
1883 /* Test if payload is part of kuc message
1884 * @param p Pointer to payload area */
/* Non-asserting counterpart of kuc_ptr(): reads the struct kuc_hdr
 * immediately before @p and compares its kuc_magic with KUC_MAGIC.
 * NOTE(review): the values returned for a match and a mismatch are not
 * visible in this excerpt. */
1887 int kuc_ispayload(void *p)
1889 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1891 if (kh->kuc_magic == KUC_MAGIC)
1896 EXPORT_SYMBOL(kuc_ispayload);
1898 /* Alloc space for a message, and fill in header
1899 * @return Pointer to payload area */
/* Build a kuc message with room for @payload_len payload bytes.
 * The header is filled with KUC_MAGIC, @transport, @type and the total
 * length kuc_len(payload_len).  Returns a pointer to the first byte after
 * the header, or ERR_PTR(-ENOMEM) on the failure path.
 * NOTE(review): the allocation call and its failure test are not visible
 * in this excerpt. */
1901 void *kuc_alloc(int payload_len, int transport, int type)
1904 int len = kuc_len(payload_len);
1908 return ERR_PTR(-ENOMEM);
1910 lh->kuc_magic = KUC_MAGIC;
1911 lh->kuc_transport = transport;
1912 lh->kuc_msgtype = type;
/* kuc_msglen records the full message length, header included. */
1913 lh->kuc_msglen = len;
/* The payload area starts right after the header. */
1915 return (void *)(lh + 1);
1917 EXPORT_SYMBOL(kuc_alloc);
1919 /* Takes pointer to payload area */
/* Frees the whole message: the header is located (and its magic asserted)
 * through kuc_ptr(), and kuc_len(payload_len) bytes are released starting
 * at the header.  @payload_len presumably has to equal the value given to
 * kuc_alloc() -- TODO confirm. */
1920 inline void kuc_free(void *p, int payload_len)
1922 struct kuc_hdr *lh = kuc_ptr(p);
1923 OBD_FREE(lh, kuc_len(payload_len));
1925 EXPORT_SYMBOL(kuc_free);