1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
66 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
69 * support functions: we could use inter-module communication, but this
70 * is more portable to other OS's
72 static struct obd_device *obd_device_alloc(void)
74 struct obd_device *obd;
76 OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
78 obd->obd_magic = OBD_DEVICE_MAGIC;
82 EXPORT_SYMBOL(obd_device_alloc);
84 static void obd_device_free(struct obd_device *obd)
87 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89 if (obd->obd_namespace != NULL) {
90 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91 obd, obd->obd_namespace, obd->obd_force);
94 lu_ref_fini(&obd->obd_reference);
95 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 struct obd_type *class_search_type(const char *name)
100 struct list_head *tmp;
101 struct obd_type *type;
103 spin_lock(&obd_types_lock);
104 list_for_each(tmp, &obd_types) {
105 type = list_entry(tmp, struct obd_type, typ_chain);
106 if (strcmp(type->typ_name, name) == 0) {
107 spin_unlock(&obd_types_lock);
111 spin_unlock(&obd_types_lock);
115 struct obd_type *class_get_type(const char *name)
117 struct obd_type *type = class_search_type(name);
121 const char *modname = name;
122 if (!request_module(modname)) {
123 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124 type = class_search_type(name);
126 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
132 spin_lock(&type->obd_type_lock);
134 try_module_get(type->typ_dt_ops->o_owner);
135 spin_unlock(&type->obd_type_lock);
140 void class_put_type(struct obd_type *type)
143 spin_lock(&type->obd_type_lock);
145 module_put(type->typ_dt_ops->o_owner);
146 spin_unlock(&type->obd_type_lock);
149 #define CLASS_MAX_NAME 1024
151 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
152 struct lprocfs_vars *vars, const char *name,
153 struct lu_device_type *ldt)
155 struct obd_type *type;
160 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
162 if (class_search_type(name)) {
163 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
168 OBD_ALLOC(type, sizeof(*type));
172 OBD_ALLOC_PTR(type->typ_dt_ops);
173 OBD_ALLOC_PTR(type->typ_md_ops);
174 OBD_ALLOC(type->typ_name, strlen(name) + 1);
176 if (type->typ_dt_ops == NULL ||
177 type->typ_md_ops == NULL ||
178 type->typ_name == NULL)
181 *(type->typ_dt_ops) = *dt_ops;
182 /* md_ops is optional */
184 *(type->typ_md_ops) = *md_ops;
185 strcpy(type->typ_name, name);
186 spin_lock_init(&type->obd_type_lock);
189 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
191 if (IS_ERR(type->typ_procroot)) {
192 rc = PTR_ERR(type->typ_procroot);
193 type->typ_procroot = NULL;
199 rc = lu_device_type_init(ldt);
204 spin_lock(&obd_types_lock);
205 list_add(&type->typ_chain, &obd_types);
206 spin_unlock(&obd_types_lock);
211 if (type->typ_name != NULL)
212 OBD_FREE(type->typ_name, strlen(name) + 1);
213 if (type->typ_md_ops != NULL)
214 OBD_FREE_PTR(type->typ_md_ops);
215 if (type->typ_dt_ops != NULL)
216 OBD_FREE_PTR(type->typ_dt_ops);
217 OBD_FREE(type, sizeof(*type));
221 int class_unregister_type(const char *name)
223 struct obd_type *type = class_search_type(name);
227 CERROR("unknown obd type\n");
231 if (type->typ_refcnt) {
232 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
233 /* This is a bad situation, let's make the best of it */
234 /* Remove ops, but leave the name for debugging */
235 OBD_FREE_PTR(type->typ_dt_ops);
236 OBD_FREE_PTR(type->typ_md_ops);
240 if (type->typ_procroot) {
241 lprocfs_remove(&type->typ_procroot);
245 lu_device_type_fini(type->typ_lu);
247 spin_lock(&obd_types_lock);
248 list_del(&type->typ_chain);
249 spin_unlock(&obd_types_lock);
250 OBD_FREE(type->typ_name, strlen(name) + 1);
251 if (type->typ_dt_ops != NULL)
252 OBD_FREE_PTR(type->typ_dt_ops);
253 if (type->typ_md_ops != NULL)
254 OBD_FREE_PTR(type->typ_md_ops);
255 OBD_FREE(type, sizeof(*type));
257 } /* class_unregister_type */
260 * Create a new obd device.
262 * Find an empty slot in ::obd_devs[], create a new obd device in it.
264 * \param typename [in] obd device type string.
265 * \param name [in] obd device name.
267 * \retval NULL if create fails, otherwise return the obd device
270 struct obd_device *class_newdev(const char *type_name, const char *name)
272 struct obd_device *result = NULL;
273 struct obd_device *newdev;
274 struct obd_type *type = NULL;
276 int new_obd_minor = 0;
278 if (strlen(name) >= MAX_OBD_NAME) {
279 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
280 RETURN(ERR_PTR(-EINVAL));
283 type = class_get_type(type_name);
285 CERROR("OBD: unknown type: %s\n", type_name);
286 RETURN(ERR_PTR(-ENODEV));
289 newdev = obd_device_alloc();
290 if (newdev == NULL) {
291 class_put_type(type);
292 RETURN(ERR_PTR(-ENOMEM));
294 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
296 spin_lock(&obd_dev_lock);
297 for (i = 0; i < class_devno_max(); i++) {
298 struct obd_device *obd = class_num2obd(i);
299 if (obd && obd->obd_name &&
300 (strcmp(name, obd->obd_name) == 0)) {
301 CERROR("Device %s already exists, won't add\n", name);
303 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
304 "%p obd_magic %08x != %08x\n", result,
305 result->obd_magic, OBD_DEVICE_MAGIC);
306 LASSERTF(result->obd_minor == new_obd_minor,
307 "%p obd_minor %d != %d\n", result,
308 result->obd_minor, new_obd_minor);
310 obd_devs[result->obd_minor] = NULL;
311 result->obd_name[0]='\0';
313 result = ERR_PTR(-EEXIST);
316 if (!result && !obd) {
318 result->obd_minor = i;
320 result->obd_type = type;
321 strncpy(result->obd_name, name,
322 sizeof(result->obd_name) - 1);
323 obd_devs[i] = result;
326 spin_unlock(&obd_dev_lock);
328 if (result == NULL && i >= class_devno_max()) {
329 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
331 result = ERR_PTR(-EOVERFLOW);
334 if (IS_ERR(result)) {
335 obd_device_free(newdev);
336 class_put_type(type);
338 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
339 result->obd_name, result);
344 void class_release_dev(struct obd_device *obd)
346 struct obd_type *obd_type = obd->obd_type;
348 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
349 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
350 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
351 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
352 LASSERT(obd_type != NULL);
354 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
355 obd->obd_name,obd->obd_type->typ_name);
357 spin_lock(&obd_dev_lock);
358 obd_devs[obd->obd_minor] = NULL;
359 spin_unlock(&obd_dev_lock);
360 obd_device_free(obd);
362 class_put_type(obd_type);
365 int class_name2dev(const char *name)
372 spin_lock(&obd_dev_lock);
373 for (i = 0; i < class_devno_max(); i++) {
374 struct obd_device *obd = class_num2obd(i);
375 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
376 /* Make sure we finished attaching before we give
377 out any references */
378 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
379 if (obd->obd_attached) {
380 spin_unlock(&obd_dev_lock);
386 spin_unlock(&obd_dev_lock);
391 struct obd_device *class_name2obd(const char *name)
393 int dev = class_name2dev(name);
395 if (dev < 0 || dev > class_devno_max())
397 return class_num2obd(dev);
400 int class_uuid2dev(struct obd_uuid *uuid)
404 spin_lock(&obd_dev_lock);
405 for (i = 0; i < class_devno_max(); i++) {
406 struct obd_device *obd = class_num2obd(i);
407 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
408 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
409 spin_unlock(&obd_dev_lock);
413 spin_unlock(&obd_dev_lock);
418 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
420 int dev = class_uuid2dev(uuid);
423 return class_num2obd(dev);
427 * Get obd device from ::obd_devs[]
429 * \param num [in] array index
431 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
432 * otherwise return the obd device there.
434 struct obd_device *class_num2obd(int num)
436 struct obd_device *obd = NULL;
438 if (num < class_devno_max()) {
443 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
444 "%p obd_magic %08x != %08x\n",
445 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
446 LASSERTF(obd->obd_minor == num,
447 "%p obd_minor %0d != %0d\n",
448 obd, obd->obd_minor, num);
454 void class_obd_list(void)
459 spin_lock(&obd_dev_lock);
460 for (i = 0; i < class_devno_max(); i++) {
461 struct obd_device *obd = class_num2obd(i);
464 if (obd->obd_stopping)
466 else if (obd->obd_set_up)
468 else if (obd->obd_attached)
472 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
473 i, status, obd->obd_type->typ_name,
474 obd->obd_name, obd->obd_uuid.uuid,
475 atomic_read(&obd->obd_refcount));
477 spin_unlock(&obd_dev_lock);
481 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
482 specified, then only the client with that uuid is returned,
483 otherwise any client connected to the tgt is returned. */
484 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
485 const char * typ_name,
486 struct obd_uuid *grp_uuid)
490 spin_lock(&obd_dev_lock);
491 for (i = 0; i < class_devno_max(); i++) {
492 struct obd_device *obd = class_num2obd(i);
495 if ((strncmp(obd->obd_type->typ_name, typ_name,
496 strlen(typ_name)) == 0)) {
497 if (obd_uuid_equals(tgt_uuid,
498 &obd->u.cli.cl_target_uuid) &&
499 ((grp_uuid)? obd_uuid_equals(grp_uuid,
500 &obd->obd_uuid) : 1)) {
501 spin_unlock(&obd_dev_lock);
506 spin_unlock(&obd_dev_lock);
511 /* Iterate the obd_device list looking devices have grp_uuid. Start
512 searching at *next, and if a device is found, the next index to look
513 at is saved in *next. If next is NULL, then the first matching device
514 will always be returned. */
515 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
521 else if (*next >= 0 && *next < class_devno_max())
526 spin_lock(&obd_dev_lock);
527 for (; i < class_devno_max(); i++) {
528 struct obd_device *obd = class_num2obd(i);
531 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
534 spin_unlock(&obd_dev_lock);
538 spin_unlock(&obd_dev_lock);
544 * to notify sptlrpc log for @fsname has changed, let every relevant OBD
545 * adjust sptlrpc settings accordingly.
547 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
549 struct obd_device *obd;
553 LASSERT(namelen > 0);
555 spin_lock(&obd_dev_lock);
556 for (i = 0; i < class_devno_max(); i++) {
557 obd = class_num2obd(i);
559 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
562 /* only notify mdc, osc, mdt, ost */
563 type = obd->obd_type->typ_name;
564 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
565 strcmp(type, LUSTRE_OSC_NAME) != 0 &&
566 strcmp(type, LUSTRE_MDT_NAME) != 0 &&
567 strcmp(type, LUSTRE_OST_NAME) != 0)
570 if (strncmp(obd->obd_name, fsname, namelen))
573 class_incref(obd, __FUNCTION__, obd);
574 spin_unlock(&obd_dev_lock);
575 rc2 = obd_set_info_async(obd->obd_self_export,
576 sizeof(KEY_SPTLRPC_CONF),
577 KEY_SPTLRPC_CONF, 0, NULL, NULL);
579 class_decref(obd, __FUNCTION__, obd);
580 spin_lock(&obd_dev_lock);
582 spin_unlock(&obd_dev_lock);
585 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
587 void obd_cleanup_caches(void)
592 if (obd_device_cachep) {
593 rc = cfs_mem_cache_destroy(obd_device_cachep);
594 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
595 obd_device_cachep = NULL;
598 rc = cfs_mem_cache_destroy(obdo_cachep);
599 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
603 rc = cfs_mem_cache_destroy(import_cachep);
604 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
605 import_cachep = NULL;
608 rc = cfs_mem_cache_destroy(capa_cachep);
609 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
615 int obd_init_caches(void)
619 LASSERT(obd_device_cachep == NULL);
620 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
621 sizeof(struct obd_device),
623 if (!obd_device_cachep)
626 LASSERT(obdo_cachep == NULL);
627 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
632 LASSERT(import_cachep == NULL);
633 import_cachep = cfs_mem_cache_create("ll_import_cache",
634 sizeof(struct obd_import),
639 LASSERT(capa_cachep == NULL);
640 capa_cachep = cfs_mem_cache_create("capa_cache",
641 sizeof(struct obd_capa), 0, 0);
647 obd_cleanup_caches();
652 /* map connection to client */
653 struct obd_export *class_conn2export(struct lustre_handle *conn)
655 struct obd_export *export;
659 CDEBUG(D_CACHE, "looking for null handle\n");
663 if (conn->cookie == -1) { /* this means assign a new connection */
664 CDEBUG(D_CACHE, "want a new connection\n");
668 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
669 export = class_handle2object(conn->cookie);
673 struct obd_device *class_exp2obd(struct obd_export *exp)
680 struct obd_device *class_conn2obd(struct lustre_handle *conn)
682 struct obd_export *export;
683 export = class_conn2export(conn);
685 struct obd_device *obd = export->exp_obd;
686 class_export_put(export);
692 struct obd_import *class_exp2cliimp(struct obd_export *exp)
694 struct obd_device *obd = exp->exp_obd;
697 return obd->u.cli.cl_import;
700 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
702 struct obd_device *obd = class_conn2obd(conn);
705 return obd->u.cli.cl_import;
708 /* Export management functions */
709 static void class_export_destroy(struct obd_export *exp)
711 struct obd_device *obd = exp->exp_obd;
714 LASSERT (atomic_read(&exp->exp_refcount) == 0);
716 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
717 exp->exp_client_uuid.uuid, obd->obd_name);
719 LASSERT(obd != NULL);
721 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
722 if (exp->exp_connection)
723 ptlrpc_put_connection_superhack(exp->exp_connection);
725 LASSERT(list_empty(&exp->exp_outstanding_replies));
726 LASSERT(list_empty(&exp->exp_uncommitted_replies));
727 LASSERT(list_empty(&exp->exp_req_replay_queue));
728 LASSERT(list_empty(&exp->exp_queued_rpc));
729 obd_destroy_export(exp);
730 class_decref(obd, "export", exp);
732 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
736 static void export_handle_addref(void *export)
738 class_export_get(export);
741 struct obd_export *class_export_get(struct obd_export *exp)
743 atomic_inc(&exp->exp_refcount);
744 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
745 atomic_read(&exp->exp_refcount));
748 EXPORT_SYMBOL(class_export_get);
750 void class_export_put(struct obd_export *exp)
752 LASSERT(exp != NULL);
753 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
754 atomic_read(&exp->exp_refcount) - 1);
755 LASSERT(atomic_read(&exp->exp_refcount) > 0);
756 LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
758 if (atomic_dec_and_test(&exp->exp_refcount)) {
759 CDEBUG(D_IOCTL, "final put %p/%s\n",
760 exp, exp->exp_client_uuid.uuid);
761 obd_zombie_export_add(exp);
764 EXPORT_SYMBOL(class_export_put);
766 /* Creates a new export, adds it to the hash table, and returns a
767 * pointer to it. The refcount is 2: one for the hash reference, and
768 * one for the pointer returned by this function. */
769 struct obd_export *class_new_export(struct obd_device *obd,
770 struct obd_uuid *cluuid)
772 struct obd_export *export;
775 OBD_ALLOC_PTR(export);
777 return ERR_PTR(-ENOMEM);
779 export->exp_conn_cnt = 0;
780 export->exp_lock_hash = NULL;
781 atomic_set(&export->exp_refcount, 2);
782 atomic_set(&export->exp_rpc_count, 0);
783 export->exp_obd = obd;
784 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
785 spin_lock_init(&export->exp_uncommitted_replies_lock);
786 CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
787 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
788 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
789 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
790 class_handle_hash(&export->exp_handle, export_handle_addref);
791 export->exp_last_request_time = cfs_time_current_sec();
792 spin_lock_init(&export->exp_lock);
793 INIT_HLIST_NODE(&export->exp_uuid_hash);
794 INIT_HLIST_NODE(&export->exp_nid_hash);
796 export->exp_sp_peer = LUSTRE_SP_ANY;
797 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
798 export->exp_client_uuid = *cluuid;
799 obd_init_export(export);
801 spin_lock(&obd->obd_dev_lock);
802 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
803 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
804 &export->exp_uuid_hash);
806 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
807 obd->obd_name, cluuid->uuid, rc);
808 spin_unlock(&obd->obd_dev_lock);
809 class_handle_unhash(&export->exp_handle);
810 OBD_FREE_PTR(export);
811 return ERR_PTR(-EALREADY);
815 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
816 class_incref(obd, "export", export);
817 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
818 list_add_tail(&export->exp_obd_chain_timed,
819 &export->exp_obd->obd_exports_timed);
820 export->exp_obd->obd_num_exports++;
821 spin_unlock(&obd->obd_dev_lock);
825 EXPORT_SYMBOL(class_new_export);
827 void class_unlink_export(struct obd_export *exp)
829 class_handle_unhash(&exp->exp_handle);
831 spin_lock(&exp->exp_obd->obd_dev_lock);
832 /* delete an uuid-export hashitem from hashtables */
833 if (!hlist_unhashed(&exp->exp_uuid_hash))
834 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
835 &exp->exp_client_uuid,
836 &exp->exp_uuid_hash);
838 list_del_init(&exp->exp_obd_chain);
839 list_del_init(&exp->exp_obd_chain_timed);
840 exp->exp_obd->obd_num_exports--;
841 spin_unlock(&exp->exp_obd->obd_dev_lock);
843 /* Keep these counter valid always */
844 spin_lock_bh(&exp->exp_obd->obd_processing_task_lock);
845 if (exp->exp_delayed)
846 exp->exp_obd->obd_delayed_clients--;
847 else if (exp->exp_in_recovery)
848 exp->exp_obd->obd_recoverable_clients--;
849 else if (exp->exp_obd->obd_recovering)
850 exp->exp_obd->obd_max_recoverable_clients--;
851 spin_unlock_bh(&exp->exp_obd->obd_processing_task_lock);
852 class_export_put(exp);
854 EXPORT_SYMBOL(class_unlink_export);
856 /* Import management functions */
857 void class_import_destroy(struct obd_import *imp)
861 CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
862 imp->imp_obd->obd_name);
864 LASSERT(atomic_read(&imp->imp_refcount) == 0);
866 ptlrpc_put_connection_superhack(imp->imp_connection);
868 while (!list_empty(&imp->imp_conn_list)) {
869 struct obd_import_conn *imp_conn;
871 imp_conn = list_entry(imp->imp_conn_list.next,
872 struct obd_import_conn, oic_item);
873 list_del_init(&imp_conn->oic_item);
874 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
875 OBD_FREE(imp_conn, sizeof(*imp_conn));
878 LASSERT(imp->imp_sec == NULL);
879 class_decref(imp->imp_obd, "import", imp);
880 OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
884 static void import_handle_addref(void *import)
886 class_import_get(import);
889 struct obd_import *class_import_get(struct obd_import *import)
891 LASSERT(atomic_read(&import->imp_refcount) >= 0);
892 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
893 atomic_inc(&import->imp_refcount);
894 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
895 atomic_read(&import->imp_refcount),
896 import->imp_obd->obd_name);
899 EXPORT_SYMBOL(class_import_get);
901 void class_import_put(struct obd_import *imp)
905 LASSERT(atomic_read(&imp->imp_refcount) > 0);
906 LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
907 LASSERT(list_empty(&imp->imp_zombie_chain));
909 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
910 atomic_read(&imp->imp_refcount) - 1,
911 imp->imp_obd->obd_name);
913 if (atomic_dec_and_test(&imp->imp_refcount)) {
914 CDEBUG(D_INFO, "final put import %p\n", imp);
915 obd_zombie_import_add(imp);
920 EXPORT_SYMBOL(class_import_put);
922 static void init_imp_at(struct imp_at *at) {
924 at_init(&at->iat_net_latency, 0, 0);
925 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
926 /* max service estimates are tracked on the server side, so
927 don't use the AT history here, just use the last reported
928 val. (But keep hist for proc histogram, worst_ever) */
929 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
934 struct obd_import *class_new_import(struct obd_device *obd)
936 struct obd_import *imp;
938 OBD_ALLOC(imp, sizeof(*imp));
942 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
943 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
944 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
945 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
946 spin_lock_init(&imp->imp_lock);
947 imp->imp_last_success_conn = 0;
948 imp->imp_state = LUSTRE_IMP_NEW;
949 imp->imp_obd = class_incref(obd, "import", imp);
950 sema_init(&imp->imp_sec_mutex, 1);
951 cfs_waitq_init(&imp->imp_recovery_waitq);
953 atomic_set(&imp->imp_refcount, 2);
954 atomic_set(&imp->imp_unregistering, 0);
955 atomic_set(&imp->imp_inflight, 0);
956 atomic_set(&imp->imp_replay_inflight, 0);
957 atomic_set(&imp->imp_inval_count, 0);
958 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
959 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
960 class_handle_hash(&imp->imp_handle, import_handle_addref);
961 init_imp_at(&imp->imp_at);
963 /* the default magic is V2, will be used in connect RPC, and
964 * then adjusted according to the flags in request/reply. */
965 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
969 EXPORT_SYMBOL(class_new_import);
971 void class_destroy_import(struct obd_import *import)
973 LASSERT(import != NULL);
974 LASSERT(import != LP_POISON);
976 class_handle_unhash(&import->imp_handle);
978 spin_lock(&import->imp_lock);
979 import->imp_generation++;
980 spin_unlock(&import->imp_lock);
981 class_import_put(import);
983 EXPORT_SYMBOL(class_destroy_import);
985 /* A connection defines an export context in which preallocation can
986 be managed. This releases the export pointer reference, and returns
987 the export handle, so the export refcount is 1 when this function
989 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
990 struct obd_uuid *cluuid)
992 struct obd_export *export;
993 LASSERT(conn != NULL);
994 LASSERT(obd != NULL);
995 LASSERT(cluuid != NULL);
998 export = class_new_export(obd, cluuid);
1000 RETURN(PTR_ERR(export));
1002 conn->cookie = export->exp_handle.h_cookie;
1003 class_export_put(export);
1005 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1006 cluuid->uuid, conn->cookie);
1009 EXPORT_SYMBOL(class_connect);
1011 /* if export is involved in recovery then clean up related things */
1012 void class_export_recovery_cleanup(struct obd_export *exp)
1014 struct obd_device *obd = exp->exp_obd;
1016 spin_lock_bh(&obd->obd_processing_task_lock);
1017 if (obd->obd_recovering && exp->exp_in_recovery) {
1018 spin_lock(&exp->exp_lock);
1019 exp->exp_in_recovery = 0;
1020 spin_unlock(&exp->exp_lock);
1021 obd->obd_connected_clients--;
1022 /* each connected client is counted as recoverable */
1023 obd->obd_recoverable_clients--;
1024 if (exp->exp_req_replay_needed) {
1025 spin_lock(&exp->exp_lock);
1026 exp->exp_req_replay_needed = 0;
1027 spin_unlock(&exp->exp_lock);
1028 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1029 atomic_dec(&obd->obd_req_replay_clients);
1031 if (exp->exp_lock_replay_needed) {
1032 spin_lock(&exp->exp_lock);
1033 exp->exp_lock_replay_needed = 0;
1034 spin_unlock(&exp->exp_lock);
1035 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1036 atomic_dec(&obd->obd_lock_replay_clients);
1039 spin_unlock_bh(&obd->obd_processing_task_lock);
1042 /* This function removes two references from the export: one for the
1043 * hash entry and one for the export pointer passed in. The export
1044 * pointer passed to this function is destroyed should not be used
1046 int class_disconnect(struct obd_export *export)
1048 int already_disconnected;
1051 if (export == NULL) {
1053 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1057 spin_lock(&export->exp_lock);
1058 already_disconnected = export->exp_disconnected;
1059 export->exp_disconnected = 1;
1061 if (!hlist_unhashed(&export->exp_nid_hash))
1062 lustre_hash_del(export->exp_obd->obd_nid_hash,
1063 &export->exp_connection->c_peer.nid,
1064 &export->exp_nid_hash);
1066 spin_unlock(&export->exp_lock);
1068 /* class_cleanup(), abort_recovery(), and class_fail_export()
1069 * all end up in here, and if any of them race we shouldn't
1070 * call extra class_export_puts(). */
1071 if (already_disconnected)
1074 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1075 export->exp_handle.h_cookie);
1077 class_export_recovery_cleanup(export);
1078 class_unlink_export(export);
1079 class_export_put(export);
1083 static void class_disconnect_export_list(struct list_head *list,
1084 enum obd_option flags)
1087 struct lustre_handle fake_conn;
1088 struct obd_export *fake_exp, *exp;
1091 /* It's possible that an export may disconnect itself, but
1092 * nothing else will be added to this list. */
1093 while (!list_empty(list)) {
1094 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1095 class_export_get(exp);
1097 spin_lock(&exp->exp_lock);
1098 exp->exp_flags = flags;
1099 spin_unlock(&exp->exp_lock);
1101 if (obd_uuid_equals(&exp->exp_client_uuid,
1102 &exp->exp_obd->obd_uuid)) {
1104 "exp %p export uuid == obd uuid, don't discon\n",
1106 /* Need to delete this now so we don't end up pointing
1107 * to work_list later when this export is cleaned up. */
1108 list_del_init(&exp->exp_obd_chain);
1109 class_export_put(exp);
1113 fake_conn.cookie = exp->exp_handle.h_cookie;
1114 fake_exp = class_conn2export(&fake_conn);
1116 class_export_put(exp);
1120 spin_lock(&fake_exp->exp_lock);
1121 fake_exp->exp_flags = flags;
1122 spin_unlock(&fake_exp->exp_lock);
1124 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1125 "last request at "CFS_TIME_T"\n",
1126 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1127 exp, exp->exp_last_request_time);
1128 rc = obd_disconnect(fake_exp);
1129 class_export_put(exp);
1134 void class_disconnect_exports(struct obd_device *obd)
1136 struct list_head work_list;
1139 /* Move all of the exports from obd_exports to a work list, en masse. */
1140 CFS_INIT_LIST_HEAD(&work_list);
1141 spin_lock(&obd->obd_dev_lock);
1142 list_splice_init(&obd->obd_exports, &work_list);
1143 list_splice_init(&obd->obd_delayed_exports, &work_list);
1144 spin_unlock(&obd->obd_dev_lock);
1146 if (!list_empty(&work_list)) {
1147 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1148 "disconnecting them\n", obd->obd_minor, obd);
1149 class_disconnect_export_list(&work_list,
1150 exp_flags_from_obd(obd));
1152 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1153 obd->obd_minor, obd);
1156 EXPORT_SYMBOL(class_disconnect_exports);
1158 /* Remove exports that have not completed recovery.
1160 int class_disconnect_stale_exports(struct obd_device *obd,
1161 int (*test_export)(struct obd_export *),
1162 enum obd_option flags)
1164 struct list_head work_list;
1165 struct list_head *pos, *n;
1166 struct obd_export *exp;
1170 CFS_INIT_LIST_HEAD(&work_list);
1171 spin_lock(&obd->obd_dev_lock);
1172 list_for_each_safe(pos, n, &obd->obd_exports) {
1173 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1174 if (test_export(exp))
1177 list_move(&exp->exp_obd_chain, &work_list);
1178 /* don't count self-export as client */
1179 if (obd_uuid_equals(&exp->exp_client_uuid,
1180 &exp->exp_obd->obd_uuid))
1184 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1185 obd->obd_name, exp->exp_client_uuid.uuid,
1186 exp->exp_connection == NULL ? "<unknown>" :
1187 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1189 spin_unlock(&obd->obd_dev_lock);
1191 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1192 obd->obd_name, cnt);
1193 class_disconnect_export_list(&work_list, flags);
1196 EXPORT_SYMBOL(class_disconnect_stale_exports);
1198 void class_fail_export(struct obd_export *exp)
1200 int rc, already_failed;
1202 spin_lock(&exp->exp_lock);
1203 already_failed = exp->exp_failed;
1204 exp->exp_failed = 1;
1205 spin_unlock(&exp->exp_lock);
1207 if (already_failed) {
1208 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1209 exp, exp->exp_client_uuid.uuid);
1213 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1214 exp, exp->exp_client_uuid.uuid);
1216 if (obd_dump_on_timeout)
1217 libcfs_debug_dumplog();
1219 /* Most callers into obd_disconnect are removing their own reference
1220 * (request, for example) in addition to the one from the hash table.
1221 * We don't have such a reference here, so make one. */
1222 class_export_get(exp);
1223 rc = obd_disconnect(exp);
1225 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1227 CDEBUG(D_HA, "disconnected export %p/%s\n",
1228 exp, exp->exp_client_uuid.uuid);
1230 EXPORT_SYMBOL(class_fail_export);
1232 char *obd_export_nid2str(struct obd_export *exp)
1234 if (exp->exp_connection != NULL)
1235 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1239 EXPORT_SYMBOL(obd_export_nid2str);
1241 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1243 struct obd_export *doomed_exp = NULL;
1244 int exports_evicted = 0;
1246 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1249 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1250 if (doomed_exp == NULL)
1253 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1254 "nid %s found, wanted nid %s, requested nid %s\n",
1255 obd_export_nid2str(doomed_exp),
1256 libcfs_nid2str(nid_key), nid);
1257 LASSERTF(doomed_exp != obd->obd_self_export,
1258 "self-export is hashed by NID?\n");
1260 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1261 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1263 class_fail_export(doomed_exp);
1264 class_export_put(doomed_exp);
1267 if (!exports_evicted)
1268 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1269 obd->obd_name, nid);
1270 return exports_evicted;
1272 EXPORT_SYMBOL(obd_export_evict_by_nid);
1274 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1276 struct obd_export *doomed_exp = NULL;
1277 struct obd_uuid doomed_uuid;
1278 int exports_evicted = 0;
1280 obd_str2uuid(&doomed_uuid, uuid);
1281 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1282 CERROR("%s: can't evict myself\n", obd->obd_name);
1283 return exports_evicted;
1286 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1288 if (doomed_exp == NULL) {
1289 CERROR("%s: can't disconnect %s: no exports found\n",
1290 obd->obd_name, uuid);
1292 CWARN("%s: evicting %s at adminstrative request\n",
1293 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1294 class_fail_export(doomed_exp);
1295 class_export_put(doomed_exp);
1299 return exports_evicted;
1301 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1304 * kill zombie imports and exports
1306 void obd_zombie_impexp_cull(void)
1308 struct obd_import *import;
1309 struct obd_export *export;
1313 spin_lock(&obd_zombie_impexp_lock);
1316 if (!list_empty(&obd_zombie_imports)) {
1317 import = list_entry(obd_zombie_imports.next,
1320 list_del_init(&import->imp_zombie_chain);
1324 if (!list_empty(&obd_zombie_exports)) {
1325 export = list_entry(obd_zombie_exports.next,
1328 list_del_init(&export->exp_obd_chain);
1331 spin_unlock(&obd_zombie_impexp_lock);
1334 class_import_destroy(import);
1337 class_export_destroy(export);
1339 } while (import != NULL || export != NULL);
1343 static struct completion obd_zombie_start;
1344 static struct completion obd_zombie_stop;
1345 static unsigned long obd_zombie_flags;
1346 static cfs_waitq_t obd_zombie_waitq;
1349 OBD_ZOMBIE_STOP = 1 << 1
1353 * check for work for kill zombie import/export thread.
1355 static int obd_zombie_impexp_check(void *arg)
1359 spin_lock(&obd_zombie_impexp_lock);
1360 rc = list_empty(&obd_zombie_imports) &&
1361 list_empty(&obd_zombie_exports) &&
1362 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1364 spin_unlock(&obd_zombie_impexp_lock);
1370 * Add export to the obd_zombe thread and notify it.
1372 static void obd_zombie_export_add(struct obd_export *exp) {
1373 spin_lock(&obd_zombie_impexp_lock);
1374 LASSERT(list_empty(&exp->exp_obd_chain));
1375 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1376 spin_unlock(&obd_zombie_impexp_lock);
1378 if (obd_zombie_impexp_notify != NULL)
1379 obd_zombie_impexp_notify();
1383 * Add import to the obd_zombe thread and notify it.
1385 static void obd_zombie_import_add(struct obd_import *imp) {
1386 LASSERT(imp->imp_sec == NULL);
1387 spin_lock(&obd_zombie_impexp_lock);
1388 LASSERT(list_empty(&imp->imp_zombie_chain));
1389 list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1390 spin_unlock(&obd_zombie_impexp_lock);
1392 if (obd_zombie_impexp_notify != NULL)
1393 obd_zombie_impexp_notify();
1397 * notify import/export destroy thread about new zombie.
1399 static void obd_zombie_impexp_notify(void)
1401 cfs_waitq_signal(&obd_zombie_waitq);
1405 * check whether obd_zombie is idle
1407 static int obd_zombie_is_idle(void)
1411 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1412 spin_lock(&obd_zombie_impexp_lock);
1413 rc = list_empty(&obd_zombie_imports) &&
1414 list_empty(&obd_zombie_exports);
1415 spin_unlock(&obd_zombie_impexp_lock);
1420 * wait when obd_zombie import/export queues become empty
1422 void obd_zombie_barrier(void)
1424 struct l_wait_info lwi = { 0 };
1425 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1427 EXPORT_SYMBOL(obd_zombie_barrier);
1432 * destroy zombie export/import thread.
1434 static int obd_zombie_impexp_thread(void *unused)
1438 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1439 complete(&obd_zombie_start);
1443 complete(&obd_zombie_start);
1445 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1446 struct l_wait_info lwi = { 0 };
1448 l_wait_event(obd_zombie_waitq,
1449 !obd_zombie_impexp_check(NULL), &lwi);
1450 obd_zombie_impexp_cull();
1453 * Notify obd_zombie_barrier callers that queues
1456 cfs_waitq_signal(&obd_zombie_waitq);
1459 complete(&obd_zombie_stop);
1464 #else /* ! KERNEL */
1466 static atomic_t zombie_recur = ATOMIC_INIT(0);
1467 static void *obd_zombie_impexp_work_cb;
1468 static void *obd_zombie_impexp_idle_cb;
1470 int obd_zombie_impexp_kill(void *arg)
1474 if (atomic_inc_return(&zombie_recur) == 1) {
1475 obd_zombie_impexp_cull();
1478 atomic_dec(&zombie_recur);
1485 * start destroy zombie import/export thread
1487 int obd_zombie_impexp_init(void)
1491 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1492 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1493 spin_lock_init(&obd_zombie_impexp_lock);
1494 init_completion(&obd_zombie_start);
1495 init_completion(&obd_zombie_stop);
1496 cfs_waitq_init(&obd_zombie_waitq);
1499 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1503 wait_for_completion(&obd_zombie_start);
1506 obd_zombie_impexp_work_cb =
1507 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1508 &obd_zombie_impexp_kill, NULL);
1510 obd_zombie_impexp_idle_cb =
1511 liblustre_register_idle_callback("obd_zombi_impexp_check",
1512 &obd_zombie_impexp_check, NULL);
1518 * stop destroy zombie import/export thread
1520 void obd_zombie_impexp_stop(void)
1522 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1523 obd_zombie_impexp_notify();
1525 wait_for_completion(&obd_zombie_stop);
1527 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1528 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);