1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
80 EXPORT_SYMBOL(obd_device_alloc);
82 static void obd_device_free(struct obd_device *obd)
85 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87 if (obd->obd_namespace != NULL) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd, obd->obd_namespace, obd->obd_force);
92 lu_ref_fini(&obd->obd_reference);
93 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 struct obd_type *class_search_type(const char *name)
98 struct list_head *tmp;
99 struct obd_type *type;
101 spin_lock(&obd_types_lock);
102 list_for_each(tmp, &obd_types) {
103 type = list_entry(tmp, struct obd_type, typ_chain);
104 if (strcmp(type->typ_name, name) == 0) {
105 spin_unlock(&obd_types_lock);
109 spin_unlock(&obd_types_lock);
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
119 const char *modname = name;
120 if (!request_module(modname)) {
121 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122 type = class_search_type(name);
124 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130 spin_lock(&type->obd_type_lock);
132 try_module_get(type->typ_dt_ops->o_owner);
133 spin_unlock(&type->obd_type_lock);
138 void class_put_type(struct obd_type *type)
141 spin_lock(&type->obd_type_lock);
143 module_put(type->typ_dt_ops->o_owner);
144 spin_unlock(&type->obd_type_lock);
147 #define CLASS_MAX_NAME 1024
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150 struct lprocfs_vars *vars, const char *name,
151 struct lu_device_type *ldt)
153 struct obd_type *type;
158 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
160 if (class_search_type(name)) {
161 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166 OBD_ALLOC(type, sizeof(*type));
170 OBD_ALLOC_PTR(type->typ_dt_ops);
171 OBD_ALLOC_PTR(type->typ_md_ops);
172 OBD_ALLOC(type->typ_name, strlen(name) + 1);
174 if (type->typ_dt_ops == NULL ||
175 type->typ_md_ops == NULL ||
176 type->typ_name == NULL)
179 *(type->typ_dt_ops) = *dt_ops;
180 /* md_ops is optional */
182 *(type->typ_md_ops) = *md_ops;
183 strcpy(type->typ_name, name);
184 spin_lock_init(&type->obd_type_lock);
187 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
189 if (IS_ERR(type->typ_procroot)) {
190 rc = PTR_ERR(type->typ_procroot);
191 type->typ_procroot = NULL;
197 rc = lu_device_type_init(ldt);
202 spin_lock(&obd_types_lock);
203 list_add(&type->typ_chain, &obd_types);
204 spin_unlock(&obd_types_lock);
209 if (type->typ_name != NULL)
210 OBD_FREE(type->typ_name, strlen(name) + 1);
211 if (type->typ_md_ops != NULL)
212 OBD_FREE_PTR(type->typ_md_ops);
213 if (type->typ_dt_ops != NULL)
214 OBD_FREE_PTR(type->typ_dt_ops);
215 OBD_FREE(type, sizeof(*type));
219 int class_unregister_type(const char *name)
221 struct obd_type *type = class_search_type(name);
225 CERROR("unknown obd type\n");
229 if (type->typ_refcnt) {
230 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231 /* This is a bad situation, let's make the best of it */
232 /* Remove ops, but leave the name for debugging */
233 OBD_FREE_PTR(type->typ_dt_ops);
234 OBD_FREE_PTR(type->typ_md_ops);
238 if (type->typ_procroot) {
239 lprocfs_remove(&type->typ_procroot);
243 lu_device_type_fini(type->typ_lu);
245 spin_lock(&obd_types_lock);
246 list_del(&type->typ_chain);
247 spin_unlock(&obd_types_lock);
248 OBD_FREE(type->typ_name, strlen(name) + 1);
249 if (type->typ_dt_ops != NULL)
250 OBD_FREE_PTR(type->typ_dt_ops);
251 if (type->typ_md_ops != NULL)
252 OBD_FREE_PTR(type->typ_md_ops);
253 OBD_FREE(type, sizeof(*type));
255 } /* class_unregister_type */
258 * Create a new obd device.
260 * Find an empty slot in ::obd_devs[], create a new obd device in it.
262 * \param typename [in] obd device type string.
263 * \param name [in] obd device name.
265 * \retval NULL if create fails, otherwise return the obd device
268 struct obd_device *class_newdev(const char *type_name, const char *name)
270 struct obd_device *result = NULL;
271 struct obd_device *newdev;
272 struct obd_type *type = NULL;
274 int new_obd_minor = 0;
276 if (strlen(name) >= MAX_OBD_NAME) {
277 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278 RETURN(ERR_PTR(-EINVAL));
281 type = class_get_type(type_name);
283 CERROR("OBD: unknown type: %s\n", type_name);
284 RETURN(ERR_PTR(-ENODEV));
287 newdev = obd_device_alloc();
288 if (newdev == NULL) {
289 class_put_type(type);
290 RETURN(ERR_PTR(-ENOMEM));
292 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
294 spin_lock(&obd_dev_lock);
295 for (i = 0; i < class_devno_max(); i++) {
296 struct obd_device *obd = class_num2obd(i);
297 if (obd && obd->obd_name &&
298 (strcmp(name, obd->obd_name) == 0)) {
299 CERROR("Device %s already exists, won't add\n", name);
301 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302 "%p obd_magic %08x != %08x\n", result,
303 result->obd_magic, OBD_DEVICE_MAGIC);
304 LASSERTF(result->obd_minor == new_obd_minor,
305 "%p obd_minor %d != %d\n", result,
306 result->obd_minor, new_obd_minor);
308 obd_devs[result->obd_minor] = NULL;
309 result->obd_name[0]='\0';
311 result = ERR_PTR(-EEXIST);
314 if (!result && !obd) {
316 result->obd_minor = i;
318 result->obd_type = type;
319 strncpy(result->obd_name, name,
320 sizeof(result->obd_name) - 1);
321 obd_devs[i] = result;
324 spin_unlock(&obd_dev_lock);
326 if (result == NULL && i >= class_devno_max()) {
327 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
329 result = ERR_PTR(-EOVERFLOW);
332 if (IS_ERR(result)) {
333 obd_device_free(newdev);
334 class_put_type(type);
336 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337 result->obd_name, result);
342 void class_release_dev(struct obd_device *obd)
344 struct obd_type *obd_type = obd->obd_type;
346 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350 LASSERT(obd_type != NULL);
352 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353 obd->obd_name,obd->obd_type->typ_name);
355 spin_lock(&obd_dev_lock);
356 obd_devs[obd->obd_minor] = NULL;
357 spin_unlock(&obd_dev_lock);
358 obd_device_free(obd);
360 class_put_type(obd_type);
363 int class_name2dev(const char *name)
370 spin_lock(&obd_dev_lock);
371 for (i = 0; i < class_devno_max(); i++) {
372 struct obd_device *obd = class_num2obd(i);
373 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374 /* Make sure we finished attaching before we give
375 out any references */
376 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377 if (obd->obd_attached) {
378 spin_unlock(&obd_dev_lock);
384 spin_unlock(&obd_dev_lock);
389 struct obd_device *class_name2obd(const char *name)
391 int dev = class_name2dev(name);
393 if (dev < 0 || dev > class_devno_max())
395 return class_num2obd(dev);
398 int class_uuid2dev(struct obd_uuid *uuid)
402 spin_lock(&obd_dev_lock);
403 for (i = 0; i < class_devno_max(); i++) {
404 struct obd_device *obd = class_num2obd(i);
405 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407 spin_unlock(&obd_dev_lock);
411 spin_unlock(&obd_dev_lock);
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
418 int dev = class_uuid2dev(uuid);
421 return class_num2obd(dev);
425 * Get obd device from ::obd_devs[]
427 * \param num [in] array index
429 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
430 * otherwise return the obd device there.
432 struct obd_device *class_num2obd(int num)
434 struct obd_device *obd = NULL;
436 if (num < class_devno_max()) {
441 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442 "%p obd_magic %08x != %08x\n",
443 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444 LASSERTF(obd->obd_minor == num,
445 "%p obd_minor %0d != %0d\n",
446 obd, obd->obd_minor, num);
452 void class_obd_list(void)
457 spin_lock(&obd_dev_lock);
458 for (i = 0; i < class_devno_max(); i++) {
459 struct obd_device *obd = class_num2obd(i);
462 if (obd->obd_stopping)
464 else if (obd->obd_set_up)
466 else if (obd->obd_attached)
470 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471 i, status, obd->obd_type->typ_name,
472 obd->obd_name, obd->obd_uuid.uuid,
473 atomic_read(&obd->obd_refcount));
475 spin_unlock(&obd_dev_lock);
479 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
480 specified, then only the client with that uuid is returned,
481 otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483 const char * typ_name,
484 struct obd_uuid *grp_uuid)
488 spin_lock(&obd_dev_lock);
489 for (i = 0; i < class_devno_max(); i++) {
490 struct obd_device *obd = class_num2obd(i);
493 if ((strncmp(obd->obd_type->typ_name, typ_name,
494 strlen(typ_name)) == 0)) {
495 if (obd_uuid_equals(tgt_uuid,
496 &obd->u.cli.cl_target_uuid) &&
497 ((grp_uuid)? obd_uuid_equals(grp_uuid,
498 &obd->obd_uuid) : 1)) {
499 spin_unlock(&obd_dev_lock);
504 spin_unlock(&obd_dev_lock);
509 /* Iterate the obd_device list looking devices have grp_uuid. Start
510 searching at *next, and if a device is found, the next index to look
511 at is saved in *next. If next is NULL, then the first matching device
512 will always be returned. */
513 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
519 else if (*next >= 0 && *next < class_devno_max())
524 spin_lock(&obd_dev_lock);
525 for (; i < class_devno_max(); i++) {
526 struct obd_device *obd = class_num2obd(i);
529 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
532 spin_unlock(&obd_dev_lock);
536 spin_unlock(&obd_dev_lock);
542 void obd_cleanup_caches(void)
547 if (obd_device_cachep) {
548 rc = cfs_mem_cache_destroy(obd_device_cachep);
549 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
550 obd_device_cachep = NULL;
553 rc = cfs_mem_cache_destroy(obdo_cachep);
554 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
558 rc = cfs_mem_cache_destroy(import_cachep);
559 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
560 import_cachep = NULL;
563 rc = cfs_mem_cache_destroy(capa_cachep);
564 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
570 int obd_init_caches(void)
574 LASSERT(obd_device_cachep == NULL);
575 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
576 sizeof(struct obd_device),
578 if (!obd_device_cachep)
581 LASSERT(obdo_cachep == NULL);
582 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
587 LASSERT(import_cachep == NULL);
588 import_cachep = cfs_mem_cache_create("ll_import_cache",
589 sizeof(struct obd_import),
594 LASSERT(capa_cachep == NULL);
595 capa_cachep = cfs_mem_cache_create("capa_cache",
596 sizeof(struct obd_capa), 0, 0);
602 obd_cleanup_caches();
607 /* map connection to client */
608 struct obd_export *class_conn2export(struct lustre_handle *conn)
610 struct obd_export *export;
614 CDEBUG(D_CACHE, "looking for null handle\n");
618 if (conn->cookie == -1) { /* this means assign a new connection */
619 CDEBUG(D_CACHE, "want a new connection\n");
623 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
624 export = class_handle2object(conn->cookie);
628 struct obd_device *class_exp2obd(struct obd_export *exp)
635 struct obd_device *class_conn2obd(struct lustre_handle *conn)
637 struct obd_export *export;
638 export = class_conn2export(conn);
640 struct obd_device *obd = export->exp_obd;
641 class_export_put(export);
647 struct obd_import *class_exp2cliimp(struct obd_export *exp)
649 struct obd_device *obd = exp->exp_obd;
652 return obd->u.cli.cl_import;
655 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
657 struct obd_device *obd = class_conn2obd(conn);
660 return obd->u.cli.cl_import;
663 /* Export management functions */
664 static void export_handle_addref(void *export)
666 class_export_get(export);
669 struct obd_export *class_export_get(struct obd_export *exp)
671 atomic_inc(&exp->exp_refcount);
672 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
673 atomic_read(&exp->exp_refcount));
676 EXPORT_SYMBOL(class_export_get);
678 void class_export_put(struct obd_export *exp)
680 LASSERT(exp != NULL);
681 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
682 atomic_read(&exp->exp_refcount) - 1);
683 LASSERT(atomic_read(&exp->exp_refcount) > 0);
684 LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
686 if (atomic_dec_and_test(&exp->exp_refcount)) {
687 LASSERT (list_empty(&exp->exp_obd_chain));
689 CDEBUG(D_IOCTL, "final put %p/%s\n",
690 exp, exp->exp_client_uuid.uuid);
692 spin_lock(&obd_zombie_impexp_lock);
693 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
694 spin_unlock(&obd_zombie_impexp_lock);
696 if (obd_zombie_impexp_notify != NULL)
697 obd_zombie_impexp_notify();
700 EXPORT_SYMBOL(class_export_put);
702 static void class_export_destroy(struct obd_export *exp)
704 struct obd_device *obd = exp->exp_obd;
707 LASSERT (atomic_read(&exp->exp_refcount) == 0);
709 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
710 exp->exp_client_uuid.uuid, obd->obd_name);
712 LASSERT(obd != NULL);
714 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
715 if (exp->exp_connection)
716 ptlrpc_put_connection_superhack(exp->exp_connection);
718 LASSERT(list_empty(&exp->exp_outstanding_replies));
719 LASSERT(list_empty(&exp->exp_req_replay_queue));
720 LASSERT(list_empty(&exp->exp_queued_rpc));
721 obd_destroy_export(exp);
722 class_decref(obd, "export", exp);
724 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
728 /* Creates a new export, adds it to the hash table, and returns a
729 * pointer to it. The refcount is 2: one for the hash reference, and
730 * one for the pointer returned by this function. */
731 struct obd_export *class_new_export(struct obd_device *obd,
732 struct obd_uuid *cluuid)
734 struct obd_export *export;
737 OBD_ALLOC_PTR(export);
739 return ERR_PTR(-ENOMEM);
741 export->exp_conn_cnt = 0;
742 export->exp_lock_hash = NULL;
743 atomic_set(&export->exp_refcount, 2);
744 atomic_set(&export->exp_rpc_count, 0);
745 export->exp_obd = obd;
746 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
747 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
748 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
749 CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
750 class_handle_hash(&export->exp_handle, export_handle_addref);
751 export->exp_last_request_time = cfs_time_current_sec();
752 spin_lock_init(&export->exp_lock);
753 INIT_HLIST_NODE(&export->exp_uuid_hash);
754 INIT_HLIST_NODE(&export->exp_nid_hash);
756 export->exp_sp_peer = LUSTRE_SP_ANY;
757 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
758 export->exp_client_uuid = *cluuid;
759 obd_init_export(export);
761 spin_lock(&obd->obd_dev_lock);
762 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
763 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
764 &export->exp_uuid_hash);
766 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
767 obd->obd_name, cluuid->uuid, rc);
768 spin_unlock(&obd->obd_dev_lock);
769 class_handle_unhash(&export->exp_handle);
770 OBD_FREE_PTR(export);
771 return ERR_PTR(-EALREADY);
775 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
776 class_incref(obd, "export", export);
777 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
778 list_add_tail(&export->exp_obd_chain_timed,
779 &export->exp_obd->obd_exports_timed);
780 export->exp_obd->obd_num_exports++;
781 spin_unlock(&obd->obd_dev_lock);
785 EXPORT_SYMBOL(class_new_export);
787 void class_unlink_export(struct obd_export *exp)
789 class_handle_unhash(&exp->exp_handle);
791 spin_lock(&exp->exp_obd->obd_dev_lock);
792 /* delete an uuid-export hashitem from hashtables */
793 if (!hlist_unhashed(&exp->exp_uuid_hash))
794 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
795 &exp->exp_client_uuid,
796 &exp->exp_uuid_hash);
798 list_del_init(&exp->exp_obd_chain);
799 list_del_init(&exp->exp_obd_chain_timed);
800 exp->exp_obd->obd_num_exports--;
801 spin_unlock(&exp->exp_obd->obd_dev_lock);
803 class_export_put(exp);
805 EXPORT_SYMBOL(class_unlink_export);
807 /* Import management functions */
808 static void import_handle_addref(void *import)
810 class_import_get(import);
813 struct obd_import *class_import_get(struct obd_import *import)
815 LASSERT(atomic_read(&import->imp_refcount) >= 0);
816 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
817 atomic_inc(&import->imp_refcount);
818 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
819 atomic_read(&import->imp_refcount),
820 import->imp_obd->obd_name);
823 EXPORT_SYMBOL(class_import_get);
825 void class_import_put(struct obd_import *import)
829 LASSERT(atomic_read(&import->imp_refcount) > 0);
830 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
831 LASSERT(list_empty(&import->imp_zombie_chain));
833 CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
834 atomic_read(&import->imp_refcount) - 1,
835 import->imp_obd->obd_name);
837 if (atomic_dec_and_test(&import->imp_refcount)) {
838 CDEBUG(D_INFO, "final put import %p\n", import);
839 spin_lock(&obd_zombie_impexp_lock);
840 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
841 spin_unlock(&obd_zombie_impexp_lock);
843 if (obd_zombie_impexp_notify != NULL)
844 obd_zombie_impexp_notify();
849 EXPORT_SYMBOL(class_import_put);
851 void class_import_destroy(struct obd_import *import)
855 CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
856 import->imp_obd->obd_name);
858 LASSERT(atomic_read(&import->imp_refcount) == 0);
860 ptlrpc_put_connection_superhack(import->imp_connection);
862 while (!list_empty(&import->imp_conn_list)) {
863 struct obd_import_conn *imp_conn;
865 imp_conn = list_entry(import->imp_conn_list.next,
866 struct obd_import_conn, oic_item);
867 list_del(&imp_conn->oic_item);
868 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
869 OBD_FREE(imp_conn, sizeof(*imp_conn));
872 LASSERT(import->imp_sec == NULL);
873 class_decref(import->imp_obd, "import", import);
874 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
878 static void init_imp_at(struct imp_at *at) {
880 at_init(&at->iat_net_latency, 0, 0);
881 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
882 /* max service estimates are tracked on the server side, so
883 don't use the AT history here, just use the last reported
884 val. (But keep hist for proc histogram, worst_ever) */
885 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
890 struct obd_import *class_new_import(struct obd_device *obd)
892 struct obd_import *imp;
894 OBD_ALLOC(imp, sizeof(*imp));
898 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
899 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
900 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
901 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
902 spin_lock_init(&imp->imp_lock);
903 imp->imp_last_success_conn = 0;
904 imp->imp_state = LUSTRE_IMP_NEW;
905 imp->imp_obd = class_incref(obd, "import", imp);
906 sema_init(&imp->imp_sec_mutex, 1);
907 cfs_waitq_init(&imp->imp_recovery_waitq);
909 atomic_set(&imp->imp_refcount, 2);
910 atomic_set(&imp->imp_unregistering, 0);
911 atomic_set(&imp->imp_inflight, 0);
912 atomic_set(&imp->imp_replay_inflight, 0);
913 atomic_set(&imp->imp_inval_count, 0);
914 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
915 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
916 class_handle_hash(&imp->imp_handle, import_handle_addref);
917 init_imp_at(&imp->imp_at);
919 /* the default magic is V2, will be used in connect RPC, and
920 * then adjusted according to the flags in request/reply. */
921 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
925 EXPORT_SYMBOL(class_new_import);
927 void class_destroy_import(struct obd_import *import)
929 LASSERT(import != NULL);
930 LASSERT(import != LP_POISON);
932 class_handle_unhash(&import->imp_handle);
934 spin_lock(&import->imp_lock);
935 import->imp_generation++;
936 spin_unlock(&import->imp_lock);
937 class_import_put(import);
939 EXPORT_SYMBOL(class_destroy_import);
941 /* A connection defines an export context in which preallocation can
942 be managed. This releases the export pointer reference, and returns
943 the export handle, so the export refcount is 1 when this function
945 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
946 struct obd_uuid *cluuid)
948 struct obd_export *export;
949 LASSERT(conn != NULL);
950 LASSERT(obd != NULL);
951 LASSERT(cluuid != NULL);
954 export = class_new_export(obd, cluuid);
956 RETURN(PTR_ERR(export));
958 conn->cookie = export->exp_handle.h_cookie;
959 class_export_put(export);
961 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
962 cluuid->uuid, conn->cookie);
965 EXPORT_SYMBOL(class_connect);
967 /* if export is involved in recovery then clean up related things */
968 void class_export_recovery_cleanup(struct obd_export *exp)
970 struct obd_device *obd = exp->exp_obd;
972 spin_lock_bh(&obd->obd_processing_task_lock);
973 if (obd->obd_recovering && exp->exp_in_recovery) {
974 spin_lock(&exp->exp_lock);
975 exp->exp_in_recovery = 0;
976 spin_unlock(&exp->exp_lock);
977 obd->obd_connected_clients--;
978 /* each connected client is counted as recoverable */
979 obd->obd_recoverable_clients--;
980 if (exp->exp_req_replay_needed) {
981 spin_lock(&exp->exp_lock);
982 exp->exp_req_replay_needed = 0;
983 spin_unlock(&exp->exp_lock);
984 LASSERT(atomic_read(&obd->obd_req_replay_clients));
985 atomic_dec(&obd->obd_req_replay_clients);
987 if (exp->exp_lock_replay_needed) {
988 spin_lock(&exp->exp_lock);
989 exp->exp_lock_replay_needed = 0;
990 spin_unlock(&exp->exp_lock);
991 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
992 atomic_dec(&obd->obd_lock_replay_clients);
995 spin_unlock_bh(&obd->obd_processing_task_lock);
998 /* This function removes two references from the export: one for the
999 * hash entry and one for the export pointer passed in. The export
1000 * pointer passed to this function is destroyed should not be used
1002 int class_disconnect(struct obd_export *export)
1004 int already_disconnected;
1007 if (export == NULL) {
1009 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1013 spin_lock(&export->exp_lock);
1014 already_disconnected = export->exp_disconnected;
1015 export->exp_disconnected = 1;
1017 if (!hlist_unhashed(&export->exp_nid_hash))
1018 lustre_hash_del(export->exp_obd->obd_nid_hash,
1019 &export->exp_connection->c_peer.nid,
1020 &export->exp_nid_hash);
1022 spin_unlock(&export->exp_lock);
1024 /* class_cleanup(), abort_recovery(), and class_fail_export()
1025 * all end up in here, and if any of them race we shouldn't
1026 * call extra class_export_puts(). */
1027 if (already_disconnected)
1030 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1031 export->exp_handle.h_cookie);
1033 class_export_recovery_cleanup(export);
1034 class_unlink_export(export);
1035 class_export_put(export);
1039 static void class_disconnect_export_list(struct list_head *list, int flags)
1042 struct lustre_handle fake_conn;
1043 struct obd_export *fake_exp, *exp;
1046 /* It's possible that an export may disconnect itself, but
1047 * nothing else will be added to this list. */
1048 while (!list_empty(list)) {
1049 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1050 class_export_get(exp);
1052 spin_lock(&exp->exp_lock);
1053 exp->exp_flags = flags;
1054 spin_unlock(&exp->exp_lock);
1056 if (obd_uuid_equals(&exp->exp_client_uuid,
1057 &exp->exp_obd->obd_uuid)) {
1059 "exp %p export uuid == obd uuid, don't discon\n",
1061 /* Need to delete this now so we don't end up pointing
1062 * to work_list later when this export is cleaned up. */
1063 list_del_init(&exp->exp_obd_chain);
1064 class_export_put(exp);
1068 fake_conn.cookie = exp->exp_handle.h_cookie;
1069 fake_exp = class_conn2export(&fake_conn);
1071 class_export_put(exp);
1075 spin_lock(&fake_exp->exp_lock);
1076 fake_exp->exp_flags = flags;
1077 spin_unlock(&fake_exp->exp_lock);
1079 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1080 "last request at "CFS_TIME_T"\n",
1081 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1082 exp, exp->exp_last_request_time);
1083 rc = obd_disconnect(fake_exp);
1084 class_export_put(exp);
1089 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1091 return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1092 (obd->obd_force ? OBD_OPT_FORCE : 0));
1095 void class_disconnect_exports(struct obd_device *obd)
1097 struct list_head work_list;
1100 /* Move all of the exports from obd_exports to a work list, en masse. */
1101 spin_lock(&obd->obd_dev_lock);
1102 list_add(&work_list, &obd->obd_exports);
1103 list_del_init(&obd->obd_exports);
1104 spin_unlock(&obd->obd_dev_lock);
1106 if (!list_empty(&work_list)) {
1107 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1108 "disconnecting them\n", obd->obd_minor, obd);
1109 class_disconnect_export_list(&work_list,
1110 get_exp_flags_from_obd(obd));
1112 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1113 obd->obd_minor, obd);
1116 EXPORT_SYMBOL(class_disconnect_exports);
1118 /* Remove exports that have not completed recovery.
1120 int class_disconnect_stale_exports(struct obd_device *obd,
1121 int (*test_export)(struct obd_export *))
1123 struct list_head work_list;
1124 struct list_head *pos, *n;
1125 struct obd_export *exp;
1129 CFS_INIT_LIST_HEAD(&work_list);
1130 spin_lock(&obd->obd_dev_lock);
1131 list_for_each_safe(pos, n, &obd->obd_exports) {
1132 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1133 if (test_export(exp))
1136 list_del(&exp->exp_obd_chain);
1137 list_add(&exp->exp_obd_chain, &work_list);
1138 /* don't count self-export as client */
1139 if (obd_uuid_equals(&exp->exp_client_uuid,
1140 &exp->exp_obd->obd_uuid))
1144 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1145 obd->obd_name, exp->exp_client_uuid.uuid,
1146 exp->exp_connection == NULL ? "<unknown>" :
1147 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1149 spin_unlock(&obd->obd_dev_lock);
1151 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1152 obd->obd_name, cnt);
1153 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1156 EXPORT_SYMBOL(class_disconnect_stale_exports);
1158 void class_fail_export(struct obd_export *exp)
1160 int rc, already_failed;
1162 spin_lock(&exp->exp_lock);
1163 already_failed = exp->exp_failed;
1164 exp->exp_failed = 1;
1165 spin_unlock(&exp->exp_lock);
1167 if (already_failed) {
1168 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1169 exp, exp->exp_client_uuid.uuid);
1173 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1174 exp, exp->exp_client_uuid.uuid);
1176 if (obd_dump_on_timeout)
1177 libcfs_debug_dumplog();
1179 /* Most callers into obd_disconnect are removing their own reference
1180 * (request, for example) in addition to the one from the hash table.
1181 * We don't have such a reference here, so make one. */
1182 class_export_get(exp);
1183 rc = obd_disconnect(exp);
1185 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1187 CDEBUG(D_HA, "disconnected export %p/%s\n",
1188 exp, exp->exp_client_uuid.uuid);
1190 EXPORT_SYMBOL(class_fail_export);
1192 char *obd_export_nid2str(struct obd_export *exp)
1194 if (exp->exp_connection != NULL)
1195 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1199 EXPORT_SYMBOL(obd_export_nid2str);
1201 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1203 struct obd_export *doomed_exp = NULL;
1204 int exports_evicted = 0;
1206 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1209 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1210 if (doomed_exp == NULL)
1213 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1214 "nid %s found, wanted nid %s, requested nid %s\n",
1215 obd_export_nid2str(doomed_exp),
1216 libcfs_nid2str(nid_key), nid);
1217 LASSERTF(doomed_exp != obd->obd_self_export,
1218 "self-export is hashed by NID?\n");
1220 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1221 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1223 class_fail_export(doomed_exp);
1224 class_export_put(doomed_exp);
1227 if (!exports_evicted)
1228 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1229 obd->obd_name, nid);
1230 return exports_evicted;
1232 EXPORT_SYMBOL(obd_export_evict_by_nid);
1234 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1236 struct obd_export *doomed_exp = NULL;
1237 struct obd_uuid doomed_uuid;
1238 int exports_evicted = 0;
1240 obd_str2uuid(&doomed_uuid, uuid);
1241 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1242 CERROR("%s: can't evict myself\n", obd->obd_name);
1243 return exports_evicted;
1246 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1248 if (doomed_exp == NULL) {
1249 CERROR("%s: can't disconnect %s: no exports found\n",
1250 obd->obd_name, uuid);
1252 CWARN("%s: evicting %s at adminstrative request\n",
1253 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1254 class_fail_export(doomed_exp);
1255 class_export_put(doomed_exp);
1259 return exports_evicted;
1261 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1264 * kill zombie imports and exports
1266 void obd_zombie_impexp_cull(void)
1268 struct obd_import *import;
1269 struct obd_export *export;
1273 spin_lock (&obd_zombie_impexp_lock);
1276 if (!list_empty(&obd_zombie_imports)) {
1277 import = list_entry(obd_zombie_imports.next,
1280 list_del(&import->imp_zombie_chain);
1284 if (!list_empty(&obd_zombie_exports)) {
1285 export = list_entry(obd_zombie_exports.next,
1288 list_del_init(&export->exp_obd_chain);
1291 spin_unlock(&obd_zombie_impexp_lock);
1294 class_import_destroy(import);
1297 class_export_destroy(export);
1299 } while (import != NULL || export != NULL);
1303 static struct completion obd_zombie_start;
1304 static struct completion obd_zombie_stop;
1305 static unsigned long obd_zombie_flags;
1306 static cfs_waitq_t obd_zombie_waitq;
1313 * check for work for kill zombie import/export thread.
1315 static int obd_zombie_impexp_check(void *arg)
1319 spin_lock(&obd_zombie_impexp_lock);
1320 rc = list_empty(&obd_zombie_imports) &&
1321 list_empty(&obd_zombie_exports) &&
1322 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1324 spin_unlock(&obd_zombie_impexp_lock);
1330 * notify import/export destroy thread about new zombie.
1332 static void obd_zombie_impexp_notify(void)
1334 cfs_waitq_signal(&obd_zombie_waitq);
1338 * check whether obd_zombie is idle
1340 static int obd_zombie_is_idle(void)
1344 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1345 spin_lock(&obd_zombie_impexp_lock);
1346 rc = list_empty(&obd_zombie_imports) &&
1347 list_empty(&obd_zombie_exports);
1348 spin_unlock(&obd_zombie_impexp_lock);
1353 * wait when obd_zombie import/export queues become empty
1355 void obd_zombie_barrier(void)
1357 struct l_wait_info lwi = { 0 };
1359 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1361 EXPORT_SYMBOL(obd_zombie_barrier);
1366 * destroy zombie export/import thread.
1368 static int obd_zombie_impexp_thread(void *unused)
1372 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1373 complete(&obd_zombie_start);
1377 complete(&obd_zombie_start);
1379 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1380 struct l_wait_info lwi = { 0 };
1382 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1384 obd_zombie_impexp_cull();
1385 /* Notify obd_zombie_barrier callers that queues may be empty */
1386 cfs_waitq_signal(&obd_zombie_waitq);
1389 complete(&obd_zombie_stop);
1394 #else /* ! KERNEL */
1396 static atomic_t zombie_recur = ATOMIC_INIT(0);
1397 static void *obd_zombie_impexp_work_cb;
1398 static void *obd_zombie_impexp_idle_cb;
1400 int obd_zombie_impexp_kill(void *arg)
1404 if (atomic_inc_return(&zombie_recur) == 1) {
1405 obd_zombie_impexp_cull();
1408 atomic_dec(&zombie_recur);
1415 * start destroy zombie import/export thread
1417 int obd_zombie_impexp_init(void)
1421 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1422 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1423 spin_lock_init(&obd_zombie_impexp_lock);
1424 init_completion(&obd_zombie_start);
1425 init_completion(&obd_zombie_stop);
1426 cfs_waitq_init(&obd_zombie_waitq);
1429 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1433 wait_for_completion(&obd_zombie_start);
1436 obd_zombie_impexp_work_cb =
1437 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1438 &obd_zombie_impexp_kill, NULL);
1440 obd_zombie_impexp_idle_cb =
1441 liblustre_register_idle_callback("obd_zombi_impexp_check",
1442 &obd_zombie_impexp_check, NULL);
1449 * stop destroy zombie import/export thread
1451 void obd_zombie_impexp_stop(void)
1453 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1454 obd_zombie_impexp_notify();
1456 wait_for_completion(&obd_zombie_stop);
1458 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1459 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);