4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2011, 2012, Whamcloud, Inc.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
50 extern cfs_list_t obd_types;
51 spinlock_t obd_types_lock;
53 cfs_mem_cache_t *obd_device_cachep;
54 cfs_mem_cache_t *obdo_cachep;
55 EXPORT_SYMBOL(obdo_cachep);
56 cfs_mem_cache_t *import_cachep;
58 cfs_list_t obd_zombie_imports;
59 cfs_list_t obd_zombie_exports;
60 spinlock_t obd_zombie_impexp_lock;
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65 const char *status, int locks);
67 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
71 * support functions: we could use inter-module communication, but this
72 * is more portable to other OS's
74 static struct obd_device *obd_device_alloc(void)
76 struct obd_device *obd;
78 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80 obd->obd_magic = OBD_DEVICE_MAGIC;
85 static void obd_device_free(struct obd_device *obd)
88 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90 if (obd->obd_namespace != NULL) {
91 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92 obd, obd->obd_namespace, obd->obd_force);
95 lu_ref_fini(&obd->obd_reference);
96 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 struct obd_type *class_search_type(const char *name)
102 struct obd_type *type;
104 spin_lock(&obd_types_lock);
105 cfs_list_for_each(tmp, &obd_types) {
106 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107 if (strcmp(type->typ_name, name) == 0) {
108 spin_unlock(&obd_types_lock);
112 spin_unlock(&obd_types_lock);
115 EXPORT_SYMBOL(class_search_type);
117 struct obd_type *class_get_type(const char *name)
119 struct obd_type *type = class_search_type(name);
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
123 const char *modname = name;
125 if (strcmp(modname, "obdfilter") == 0)
128 if (!cfs_request_module("%s", modname)) {
129 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
130 type = class_search_type(name);
132 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
138 spin_lock(&type->obd_type_lock);
140 cfs_try_module_get(type->typ_dt_ops->o_owner);
141 spin_unlock(&type->obd_type_lock);
145 EXPORT_SYMBOL(class_get_type);
147 void class_put_type(struct obd_type *type)
150 spin_lock(&type->obd_type_lock);
152 cfs_module_put(type->typ_dt_ops->o_owner);
153 spin_unlock(&type->obd_type_lock);
155 EXPORT_SYMBOL(class_put_type);
157 #define CLASS_MAX_NAME 1024
159 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
160 struct lprocfs_vars *vars, const char *name,
161 struct lu_device_type *ldt)
163 struct obd_type *type;
168 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
170 if (class_search_type(name)) {
171 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
176 OBD_ALLOC(type, sizeof(*type));
180 OBD_ALLOC_PTR(type->typ_dt_ops);
181 OBD_ALLOC_PTR(type->typ_md_ops);
182 OBD_ALLOC(type->typ_name, strlen(name) + 1);
184 if (type->typ_dt_ops == NULL ||
185 type->typ_md_ops == NULL ||
186 type->typ_name == NULL)
189 *(type->typ_dt_ops) = *dt_ops;
190 /* md_ops is optional */
192 *(type->typ_md_ops) = *md_ops;
193 strcpy(type->typ_name, name);
194 spin_lock_init(&type->obd_type_lock);
197 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
199 if (IS_ERR(type->typ_procroot)) {
200 rc = PTR_ERR(type->typ_procroot);
201 type->typ_procroot = NULL;
207 rc = lu_device_type_init(ldt);
212 spin_lock(&obd_types_lock);
213 cfs_list_add(&type->typ_chain, &obd_types);
214 spin_unlock(&obd_types_lock);
219 if (type->typ_name != NULL)
220 OBD_FREE(type->typ_name, strlen(name) + 1);
221 if (type->typ_md_ops != NULL)
222 OBD_FREE_PTR(type->typ_md_ops);
223 if (type->typ_dt_ops != NULL)
224 OBD_FREE_PTR(type->typ_dt_ops);
225 OBD_FREE(type, sizeof(*type));
228 EXPORT_SYMBOL(class_register_type);
230 int class_unregister_type(const char *name)
232 struct obd_type *type = class_search_type(name);
236 CERROR("unknown obd type\n");
240 if (type->typ_refcnt) {
241 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
242 /* This is a bad situation, let's make the best of it */
243 /* Remove ops, but leave the name for debugging */
244 OBD_FREE_PTR(type->typ_dt_ops);
245 OBD_FREE_PTR(type->typ_md_ops);
249 /* we do not use type->typ_procroot as for compatibility purposes
250 * other modules can share names (i.e. lod can use lov entry). so
251 * we can't reference pointer as it can get invalided when another
252 * module removes the entry */
253 lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
256 lu_device_type_fini(type->typ_lu);
258 spin_lock(&obd_types_lock);
259 cfs_list_del(&type->typ_chain);
260 spin_unlock(&obd_types_lock);
261 OBD_FREE(type->typ_name, strlen(name) + 1);
262 if (type->typ_dt_ops != NULL)
263 OBD_FREE_PTR(type->typ_dt_ops);
264 if (type->typ_md_ops != NULL)
265 OBD_FREE_PTR(type->typ_md_ops);
266 OBD_FREE(type, sizeof(*type));
268 } /* class_unregister_type */
269 EXPORT_SYMBOL(class_unregister_type);
272 * Create a new obd device.
274 * Find an empty slot in ::obd_devs[], create a new obd device in it.
276 * \param[in] type_name obd device type string.
277 * \param[in] name obd device name.
279 * \retval NULL if create fails, otherwise return the obd device
282 struct obd_device *class_newdev(const char *type_name, const char *name)
284 struct obd_device *result = NULL;
285 struct obd_device *newdev;
286 struct obd_type *type = NULL;
288 int new_obd_minor = 0;
291 if (strlen(name) >= MAX_OBD_NAME) {
292 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
293 RETURN(ERR_PTR(-EINVAL));
296 type = class_get_type(type_name);
298 CERROR("OBD: unknown type: %s\n", type_name);
299 RETURN(ERR_PTR(-ENODEV));
302 newdev = obd_device_alloc();
304 GOTO(out_type, result = ERR_PTR(-ENOMEM));
306 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
308 write_lock(&obd_dev_lock);
309 for (i = 0; i < class_devno_max(); i++) {
310 struct obd_device *obd = class_num2obd(i);
312 if (obd && obd->obd_name &&
313 (strcmp(name, obd->obd_name) == 0)) {
314 CERROR("Device %s already exists at %d, won't add\n",
317 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
318 "%p obd_magic %08x != %08x\n", result,
319 result->obd_magic, OBD_DEVICE_MAGIC);
320 LASSERTF(result->obd_minor == new_obd_minor,
321 "%p obd_minor %d != %d\n", result,
322 result->obd_minor, new_obd_minor);
324 obd_devs[result->obd_minor] = NULL;
325 result->obd_name[0]='\0';
327 result = ERR_PTR(-EEXIST);
330 if (!result && !obd) {
332 result->obd_minor = i;
334 result->obd_type = type;
335 strncpy(result->obd_name, name,
336 sizeof(result->obd_name) - 1);
337 obd_devs[i] = result;
340 write_unlock(&obd_dev_lock);
342 if (result == NULL && i >= class_devno_max()) {
343 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
345 GOTO(out, result = ERR_PTR(-EOVERFLOW));
351 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
352 result->obd_name, result);
356 obd_device_free(newdev);
358 class_put_type(type);
362 void class_release_dev(struct obd_device *obd)
364 struct obd_type *obd_type = obd->obd_type;
366 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
367 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
368 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
369 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
370 LASSERT(obd_type != NULL);
372 CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
373 obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
375 write_lock(&obd_dev_lock);
376 obd_devs[obd->obd_minor] = NULL;
377 write_unlock(&obd_dev_lock);
378 obd_device_free(obd);
380 class_put_type(obd_type);
383 int class_name2dev(const char *name)
390 read_lock(&obd_dev_lock);
391 for (i = 0; i < class_devno_max(); i++) {
392 struct obd_device *obd = class_num2obd(i);
394 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
395 /* Make sure we finished attaching before we give
396 out any references */
397 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
398 if (obd->obd_attached) {
399 read_unlock(&obd_dev_lock);
405 read_unlock(&obd_dev_lock);
409 EXPORT_SYMBOL(class_name2dev);
411 struct obd_device *class_name2obd(const char *name)
413 int dev = class_name2dev(name);
415 if (dev < 0 || dev > class_devno_max())
417 return class_num2obd(dev);
419 EXPORT_SYMBOL(class_name2obd);
421 int class_uuid2dev(struct obd_uuid *uuid)
425 read_lock(&obd_dev_lock);
426 for (i = 0; i < class_devno_max(); i++) {
427 struct obd_device *obd = class_num2obd(i);
429 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
430 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
431 read_unlock(&obd_dev_lock);
435 read_unlock(&obd_dev_lock);
439 EXPORT_SYMBOL(class_uuid2dev);
441 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
443 int dev = class_uuid2dev(uuid);
446 return class_num2obd(dev);
448 EXPORT_SYMBOL(class_uuid2obd);
451 * Get obd device from ::obd_devs[]
453 * \param num [in] array index
455 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
456 * otherwise return the obd device there.
458 struct obd_device *class_num2obd(int num)
460 struct obd_device *obd = NULL;
462 if (num < class_devno_max()) {
467 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
468 "%p obd_magic %08x != %08x\n",
469 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
470 LASSERTF(obd->obd_minor == num,
471 "%p obd_minor %0d != %0d\n",
472 obd, obd->obd_minor, num);
477 EXPORT_SYMBOL(class_num2obd);
479 void class_obd_list(void)
484 read_lock(&obd_dev_lock);
485 for (i = 0; i < class_devno_max(); i++) {
486 struct obd_device *obd = class_num2obd(i);
490 if (obd->obd_stopping)
492 else if (obd->obd_set_up)
494 else if (obd->obd_attached)
498 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
499 i, status, obd->obd_type->typ_name,
500 obd->obd_name, obd->obd_uuid.uuid,
501 cfs_atomic_read(&obd->obd_refcount));
503 read_unlock(&obd_dev_lock);
507 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
508 specified, then only the client with that uuid is returned,
509 otherwise any client connected to the tgt is returned. */
510 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
511 const char * typ_name,
512 struct obd_uuid *grp_uuid)
516 read_lock(&obd_dev_lock);
517 for (i = 0; i < class_devno_max(); i++) {
518 struct obd_device *obd = class_num2obd(i);
522 if ((strncmp(obd->obd_type->typ_name, typ_name,
523 strlen(typ_name)) == 0)) {
524 if (obd_uuid_equals(tgt_uuid,
525 &obd->u.cli.cl_target_uuid) &&
526 ((grp_uuid)? obd_uuid_equals(grp_uuid,
527 &obd->obd_uuid) : 1)) {
528 read_unlock(&obd_dev_lock);
533 read_unlock(&obd_dev_lock);
537 EXPORT_SYMBOL(class_find_client_obd);
539 /* Iterate the obd_device list looking devices have grp_uuid. Start
540 searching at *next, and if a device is found, the next index to look
541 at is saved in *next. If next is NULL, then the first matching device
542 will always be returned. */
543 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
549 else if (*next >= 0 && *next < class_devno_max())
554 read_lock(&obd_dev_lock);
555 for (; i < class_devno_max(); i++) {
556 struct obd_device *obd = class_num2obd(i);
560 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
563 read_unlock(&obd_dev_lock);
567 read_unlock(&obd_dev_lock);
571 EXPORT_SYMBOL(class_devices_in_group);
574 * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
575 * adjust sptlrpc settings accordingly.
577 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
579 struct obd_device *obd;
583 LASSERT(namelen > 0);
585 read_lock(&obd_dev_lock);
586 for (i = 0; i < class_devno_max(); i++) {
587 obd = class_num2obd(i);
589 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
592 /* only notify mdc, osc, mdt, ost */
593 type = obd->obd_type->typ_name;
594 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
595 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
596 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
597 strcmp(type, LUSTRE_OST_NAME) != 0)
600 if (strncmp(obd->obd_name, fsname, namelen))
603 class_incref(obd, __FUNCTION__, obd);
604 read_unlock(&obd_dev_lock);
605 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
606 sizeof(KEY_SPTLRPC_CONF),
607 KEY_SPTLRPC_CONF, 0, NULL, NULL);
609 class_decref(obd, __FUNCTION__, obd);
610 read_lock(&obd_dev_lock);
612 read_unlock(&obd_dev_lock);
615 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
617 void obd_cleanup_caches(void)
622 if (obd_device_cachep) {
623 rc = cfs_mem_cache_destroy(obd_device_cachep);
624 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
625 obd_device_cachep = NULL;
628 rc = cfs_mem_cache_destroy(obdo_cachep);
629 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
633 rc = cfs_mem_cache_destroy(import_cachep);
634 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
635 import_cachep = NULL;
638 rc = cfs_mem_cache_destroy(capa_cachep);
639 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
645 int obd_init_caches(void)
649 LASSERT(obd_device_cachep == NULL);
650 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
651 sizeof(struct obd_device),
653 if (!obd_device_cachep)
656 LASSERT(obdo_cachep == NULL);
657 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
662 LASSERT(import_cachep == NULL);
663 import_cachep = cfs_mem_cache_create("ll_import_cache",
664 sizeof(struct obd_import),
669 LASSERT(capa_cachep == NULL);
670 capa_cachep = cfs_mem_cache_create("capa_cache",
671 sizeof(struct obd_capa), 0, 0);
677 obd_cleanup_caches();
682 /* map connection to client */
683 struct obd_export *class_conn2export(struct lustre_handle *conn)
685 struct obd_export *export;
689 CDEBUG(D_CACHE, "looking for null handle\n");
693 if (conn->cookie == -1) { /* this means assign a new connection */
694 CDEBUG(D_CACHE, "want a new connection\n");
698 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
699 export = class_handle2object(conn->cookie);
702 EXPORT_SYMBOL(class_conn2export);
704 struct obd_device *class_exp2obd(struct obd_export *exp)
710 EXPORT_SYMBOL(class_exp2obd);
712 struct obd_device *class_conn2obd(struct lustre_handle *conn)
714 struct obd_export *export;
715 export = class_conn2export(conn);
717 struct obd_device *obd = export->exp_obd;
718 class_export_put(export);
723 EXPORT_SYMBOL(class_conn2obd);
725 struct obd_import *class_exp2cliimp(struct obd_export *exp)
727 struct obd_device *obd = exp->exp_obd;
730 return obd->u.cli.cl_import;
732 EXPORT_SYMBOL(class_exp2cliimp);
734 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
736 struct obd_device *obd = class_conn2obd(conn);
739 return obd->u.cli.cl_import;
741 EXPORT_SYMBOL(class_conn2cliimp);
743 /* Export management functions */
744 static void class_export_destroy(struct obd_export *exp)
746 struct obd_device *obd = exp->exp_obd;
749 LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
750 LASSERT(obd != NULL);
752 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
753 exp->exp_client_uuid.uuid, obd->obd_name);
755 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
756 if (exp->exp_connection)
757 ptlrpc_put_connection_superhack(exp->exp_connection);
759 LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
760 LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
761 LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
762 LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
763 obd_destroy_export(exp);
764 class_decref(obd, "export", exp);
766 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
770 static void export_handle_addref(void *export)
772 class_export_get(export);
775 static struct portals_handle_ops export_handle_ops = {
776 .hop_addref = export_handle_addref,
780 struct obd_export *class_export_get(struct obd_export *exp)
782 cfs_atomic_inc(&exp->exp_refcount);
783 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
784 cfs_atomic_read(&exp->exp_refcount));
787 EXPORT_SYMBOL(class_export_get);
789 void class_export_put(struct obd_export *exp)
791 LASSERT(exp != NULL);
792 LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
793 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
794 cfs_atomic_read(&exp->exp_refcount) - 1);
796 if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
797 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
798 CDEBUG(D_IOCTL, "final put %p/%s\n",
799 exp, exp->exp_client_uuid.uuid);
801 /* release nid stat refererence */
802 lprocfs_exp_cleanup(exp);
804 obd_zombie_export_add(exp);
807 EXPORT_SYMBOL(class_export_put);
809 /* Creates a new export, adds it to the hash table, and returns a
810 * pointer to it. The refcount is 2: one for the hash reference, and
811 * one for the pointer returned by this function. */
812 struct obd_export *class_new_export(struct obd_device *obd,
813 struct obd_uuid *cluuid)
815 struct obd_export *export;
816 cfs_hash_t *hash = NULL;
820 OBD_ALLOC_PTR(export);
822 return ERR_PTR(-ENOMEM);
824 export->exp_conn_cnt = 0;
825 export->exp_lock_hash = NULL;
826 export->exp_flock_hash = NULL;
827 cfs_atomic_set(&export->exp_refcount, 2);
828 cfs_atomic_set(&export->exp_rpc_count, 0);
829 cfs_atomic_set(&export->exp_cb_count, 0);
830 cfs_atomic_set(&export->exp_locks_count, 0);
831 #if LUSTRE_TRACKS_LOCK_EXP_REFS
832 CFS_INIT_LIST_HEAD(&export->exp_locks_list);
833 spin_lock_init(&export->exp_locks_list_guard);
835 cfs_atomic_set(&export->exp_replay_count, 0);
836 export->exp_obd = obd;
837 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
838 spin_lock_init(&export->exp_uncommitted_replies_lock);
839 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
840 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
841 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
842 CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
843 class_handle_hash(&export->exp_handle, &export_handle_ops);
844 export->exp_last_request_time = cfs_time_current_sec();
845 spin_lock_init(&export->exp_lock);
846 spin_lock_init(&export->exp_rpc_lock);
847 CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
848 CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
849 spin_lock_init(&export->exp_bl_list_lock);
850 CFS_INIT_LIST_HEAD(&export->exp_bl_list);
852 export->exp_sp_peer = LUSTRE_SP_ANY;
853 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
854 export->exp_client_uuid = *cluuid;
855 obd_init_export(export);
857 spin_lock(&obd->obd_dev_lock);
858 /* shouldn't happen, but might race */
859 if (obd->obd_stopping)
860 GOTO(exit_unlock, rc = -ENODEV);
862 hash = cfs_hash_getref(obd->obd_uuid_hash);
864 GOTO(exit_unlock, rc = -ENODEV);
865 spin_unlock(&obd->obd_dev_lock);
867 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
868 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
870 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
871 obd->obd_name, cluuid->uuid, rc);
872 GOTO(exit_err, rc = -EALREADY);
876 spin_lock(&obd->obd_dev_lock);
877 if (obd->obd_stopping) {
878 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
879 GOTO(exit_unlock, rc = -ENODEV);
882 class_incref(obd, "export", export);
883 cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
884 cfs_list_add_tail(&export->exp_obd_chain_timed,
885 &export->exp_obd->obd_exports_timed);
886 export->exp_obd->obd_num_exports++;
887 spin_unlock(&obd->obd_dev_lock);
888 cfs_hash_putref(hash);
892 spin_unlock(&obd->obd_dev_lock);
895 cfs_hash_putref(hash);
896 class_handle_unhash(&export->exp_handle);
897 LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
898 obd_destroy_export(export);
899 OBD_FREE_PTR(export);
902 EXPORT_SYMBOL(class_new_export);
904 void class_unlink_export(struct obd_export *exp)
906 class_handle_unhash(&exp->exp_handle);
908 spin_lock(&exp->exp_obd->obd_dev_lock);
909 /* delete an uuid-export hashitem from hashtables */
910 if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
911 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
912 &exp->exp_client_uuid,
913 &exp->exp_uuid_hash);
915 cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
916 cfs_list_del_init(&exp->exp_obd_chain_timed);
917 exp->exp_obd->obd_num_exports--;
918 spin_unlock(&exp->exp_obd->obd_dev_lock);
919 class_export_put(exp);
921 EXPORT_SYMBOL(class_unlink_export);
923 /* Import management functions */
924 void class_import_destroy(struct obd_import *imp)
928 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
929 imp->imp_obd->obd_name);
931 LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
933 ptlrpc_put_connection_superhack(imp->imp_connection);
935 while (!cfs_list_empty(&imp->imp_conn_list)) {
936 struct obd_import_conn *imp_conn;
938 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
939 struct obd_import_conn, oic_item);
940 cfs_list_del_init(&imp_conn->oic_item);
941 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
942 OBD_FREE(imp_conn, sizeof(*imp_conn));
945 LASSERT(imp->imp_sec == NULL);
946 class_decref(imp->imp_obd, "import", imp);
947 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
951 static void import_handle_addref(void *import)
953 class_import_get(import);
956 static struct portals_handle_ops import_handle_ops = {
957 .hop_addref = import_handle_addref,
961 struct obd_import *class_import_get(struct obd_import *import)
963 cfs_atomic_inc(&import->imp_refcount);
964 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
965 cfs_atomic_read(&import->imp_refcount),
966 import->imp_obd->obd_name);
969 EXPORT_SYMBOL(class_import_get);
971 void class_import_put(struct obd_import *imp)
975 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
976 LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
978 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
979 cfs_atomic_read(&imp->imp_refcount) - 1,
980 imp->imp_obd->obd_name);
982 if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
983 CDEBUG(D_INFO, "final put import %p\n", imp);
984 obd_zombie_import_add(imp);
987 /* catch possible import put race */
988 LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
991 EXPORT_SYMBOL(class_import_put);
993 static void init_imp_at(struct imp_at *at) {
995 at_init(&at->iat_net_latency, 0, 0);
996 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
997 /* max service estimates are tracked on the server side, so
998 don't use the AT history here, just use the last reported
999 val. (But keep hist for proc histogram, worst_ever) */
1000 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1005 struct obd_import *class_new_import(struct obd_device *obd)
1007 struct obd_import *imp;
1009 OBD_ALLOC(imp, sizeof(*imp));
1013 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1014 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1015 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1016 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1017 spin_lock_init(&imp->imp_lock);
1018 imp->imp_last_success_conn = 0;
1019 imp->imp_state = LUSTRE_IMP_NEW;
1020 imp->imp_obd = class_incref(obd, "import", imp);
1021 mutex_init(&imp->imp_sec_mutex);
1022 cfs_waitq_init(&imp->imp_recovery_waitq);
1024 cfs_atomic_set(&imp->imp_refcount, 2);
1025 cfs_atomic_set(&imp->imp_unregistering, 0);
1026 cfs_atomic_set(&imp->imp_inflight, 0);
1027 cfs_atomic_set(&imp->imp_replay_inflight, 0);
1028 cfs_atomic_set(&imp->imp_inval_count, 0);
1029 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1030 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1031 class_handle_hash(&imp->imp_handle, &import_handle_ops);
1032 init_imp_at(&imp->imp_at);
1034 /* the default magic is V2, will be used in connect RPC, and
1035 * then adjusted according to the flags in request/reply. */
1036 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1040 EXPORT_SYMBOL(class_new_import);
1042 void class_destroy_import(struct obd_import *import)
1044 LASSERT(import != NULL);
1045 LASSERT(import != LP_POISON);
1047 class_handle_unhash(&import->imp_handle);
1049 spin_lock(&import->imp_lock);
1050 import->imp_generation++;
1051 spin_unlock(&import->imp_lock);
1052 class_import_put(import);
1054 EXPORT_SYMBOL(class_destroy_import);
1056 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1058 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1060 spin_lock(&exp->exp_locks_list_guard);
1062 LASSERT(lock->l_exp_refs_nr >= 0);
1064 if (lock->l_exp_refs_target != NULL &&
1065 lock->l_exp_refs_target != exp) {
1066 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1067 exp, lock, lock->l_exp_refs_target);
1069 if ((lock->l_exp_refs_nr ++) == 0) {
1070 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1071 lock->l_exp_refs_target = exp;
1073 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1074 lock, exp, lock->l_exp_refs_nr);
1075 spin_unlock(&exp->exp_locks_list_guard);
1077 EXPORT_SYMBOL(__class_export_add_lock_ref);
1079 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1081 spin_lock(&exp->exp_locks_list_guard);
1082 LASSERT(lock->l_exp_refs_nr > 0);
1083 if (lock->l_exp_refs_target != exp) {
1084 LCONSOLE_WARN("lock %p, "
1085 "mismatching export pointers: %p, %p\n",
1086 lock, lock->l_exp_refs_target, exp);
1088 if (-- lock->l_exp_refs_nr == 0) {
1089 cfs_list_del_init(&lock->l_exp_refs_link);
1090 lock->l_exp_refs_target = NULL;
1092 CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1093 lock, exp, lock->l_exp_refs_nr);
1094 spin_unlock(&exp->exp_locks_list_guard);
1096 EXPORT_SYMBOL(__class_export_del_lock_ref);
1099 /* A connection defines an export context in which preallocation can
1100 be managed. This releases the export pointer reference, and returns
1101 the export handle, so the export refcount is 1 when this function
1103 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1104 struct obd_uuid *cluuid)
1106 struct obd_export *export;
1107 LASSERT(conn != NULL);
1108 LASSERT(obd != NULL);
1109 LASSERT(cluuid != NULL);
1112 export = class_new_export(obd, cluuid);
1114 RETURN(PTR_ERR(export));
1116 conn->cookie = export->exp_handle.h_cookie;
1117 class_export_put(export);
1119 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1120 cluuid->uuid, conn->cookie);
1123 EXPORT_SYMBOL(class_connect);
1125 /* if export is involved in recovery then clean up related things */
1126 void class_export_recovery_cleanup(struct obd_export *exp)
1128 struct obd_device *obd = exp->exp_obd;
1130 spin_lock(&obd->obd_recovery_task_lock);
1131 if (exp->exp_delayed)
1132 obd->obd_delayed_clients--;
1133 if (obd->obd_recovering && exp->exp_in_recovery) {
1134 spin_lock(&exp->exp_lock);
1135 exp->exp_in_recovery = 0;
1136 spin_unlock(&exp->exp_lock);
1137 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1138 cfs_atomic_dec(&obd->obd_connected_clients);
1140 spin_unlock(&obd->obd_recovery_task_lock);
1141 /** Cleanup req replay fields */
1142 if (exp->exp_req_replay_needed) {
1143 spin_lock(&exp->exp_lock);
1144 exp->exp_req_replay_needed = 0;
1145 spin_unlock(&exp->exp_lock);
1146 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1147 cfs_atomic_dec(&obd->obd_req_replay_clients);
1149 /** Cleanup lock replay data */
1150 if (exp->exp_lock_replay_needed) {
1151 spin_lock(&exp->exp_lock);
1152 exp->exp_lock_replay_needed = 0;
1153 spin_unlock(&exp->exp_lock);
1154 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1155 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1159 /* This function removes 1-3 references from the export:
1160 * 1 - for export pointer passed
1161 * and if disconnect really need
1162 * 2 - removing from hash
1163 * 3 - in client_unlink_export
1164 * The export pointer passed to this function can destroyed */
1165 int class_disconnect(struct obd_export *export)
1167 int already_disconnected;
1170 if (export == NULL) {
1171 CWARN("attempting to free NULL export %p\n", export);
1175 spin_lock(&export->exp_lock);
1176 already_disconnected = export->exp_disconnected;
1177 export->exp_disconnected = 1;
1178 spin_unlock(&export->exp_lock);
1180 /* class_cleanup(), abort_recovery(), and class_fail_export()
1181 * all end up in here, and if any of them race we shouldn't
1182 * call extra class_export_puts(). */
1183 if (already_disconnected) {
1184 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1185 GOTO(no_disconn, already_disconnected);
1188 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1189 export->exp_handle.h_cookie);
1191 if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1192 cfs_hash_del(export->exp_obd->obd_nid_hash,
1193 &export->exp_connection->c_peer.nid,
1194 &export->exp_nid_hash);
1196 class_export_recovery_cleanup(export);
1197 class_unlink_export(export);
1199 class_export_put(export);
1202 EXPORT_SYMBOL(class_disconnect);
1204 /* Return non-zero for a fully connected export */
1205 int class_connected_export(struct obd_export *exp)
1209 spin_lock(&exp->exp_lock);
1210 connected = (exp->exp_conn_cnt > 0);
1211 spin_unlock(&exp->exp_lock);
1216 EXPORT_SYMBOL(class_connected_export);
1218 static void class_disconnect_export_list(cfs_list_t *list,
1219 enum obd_option flags)
1222 struct obd_export *exp;
1225 /* It's possible that an export may disconnect itself, but
1226 * nothing else will be added to this list. */
1227 while (!cfs_list_empty(list)) {
1228 exp = cfs_list_entry(list->next, struct obd_export,
1230 /* need for safe call CDEBUG after obd_disconnect */
1231 class_export_get(exp);
1233 spin_lock(&exp->exp_lock);
1234 exp->exp_flags = flags;
1235 spin_unlock(&exp->exp_lock);
1237 if (obd_uuid_equals(&exp->exp_client_uuid,
1238 &exp->exp_obd->obd_uuid)) {
1240 "exp %p export uuid == obd uuid, don't discon\n",
1242 /* Need to delete this now so we don't end up pointing
1243 * to work_list later when this export is cleaned up. */
1244 cfs_list_del_init(&exp->exp_obd_chain);
1245 class_export_put(exp);
1249 class_export_get(exp);
1250 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1251 "last request at "CFS_TIME_T"\n",
1252 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1253 exp, exp->exp_last_request_time);
1254 /* release one export reference anyway */
1255 rc = obd_disconnect(exp);
1257 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1258 obd_export_nid2str(exp), exp, rc);
1259 class_export_put(exp);
1264 void class_disconnect_exports(struct obd_device *obd)
1266 cfs_list_t work_list;
1269 /* Move all of the exports from obd_exports to a work list, en masse. */
1270 CFS_INIT_LIST_HEAD(&work_list);
1271 spin_lock(&obd->obd_dev_lock);
1272 cfs_list_splice_init(&obd->obd_exports, &work_list);
1273 cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1274 spin_unlock(&obd->obd_dev_lock);
1276 if (!cfs_list_empty(&work_list)) {
1277 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1278 "disconnecting them\n", obd->obd_minor, obd);
1279 class_disconnect_export_list(&work_list,
1280 exp_flags_from_obd(obd));
1282 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1283 obd->obd_minor, obd);
1286 EXPORT_SYMBOL(class_disconnect_exports);
1288 /* Remove exports that have not completed recovery.
1290 void class_disconnect_stale_exports(struct obd_device *obd,
1291 int (*test_export)(struct obd_export *))
1293 cfs_list_t work_list;
1294 struct obd_export *exp, *n;
1298 CFS_INIT_LIST_HEAD(&work_list);
1299 spin_lock(&obd->obd_dev_lock);
1300 cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1302 /* don't count self-export as client */
1303 if (obd_uuid_equals(&exp->exp_client_uuid,
1304 &exp->exp_obd->obd_uuid))
1307 /* don't evict clients which have no slot in last_rcvd
1308 * (e.g. lightweight connection) */
1309 if (exp->exp_target_data.ted_lr_idx == -1)
1312 spin_lock(&exp->exp_lock);
1313 if (test_export(exp)) {
1314 spin_unlock(&exp->exp_lock);
1317 exp->exp_failed = 1;
1318 spin_unlock(&exp->exp_lock);
1320 cfs_list_move(&exp->exp_obd_chain, &work_list);
1322 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1323 obd->obd_name, exp->exp_client_uuid.uuid,
1324 exp->exp_connection == NULL ? "<unknown>" :
1325 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1326 print_export_data(exp, "EVICTING", 0);
1328 spin_unlock(&obd->obd_dev_lock);
1331 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1332 obd->obd_name, evicted);
1333 obd->obd_stale_clients += evicted;
1335 class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1336 OBD_OPT_ABORT_RECOV);
1339 EXPORT_SYMBOL(class_disconnect_stale_exports);
1341 void class_fail_export(struct obd_export *exp)
1343 int rc, already_failed;
1345 spin_lock(&exp->exp_lock);
1346 already_failed = exp->exp_failed;
1347 exp->exp_failed = 1;
1348 spin_unlock(&exp->exp_lock);
1350 if (already_failed) {
1351 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1352 exp, exp->exp_client_uuid.uuid);
1356 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1357 exp, exp->exp_client_uuid.uuid);
1359 if (obd_dump_on_timeout)
1360 libcfs_debug_dumplog();
1362 /* need for safe call CDEBUG after obd_disconnect */
1363 class_export_get(exp);
1365 /* Most callers into obd_disconnect are removing their own reference
1366 * (request, for example) in addition to the one from the hash table.
1367 * We don't have such a reference here, so make one. */
1368 class_export_get(exp);
1369 rc = obd_disconnect(exp);
1371 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1373 CDEBUG(D_HA, "disconnected export %p/%s\n",
1374 exp, exp->exp_client_uuid.uuid);
1375 class_export_put(exp);
1377 EXPORT_SYMBOL(class_fail_export);
1379 char *obd_export_nid2str(struct obd_export *exp)
1381 if (exp->exp_connection != NULL)
1382 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1386 EXPORT_SYMBOL(obd_export_nid2str);
1388 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1390 cfs_hash_t *nid_hash;
1391 struct obd_export *doomed_exp = NULL;
1392 int exports_evicted = 0;
1394 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1396 spin_lock(&obd->obd_dev_lock);
1397 /* umount has run already, so evict thread should leave
1398 * its task to umount thread now */
1399 if (obd->obd_stopping) {
1400 spin_unlock(&obd->obd_dev_lock);
1401 return exports_evicted;
1403 nid_hash = obd->obd_nid_hash;
1404 cfs_hash_getref(nid_hash);
1405 spin_unlock(&obd->obd_dev_lock);
1408 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1409 if (doomed_exp == NULL)
1412 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1413 "nid %s found, wanted nid %s, requested nid %s\n",
1414 obd_export_nid2str(doomed_exp),
1415 libcfs_nid2str(nid_key), nid);
1416 LASSERTF(doomed_exp != obd->obd_self_export,
1417 "self-export is hashed by NID?\n");
1419 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1420 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1422 class_fail_export(doomed_exp);
1423 class_export_put(doomed_exp);
1426 cfs_hash_putref(nid_hash);
1428 if (!exports_evicted)
1429 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1430 obd->obd_name, nid);
1431 return exports_evicted;
1433 EXPORT_SYMBOL(obd_export_evict_by_nid);
1435 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1437 cfs_hash_t *uuid_hash;
1438 struct obd_export *doomed_exp = NULL;
1439 struct obd_uuid doomed_uuid;
1440 int exports_evicted = 0;
1442 spin_lock(&obd->obd_dev_lock);
1443 if (obd->obd_stopping) {
1444 spin_unlock(&obd->obd_dev_lock);
1445 return exports_evicted;
1447 uuid_hash = obd->obd_uuid_hash;
1448 cfs_hash_getref(uuid_hash);
1449 spin_unlock(&obd->obd_dev_lock);
1451 obd_str2uuid(&doomed_uuid, uuid);
1452 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1453 CERROR("%s: can't evict myself\n", obd->obd_name);
1454 cfs_hash_putref(uuid_hash);
1455 return exports_evicted;
1458 doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1460 if (doomed_exp == NULL) {
1461 CERROR("%s: can't disconnect %s: no exports found\n",
1462 obd->obd_name, uuid);
1464 CWARN("%s: evicting %s at adminstrative request\n",
1465 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1466 class_fail_export(doomed_exp);
1467 class_export_put(doomed_exp);
1470 cfs_hash_putref(uuid_hash);
1472 return exports_evicted;
1474 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1476 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1477 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1478 EXPORT_SYMBOL(class_export_dump_hook);
1481 static void print_export_data(struct obd_export *exp, const char *status,
1484 struct ptlrpc_reply_state *rs;
1485 struct ptlrpc_reply_state *first_reply = NULL;
1488 spin_lock(&exp->exp_lock);
1489 cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1495 spin_unlock(&exp->exp_lock);
1497 CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1498 exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1499 obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1500 cfs_atomic_read(&exp->exp_rpc_count),
1501 cfs_atomic_read(&exp->exp_cb_count),
1502 cfs_atomic_read(&exp->exp_locks_count),
1503 exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1504 nreplies, first_reply, nreplies > 3 ? "..." : "",
1505 exp->exp_last_committed);
1506 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1507 if (locks && class_export_dump_hook != NULL)
1508 class_export_dump_hook(exp);
1512 void dump_exports(struct obd_device *obd, int locks)
1514 struct obd_export *exp;
1516 spin_lock(&obd->obd_dev_lock);
1517 cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1518 print_export_data(exp, "ACTIVE", locks);
1519 cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1520 print_export_data(exp, "UNLINKED", locks);
1521 cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1522 print_export_data(exp, "DELAYED", locks);
1523 spin_unlock(&obd->obd_dev_lock);
1524 spin_lock(&obd_zombie_impexp_lock);
1525 cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1526 print_export_data(exp, "ZOMBIE", locks);
1527 spin_unlock(&obd_zombie_impexp_lock);
1529 EXPORT_SYMBOL(dump_exports);
1531 void obd_exports_barrier(struct obd_device *obd)
1534 LASSERT(cfs_list_empty(&obd->obd_exports));
1535 spin_lock(&obd->obd_dev_lock);
1536 while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1537 spin_unlock(&obd->obd_dev_lock);
1538 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1539 cfs_time_seconds(waited));
1540 if (waited > 5 && IS_PO2(waited)) {
1541 LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1542 "more than %d seconds. "
1543 "The obd refcount = %d. Is it stuck?\n",
1544 obd->obd_name, waited,
1545 cfs_atomic_read(&obd->obd_refcount));
1546 dump_exports(obd, 1);
1549 spin_lock(&obd->obd_dev_lock);
1551 spin_unlock(&obd->obd_dev_lock);
1553 EXPORT_SYMBOL(obd_exports_barrier);
1555 /* Total amount of zombies to be destroyed */
1556 static int zombies_count = 0;
1559 * kill zombie imports and exports
1561 void obd_zombie_impexp_cull(void)
1563 struct obd_import *import;
1564 struct obd_export *export;
1568 spin_lock(&obd_zombie_impexp_lock);
1571 if (!cfs_list_empty(&obd_zombie_imports)) {
1572 import = cfs_list_entry(obd_zombie_imports.next,
1575 cfs_list_del_init(&import->imp_zombie_chain);
1579 if (!cfs_list_empty(&obd_zombie_exports)) {
1580 export = cfs_list_entry(obd_zombie_exports.next,
1583 cfs_list_del_init(&export->exp_obd_chain);
1586 spin_unlock(&obd_zombie_impexp_lock);
1588 if (import != NULL) {
1589 class_import_destroy(import);
1590 spin_lock(&obd_zombie_impexp_lock);
1592 spin_unlock(&obd_zombie_impexp_lock);
1595 if (export != NULL) {
1596 class_export_destroy(export);
1597 spin_lock(&obd_zombie_impexp_lock);
1599 spin_unlock(&obd_zombie_impexp_lock);
1603 } while (import != NULL || export != NULL);
1607 static struct completion obd_zombie_start;
1608 static struct completion obd_zombie_stop;
1609 static unsigned long obd_zombie_flags;
1610 static cfs_waitq_t obd_zombie_waitq;
1611 static pid_t obd_zombie_pid;
1614 OBD_ZOMBIE_STOP = 0x0001,
1618 * check for work for kill zombie import/export thread.
1620 static int obd_zombie_impexp_check(void *arg)
1624 spin_lock(&obd_zombie_impexp_lock);
1625 rc = (zombies_count == 0) &&
1626 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1627 spin_unlock(&obd_zombie_impexp_lock);
1633 * Add export to the obd_zombe thread and notify it.
1635 static void obd_zombie_export_add(struct obd_export *exp) {
1636 spin_lock(&exp->exp_obd->obd_dev_lock);
1637 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1638 cfs_list_del_init(&exp->exp_obd_chain);
1639 spin_unlock(&exp->exp_obd->obd_dev_lock);
1640 spin_lock(&obd_zombie_impexp_lock);
1642 cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1643 spin_unlock(&obd_zombie_impexp_lock);
1645 obd_zombie_impexp_notify();
1649 * Add import to the obd_zombe thread and notify it.
1651 static void obd_zombie_import_add(struct obd_import *imp) {
1652 LASSERT(imp->imp_sec == NULL);
1653 LASSERT(imp->imp_rq_pool == NULL);
1654 spin_lock(&obd_zombie_impexp_lock);
1655 LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1657 cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1658 spin_unlock(&obd_zombie_impexp_lock);
1660 obd_zombie_impexp_notify();
1664 * notify import/export destroy thread about new zombie.
1666 static void obd_zombie_impexp_notify(void)
1669 * Make sure obd_zomebie_impexp_thread get this notification.
1670 * It is possible this signal only get by obd_zombie_barrier, and
1671 * barrier gulps this notification and sleeps away and hangs ensues
1673 cfs_waitq_broadcast(&obd_zombie_waitq);
1677 * check whether obd_zombie is idle
1679 static int obd_zombie_is_idle(void)
1683 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1684 spin_lock(&obd_zombie_impexp_lock);
1685 rc = (zombies_count == 0);
1686 spin_unlock(&obd_zombie_impexp_lock);
1691 * wait when obd_zombie import/export queues become empty
1693 void obd_zombie_barrier(void)
1695 struct l_wait_info lwi = { 0 };
1697 if (obd_zombie_pid == cfs_curproc_pid())
1698 /* don't wait for myself */
1700 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1702 EXPORT_SYMBOL(obd_zombie_barrier);
1707 * destroy zombie export/import thread.
1709 static int obd_zombie_impexp_thread(void *unused)
1713 rc = cfs_daemonize_ctxt("obd_zombid");
1715 complete(&obd_zombie_start);
1719 complete(&obd_zombie_start);
1721 obd_zombie_pid = cfs_curproc_pid();
1723 while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1724 struct l_wait_info lwi = { 0 };
1726 l_wait_event(obd_zombie_waitq,
1727 !obd_zombie_impexp_check(NULL), &lwi);
1728 obd_zombie_impexp_cull();
1731 * Notify obd_zombie_barrier callers that queues
1734 cfs_waitq_signal(&obd_zombie_waitq);
1737 complete(&obd_zombie_stop);
1742 #else /* ! KERNEL */
1744 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1745 static void *obd_zombie_impexp_work_cb;
1746 static void *obd_zombie_impexp_idle_cb;
1748 int obd_zombie_impexp_kill(void *arg)
1752 if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1753 obd_zombie_impexp_cull();
1756 cfs_atomic_dec(&zombie_recur);
1763 * start destroy zombie import/export thread
1765 int obd_zombie_impexp_init(void)
1769 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1770 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1771 spin_lock_init(&obd_zombie_impexp_lock);
1772 init_completion(&obd_zombie_start);
1773 init_completion(&obd_zombie_stop);
1774 cfs_waitq_init(&obd_zombie_waitq);
1778 rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1782 wait_for_completion(&obd_zombie_start);
1785 obd_zombie_impexp_work_cb =
1786 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1787 &obd_zombie_impexp_kill, NULL);
1789 obd_zombie_impexp_idle_cb =
1790 liblustre_register_idle_callback("obd_zombi_impexp_check",
1791 &obd_zombie_impexp_check, NULL);
1797 * stop destroy zombie import/export thread
1799 void obd_zombie_impexp_stop(void)
1801 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1802 obd_zombie_impexp_notify();
1804 wait_for_completion(&obd_zombie_stop);
1806 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1807 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1811 /***** Kernel-userspace comm helpers *******/
1813 /* Get length of entire message, including header */
1814 int kuc_len(int payload_len)
1816 return sizeof(struct kuc_hdr) + payload_len;
1818 EXPORT_SYMBOL(kuc_len);
1820 /* Get a pointer to kuc header, given a ptr to the payload
1821 * @param p Pointer to payload area
1822 * @returns Pointer to kuc header
1824 struct kuc_hdr * kuc_ptr(void *p)
1826 struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1827 LASSERT(lh->kuc_magic == KUC_MAGIC);
1830 EXPORT_SYMBOL(kuc_ptr);
1832 /* Test if payload is part of kuc message
1833 * @param p Pointer to payload area
1836 int kuc_ispayload(void *p)
1838 struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1840 if (kh->kuc_magic == KUC_MAGIC)
1845 EXPORT_SYMBOL(kuc_ispayload);
1847 /* Alloc space for a message, and fill in header
1848 * @return Pointer to payload area
1850 void *kuc_alloc(int payload_len, int transport, int type)
1853 int len = kuc_len(payload_len);
1857 return ERR_PTR(-ENOMEM);
1859 lh->kuc_magic = KUC_MAGIC;
1860 lh->kuc_transport = transport;
1861 lh->kuc_msgtype = type;
1862 lh->kuc_msglen = len;
1864 return (void *)(lh + 1);
1866 EXPORT_SYMBOL(kuc_alloc);
1868 /* Takes pointer to payload area */
1869 inline void kuc_free(void *p, int payload_len)
1871 struct kuc_hdr *lh = kuc_ptr(p);
1872 OBD_FREE(lh, kuc_len(payload_len));
1874 EXPORT_SYMBOL(kuc_free);