1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
/* List of registered obd types (defined elsewhere); this file walks and
 * updates it under obd_types_lock. */
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
/* Memory caches created by obd_init_caches() and torn down by
 * obd_cleanup_caches(). */
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
/* Imports/exports whose last reference was dropped are queued on these
 * lists under obd_zombie_impexp_lock until obd_zombie_impexp_cull()
 * destroys them. */
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
/* Hook called by class_export_destroy() and class_import_destroy() to
 * drop ptlrpc connection references; where it is assigned is not visible
 * in this file section. */
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
/*
 * Allocate an obd_device from obd_device_cachep and stamp it with
 * OBD_DEVICE_MAGIC.
 * NOTE(review): this listing skips several source lines here, so any
 * NULL guard around the magic assignment and the return statement are
 * not visible -- confirm against the full file.
 */
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
80 EXPORT_SYMBOL(obd_device_alloc);
/*
 * Return an obd_device to obd_device_cachep.
 * Asserts the magic value, reports via CERROR a namespace pointer that is
 * still set, finalizes the obd_reference lu_ref and frees the object.
 */
82 static void obd_device_free(struct obd_device *obd)
85 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
/* A namespace that is still attached at this point is only reported. */
87 if (obd->obd_namespace != NULL) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd, obd->obd_namespace, obd->obd_force);
92 lu_ref_fini(&obd->obd_reference);
93 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
/*
 * Search the obd_types list for an entry whose typ_name equals \a name
 * (exact strcmp match). The walk is done under obd_types_lock, which is
 * released both on a match and after an unsuccessful walk.
 * NOTE(review): the return statements are not visible in this listing.
 */
96 struct obd_type *class_search_type(const char *name)
98 struct list_head *tmp;
99 struct obd_type *type;
101 spin_lock(&obd_types_lock);
102 list_for_each(tmp, &obd_types) {
103 type = list_entry(tmp, struct obd_type, typ_chain);
104 if (strcmp(type->typ_name, name) == 0) {
105 spin_unlock(&obd_types_lock);
109 spin_unlock(&obd_types_lock);
/*
 * Look up an obd type by name and take a module reference on it.
 * The type is first searched with class_search_type(); the code also
 * contains a path that calls request_module() with the type name and
 * searches again. With the type in hand, try_module_get() is called on
 * typ_dt_ops->o_owner while holding the type's obd_type_lock.
 * NOTE(review): the conditions guarding the module-load path, any
 * refcount update and the return value are not visible in this listing.
 */
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
119 const char *modname = name;
/* request_module() returning 0 is treated as "module loaded". */
120 if (!request_module(modname)) {
121 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122 type = class_search_type(name);
124 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130 spin_lock(&type->obd_type_lock);
132 try_module_get(type->typ_dt_ops->o_owner);
133 spin_unlock(&type->obd_type_lock);
/*
 * Counterpart of class_get_type(): drops the module reference on
 * typ_dt_ops->o_owner while holding the type's obd_type_lock.
 * NOTE(review): other statements of the body are not visible here.
 */
138 void class_put_type(struct obd_type *type)
141 spin_lock(&type->obd_type_lock);
143 module_put(type->typ_dt_ops->o_owner);
144 spin_unlock(&type->obd_type_lock);
/* Upper bound (exclusive) on the length of a type name accepted by
 * class_register_type(). */
147 #define CLASS_MAX_NAME 1024
/*
 * Register a new obd type.
 *
 * Allocates an obd_type with private copies of \a dt_ops, \a md_ops and
 * \a name, registers a procfs directory named after the type under
 * proc_lustre_root, initializes \a ldt via lu_device_type_init(), and
 * links the type into obd_types under obd_types_lock.
 *
 * The tail of the function frees typ_name, typ_md_ops, typ_dt_ops and the
 * type itself; presumably this is the failure path -- the labels and
 * return statements are not visible in this listing.
 */
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150 struct lprocfs_vars *vars, const char *name,
151 struct lu_device_type *ldt)
153 struct obd_type *type;
/* The name must be strictly shorter than CLASS_MAX_NAME. */
158 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
160 if (class_search_type(name)) {
161 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166 OBD_ALLOC(type, sizeof(*type));
170 OBD_ALLOC_PTR(type->typ_dt_ops);
171 OBD_ALLOC_PTR(type->typ_md_ops);
172 OBD_ALLOC(type->typ_name, strlen(name) + 1);
/* All three sub-allocations are checked together. */
174 if (type->typ_dt_ops == NULL ||
175 type->typ_md_ops == NULL ||
176 type->typ_name == NULL)
179 *(type->typ_dt_ops) = *dt_ops;
180 /* md_ops is optional */
182 *(type->typ_md_ops) = *md_ops;
183 strcpy(type->typ_name, name);
184 spin_lock_init(&type->obd_type_lock);
187 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
/* An ERR_PTR result is converted to rc and the pointer is cleared. */
189 if (IS_ERR(type->typ_procroot)) {
190 rc = PTR_ERR(type->typ_procroot);
191 type->typ_procroot = NULL;
197 rc = lu_device_type_init(ldt);
202 spin_lock(&obd_types_lock);
203 list_add(&type->typ_chain, &obd_types);
204 spin_unlock(&obd_types_lock);
/* Release whatever was allocated; the name buffer size is recomputed
 * from \a name, matching the allocation above. */
209 if (type->typ_name != NULL)
210 OBD_FREE(type->typ_name, strlen(name) + 1);
211 if (type->typ_md_ops != NULL)
212 OBD_FREE_PTR(type->typ_md_ops);
213 if (type->typ_dt_ops != NULL)
214 OBD_FREE_PTR(type->typ_dt_ops);
215 OBD_FREE(type, sizeof(*type));
/*
 * Unregister the obd type called \a name.
 *
 * An unknown name is reported with CERROR. If the type still has a
 * non-zero typ_refcnt, only the ops tables are freed and the name is
 * kept for debugging. Otherwise the procfs root is removed,
 * lu_device_type_fini() is called on typ_lu, the type is unlinked from
 * obd_types under obd_types_lock, and all its memory is freed.
 * NOTE(review): return statements and some guards are not visible in this
 * listing. In the busy-refcount branch the freed ops pointers are not
 * visibly cleared -- confirm nothing dereferences them afterwards.
 */
219 int class_unregister_type(const char *name)
221 struct obd_type *type = class_search_type(name);
225 CERROR("unknown obd type\n");
229 if (type->typ_refcnt) {
230 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231 /* This is a bad situation, let's make the best of it */
232 /* Remove ops, but leave the name for debugging */
233 OBD_FREE_PTR(type->typ_dt_ops);
234 OBD_FREE_PTR(type->typ_md_ops);
238 if (type->typ_procroot) {
239 lprocfs_remove(&type->typ_procroot);
243 lu_device_type_fini(type->typ_lu);
245 spin_lock(&obd_types_lock);
246 list_del(&type->typ_chain);
247 spin_unlock(&obd_types_lock);
/* The size passed here is derived from the caller's \a name, which
 * compared equal to typ_name in class_search_type(). */
248 OBD_FREE(type->typ_name, strlen(name) + 1);
249 if (type->typ_dt_ops != NULL)
250 OBD_FREE_PTR(type->typ_dt_ops);
251 if (type->typ_md_ops != NULL)
252 OBD_FREE_PTR(type->typ_md_ops);
253 OBD_FREE(type, sizeof(*type));
255 } /* class_unregister_type */
258 * Create a new obd device.
260 * Find an empty slot in ::obd_devs[], create a new obd device in it.
262 * \param type_name [in] obd device type string.
263 * \param name [in] obd device name.
265 * \retval NULL if create fails, otherwise return the obd device
/*
 * Steps visible in the code below:
 *  - names of MAX_OBD_NAME bytes or more are rejected with -EINVAL;
 *  - the type is looked up with class_get_type(), -ENODEV on failure;
 *  - a device is allocated with obd_device_alloc(), -ENOMEM on failure
 *    (the type reference is dropped again);
 *  - under obd_dev_lock every slot is scanned: a slot holding a device
 *    with the same name yields -EEXIST (undoing a slot already claimed in
 *    this scan), and an empty slot is claimed while no result is set;
 *  - if no slot was found the result is -EOVERFLOW;
 *  - on any error the new device is freed and the type reference dropped.
 * Errors are returned as ERR_PTR() values.
 * NOTE(review): several statements (declaration of i, assignment of the
 * new device to result, loop exits, final return) are not visible in this
 * listing.
 */
268 struct obd_device *class_newdev(const char *type_name, const char *name)
270 struct obd_device *result = NULL;
271 struct obd_device *newdev;
272 struct obd_type *type = NULL;
274 int new_obd_minor = 0;
276 if (strlen(name) >= MAX_OBD_NAME) {
277 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278 RETURN(ERR_PTR(-EINVAL));
281 type = class_get_type(type_name);
283 CERROR("OBD: unknown type: %s\n", type_name);
284 RETURN(ERR_PTR(-ENODEV));
287 newdev = obd_device_alloc();
288 if (newdev == NULL) {
289 class_put_type(type);
290 RETURN(ERR_PTR(-ENOMEM));
292 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
294 spin_lock(&obd_dev_lock);
295 for (i = 0; i < class_devno_max(); i++) {
296 struct obd_device *obd = class_num2obd(i);
297 if (obd && obd->obd_name &&
298 (strcmp(name, obd->obd_name) == 0)) {
299 CERROR("Device %s already exists, won't add\n", name);
/* Roll back a slot claimed earlier in this scan before reporting the
 * duplicate. */
301 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302 "%p obd_magic %08x != %08x\n", result,
303 result->obd_magic, OBD_DEVICE_MAGIC);
304 LASSERTF(result->obd_minor == new_obd_minor,
305 "%p obd_minor %d != %d\n", result,
306 result->obd_minor, new_obd_minor);
308 obd_devs[result->obd_minor] = NULL;
309 result->obd_name[0]='\0';
311 result = ERR_PTR(-EEXIST);
/* First free slot seen while no result has been chosen yet. */
314 if (!result && !obd) {
316 result->obd_minor = i;
318 result->obd_type = type;
/* At most sizeof(obd_name) - 1 bytes are copied; the length check at the
 * top already guarantees the name is shorter than MAX_OBD_NAME. */
319 strncpy(result->obd_name, name,
320 sizeof(result->obd_name) - 1);
321 obd_devs[i] = result;
324 spin_unlock(&obd_dev_lock);
326 if (result == NULL && i >= class_devno_max()) {
327 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
329 result = ERR_PTR(-EOVERFLOW);
332 if (IS_ERR(result)) {
333 obd_device_free(newdev);
334 class_put_type(type);
336 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337 result->obd_name, result);
/*
 * Release an obd device created by class_newdev(): assert that it is
 * sane and occupies its own obd_devs[] slot, clear that slot under
 * obd_dev_lock, free the device and drop the reference on its type.
 */
342 void class_release_dev(struct obd_device *obd)
/* Saved up front because obd is freed before the type is put. */
344 struct obd_type *obd_type = obd->obd_type;
346 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350 LASSERT(obd_type != NULL);
352 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353 obd->obd_name,obd->obd_type->typ_name);
355 spin_lock(&obd_dev_lock);
356 obd_devs[obd->obd_minor] = NULL;
357 spin_unlock(&obd_dev_lock);
358 obd_device_free(obd);
360 class_put_type(obd_type);
/*
 * Scan the device table under obd_dev_lock for a device whose obd_name
 * equals \a name. A match is only acted upon when the device has
 * obd_attached set; the lock is released on that path and after the scan.
 * NOTE(review): the returned values are not visible in this listing;
 * class_name2obd() treats a negative result as "not found".
 */
363 int class_name2dev(const char *name)
370 spin_lock(&obd_dev_lock);
371 for (i = 0; i < class_devno_max(); i++) {
372 struct obd_device *obd = class_num2obd(i);
373 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374 /* Make sure we finished attaching before we give
375 out any references */
376 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377 if (obd->obd_attached) {
378 spin_unlock(&obd_dev_lock);
384 spin_unlock(&obd_dev_lock);
389 struct obd_device *class_name2obd(const char *name)
391 int dev = class_name2dev(name);
393 if (dev < 0 || dev > class_devno_max())
395 return class_num2obd(dev);
/*
 * Scan the device table under obd_dev_lock for a device whose obd_uuid
 * equals \a uuid (obd_uuid_equals). The lock is released on a match and
 * after an unsuccessful scan.
 * NOTE(review): the returned values are not visible in this listing.
 */
398 int class_uuid2dev(struct obd_uuid *uuid)
402 spin_lock(&obd_dev_lock);
403 for (i = 0; i < class_devno_max(); i++) {
404 struct obd_device *obd = class_num2obd(i);
405 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407 spin_unlock(&obd_dev_lock);
411 spin_unlock(&obd_dev_lock);
/*
 * Look up an obd device by uuid: resolve the index with class_uuid2dev()
 * and fetch the device with class_num2obd().
 * NOTE(review): a check of dev between these two statements is presumably
 * present in the full source but is not visible in this listing.
 */
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
418 int dev = class_uuid2dev(uuid);
421 return class_num2obd(dev);
425 * Get obd device from ::obd_devs[]
427 * \param num [in] array index
429 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
430 * otherwise return the obd device there.
432 struct obd_device *class_num2obd(int num)
434 struct obd_device *obd = NULL;
436 if (num < class_devno_max()) {
441 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442 "%p obd_magic %08x != %08x\n",
443 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444 LASSERTF(obd->obd_minor == num,
445 "%p obd_minor %0d != %0d\n",
446 obd, obd->obd_minor, num);
/*
 * Print one LCONSOLE line per device in the device table, under
 * obd_dev_lock: index, a status string chosen from the obd_stopping /
 * obd_set_up / obd_attached flags (checked in that order), type name,
 * device name, uuid and current refcount.
 * NOTE(review): the status string values and the skip of empty slots are
 * not visible in this listing.
 */
452 void class_obd_list(void)
457 spin_lock(&obd_dev_lock);
458 for (i = 0; i < class_devno_max(); i++) {
459 struct obd_device *obd = class_num2obd(i);
462 if (obd->obd_stopping)
464 else if (obd->obd_set_up)
466 else if (obd->obd_attached)
470 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471 i, status, obd->obd_type->typ_name,
472 obd->obd_name, obd->obd_uuid.uuid,
473 atomic_read(&obd->obd_refcount));
475 spin_unlock(&obd_dev_lock);
479 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
480 specified, then only the client with that uuid is returned,
481 otherwise any client connected to the tgt is returned. */
/* The scan runs under obd_dev_lock. The type filter is a prefix match:
 * strncmp() compares only strlen(typ_name) bytes of the device's type
 * name. NOTE(review): return statements are not visible in this listing. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483 const char * typ_name,
484 struct obd_uuid *grp_uuid)
488 spin_lock(&obd_dev_lock);
489 for (i = 0; i < class_devno_max(); i++) {
490 struct obd_device *obd = class_num2obd(i);
493 if ((strncmp(obd->obd_type->typ_name, typ_name,
494 strlen(typ_name)) == 0)) {
495 if (obd_uuid_equals(tgt_uuid,
496 &obd->u.cli.cl_target_uuid) &&
497 ((grp_uuid)? obd_uuid_equals(grp_uuid,
498 &obd->obd_uuid) : 1)) {
499 spin_unlock(&obd_dev_lock);
504 spin_unlock(&obd_dev_lock);
/*
 * Find a client device connected to \a tgt_uuid without the caller naming
 * a type: LUSTRE_MDC_NAME is tried first, then LUSTRE_OSC_NAME.
 * NOTE(review): the MDC lookup passes NULL instead of \a grp_uuid, so the
 * group filter is not applied to it -- confirm this is intentional. The
 * condition between the two lookups and the return are not visible here.
 */
509 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
510 struct obd_uuid *grp_uuid)
512 struct obd_device *obd;
514 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
516 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
521 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
522 searching at *next, and if a device is found, the next index to look
523 at is saved in *next. If next is NULL, then the first matching device
524 will always be returned. */
/* The starting index is taken from *next only when it lies in
 * [0, class_devno_max()); the scan itself runs under obd_dev_lock and
 * compares each device's obd_uuid with grp_uuid.
 * NOTE(review): the other starting-index branches, the update of *next and
 * the return statements are not visible in this listing. */
525 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
531 else if (*next >= 0 && *next < class_devno_max())
536 spin_lock(&obd_dev_lock);
537 for (; i < class_devno_max(); i++) {
538 struct obd_device *obd = class_num2obd(i);
541 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
544 spin_unlock(&obd_dev_lock);
548 spin_unlock(&obd_dev_lock);
554 void obd_cleanup_caches(void)
559 if (obd_device_cachep) {
560 rc = cfs_mem_cache_destroy(obd_device_cachep);
561 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
562 obd_device_cachep = NULL;
565 rc = cfs_mem_cache_destroy(obdo_cachep);
566 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
570 rc = cfs_mem_cache_destroy(import_cachep);
571 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
572 import_cachep = NULL;
575 rc = cfs_mem_cache_destroy(capa_cachep);
576 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
/*
 * Create the memory caches used by this module: "ll_obd_dev_cache"
 * (struct obd_device), "ll_obdo_cache" (struct obdo), "ll_import_cache"
 * (struct obd_import) and "capa_cache" (struct obd_capa). Each cache
 * pointer is asserted to be NULL before creation.
 * obd_cleanup_caches() is called near the end; presumably this is the
 * failure path -- the labels, jumps and return values are not visible in
 * this listing.
 */
582 int obd_init_caches(void)
586 LASSERT(obd_device_cachep == NULL);
587 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
588 sizeof(struct obd_device),
590 if (!obd_device_cachep)
593 LASSERT(obdo_cachep == NULL);
594 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
599 LASSERT(import_cachep == NULL);
600 import_cachep = cfs_mem_cache_create("ll_import_cache",
601 sizeof(struct obd_import),
606 LASSERT(capa_cachep == NULL);
607 capa_cachep = cfs_mem_cache_create("capa_cache",
608 sizeof(struct obd_capa), 0, 0);
614 obd_cleanup_caches();
619 /* map connection to client */
/* Resolves conn->cookie to an export with class_handle2object(). Two
 * special cases are logged first: a "null handle" and a cookie of -1
 * ("assign a new connection"). class_conn2obd() drops a reference on the
 * result with class_export_put(), so the lookup presumably returns a
 * referenced export -- confirm against class_handle2object().
 * NOTE(review): the guarding conditions and return statements are not all
 * visible in this listing. */
620 struct obd_export *class_conn2export(struct lustre_handle *conn)
622 struct obd_export *export;
626 CDEBUG(D_CACHE, "looking for null handle\n");
630 if (conn->cookie == -1) { /* this means assign a new connection */
631 CDEBUG(D_CACHE, "want a new connection\n");
635 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
636 export = class_handle2object(conn->cookie);
/* Map an export to an obd device.
 * NOTE(review): the body of this function is not visible in this
 * listing. */
640 struct obd_device *class_exp2obd(struct obd_export *exp)
/*
 * Map a connection handle to the obd device of its export: the export is
 * looked up with class_conn2export(), its exp_obd is read, and the export
 * reference is dropped again with class_export_put().
 * NOTE(review): the NULL check and return statements are not visible in
 * this listing.
 */
647 struct obd_device *class_conn2obd(struct lustre_handle *conn)
649 struct obd_export *export;
650 export = class_conn2export(conn);
652 struct obd_device *obd = export->exp_obd;
653 class_export_put(export);
/* Return the client import (u.cli.cl_import) of the obd device behind
 * \a exp. NOTE(review): any NULL check on obd is not visible in this
 * listing. */
659 struct obd_import *class_exp2cliimp(struct obd_export *exp)
661 struct obd_device *obd = exp->exp_obd;
664 return obd->u.cli.cl_import;
/* Return the client import (u.cli.cl_import) of the obd device resolved
 * from \a conn by class_conn2obd(). NOTE(review): any NULL check on obd
 * is not visible in this listing. */
667 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
669 struct obd_device *obd = class_conn2obd(conn);
672 return obd->u.cli.cl_import;
675 /* Export management functions */
/* Add-reference callback registered with class_handle_hash() for export
 * handles (see class_new_export()); forwards to class_export_get(). */
676 static void export_handle_addref(void *export)
678 class_export_get(export);
/* Take a reference on \a exp: atomically increments exp_refcount and logs
 * the new value. NOTE(review): the return statement is not visible in
 * this listing. */
681 struct obd_export *class_export_get(struct obd_export *exp)
683 atomic_inc(&exp->exp_refcount);
684 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
685 atomic_read(&exp->exp_refcount));
688 EXPORT_SYMBOL(class_export_get);
/*
 * Drop a reference on \a exp. When the count reaches zero the export is
 * not destroyed here: it is queued on obd_zombie_exports (under
 * obd_zombie_impexp_lock) and the zombie handler is notified.
 */
690 void class_export_put(struct obd_export *exp)
692 LASSERT(exp != NULL);
693 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
694 atomic_read(&exp->exp_refcount) - 1);
/* Sanity bounds on the counter before it is decremented. */
695 LASSERT(atomic_read(&exp->exp_refcount) > 0);
696 LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
698 if (atomic_dec_and_test(&exp->exp_refcount)) {
/* exp_obd_chain is reused as the zombie-list linkage, so it must already
 * be off the device's export list. */
699 LASSERT (list_empty(&exp->exp_obd_chain));
701 CDEBUG(D_IOCTL, "final put %p/%s\n",
702 exp, exp->exp_client_uuid.uuid);
704 spin_lock(&obd_zombie_impexp_lock);
705 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
706 spin_unlock(&obd_zombie_impexp_lock);
708 if (obd_zombie_impexp_notify != NULL)
709 obd_zombie_impexp_notify();
712 EXPORT_SYMBOL(class_export_put);
714 static void class_export_destroy(struct obd_export *exp)
716 struct obd_device *obd = exp->exp_obd;
719 LASSERT (atomic_read(&exp->exp_refcount) == 0);
721 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
722 exp->exp_client_uuid.uuid, obd->obd_name);
724 LASSERT(obd != NULL);
726 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
727 if (exp->exp_connection)
728 ptlrpc_put_connection_superhack(exp->exp_connection);
730 LASSERT(list_empty(&exp->exp_outstanding_replies));
731 LASSERT(list_empty(&exp->exp_req_replay_queue));
732 obd_destroy_export(exp);
733 class_decref(obd, "export", exp);
735 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
739 /* Creates a new export, adds it to the hash table, and returns a
740 * pointer to it. The refcount is 2: one for the hash reference, and
741 * one for the pointer returned by this function. */
/* Returns ERR_PTR(-ENOMEM) or ERR_PTR(-EALREADY) on the visible failure
 * paths. NOTE(review): some guards and the final return are not visible
 * in this listing. */
742 struct obd_export *class_new_export(struct obd_device *obd,
743 struct obd_uuid *cluuid)
745 struct obd_export *export;
748 OBD_ALLOC_PTR(export);
750 return ERR_PTR(-ENOMEM);
752 export->exp_conn_cnt = 0;
753 export->exp_lock_hash = NULL;
754 atomic_set(&export->exp_refcount, 2);
755 atomic_set(&export->exp_rpc_count, 0);
756 export->exp_obd = obd;
757 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
758 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
759 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
/* Publish the handle; export_handle_addref is the callback used to take
 * references through the handle. */
760 class_handle_hash(&export->exp_handle, export_handle_addref);
761 export->exp_last_request_time = cfs_time_current_sec();
762 spin_lock_init(&export->exp_lock);
763 INIT_HLIST_NODE(&export->exp_uuid_hash);
764 INIT_HLIST_NODE(&export->exp_nid_hash);
766 export->exp_sp_peer = LUSTRE_SP_ANY;
767 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
768 export->exp_client_uuid = *cluuid;
769 obd_init_export(export);
771 spin_lock(&obd->obd_dev_lock);
/* An export whose client uuid equals the device's own uuid is not added
 * to the uuid hash. */
772 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
773 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
774 &export->exp_uuid_hash);
/* Duplicate-uuid path: undo the handle hashing and free the export. */
776 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
777 obd->obd_name, cluuid->uuid, rc);
778 spin_unlock(&obd->obd_dev_lock);
779 class_handle_unhash(&export->exp_handle);
780 OBD_FREE_PTR(export);
781 return ERR_PTR(-EALREADY);
785 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
786 class_incref(obd, "export", export);
787 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
788 list_add_tail(&export->exp_obd_chain_timed,
789 &export->exp_obd->obd_exports_timed);
790 export->exp_obd->obd_num_exports++;
791 spin_unlock(&obd->obd_dev_lock);
795 EXPORT_SYMBOL(class_new_export);
/*
 * Detach an export from everything that can find it: unhash its handle,
 * then, under the device's obd_dev_lock, remove it from the uuid hash (if
 * hashed) and from the obd_exports / obd_exports_timed lists and decrement
 * obd_num_exports. Finally one export reference is dropped.
 */
797 void class_unlink_export(struct obd_export *exp)
799 class_handle_unhash(&exp->exp_handle);
801 spin_lock(&exp->exp_obd->obd_dev_lock);
802 /* delete an uuid-export hashitem from hashtables */
803 if (!hlist_unhashed(&exp->exp_uuid_hash))
804 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
805 &exp->exp_client_uuid,
806 &exp->exp_uuid_hash);
808 list_del_init(&exp->exp_obd_chain);
809 list_del_init(&exp->exp_obd_chain_timed);
810 exp->exp_obd->obd_num_exports--;
811 spin_unlock(&exp->exp_obd->obd_dev_lock);
813 class_export_put(exp);
815 EXPORT_SYMBOL(class_unlink_export);
817 /* Import management functions */
/* Add-reference callback registered with class_handle_hash() for import
 * handles (see class_new_import()); forwards to class_import_get(). */
818 static void import_handle_addref(void *import)
820 class_import_get(import);
/* Take a reference on \a import: after sanity-checking the current count,
 * atomically increments imp_refcount and logs the new value.
 * NOTE(review): the return statement is not visible in this listing. */
823 struct obd_import *class_import_get(struct obd_import *import)
825 LASSERT(atomic_read(&import->imp_refcount) >= 0);
826 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
827 atomic_inc(&import->imp_refcount);
828 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
829 atomic_read(&import->imp_refcount));
832 EXPORT_SYMBOL(class_import_get);
/*
 * Drop a reference on \a import. When the count reaches zero the import
 * is not destroyed here: it is queued on obd_zombie_imports (under
 * obd_zombie_impexp_lock) and the zombie handler is notified.
 */
834 void class_import_put(struct obd_import *import)
838 LASSERT(atomic_read(&import->imp_refcount) > 0);
839 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
/* The import must not already be on the zombie list. */
840 LASSERT(list_empty(&import->imp_zombie_chain));
842 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
843 atomic_read(&import->imp_refcount) - 1);
845 if (atomic_dec_and_test(&import->imp_refcount)) {
847 CDEBUG(D_INFO, "final put import %p\n", import);
849 spin_lock(&obd_zombie_impexp_lock);
850 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
851 spin_unlock(&obd_zombie_impexp_lock);
853 if (obd_zombie_impexp_notify != NULL)
854 obd_zombie_impexp_notify();
859 EXPORT_SYMBOL(class_import_put);
/*
 * Final destruction of an import whose reference count has reached zero:
 * drops imp_connection and every connection on imp_conn_list through
 * ptlrpc_put_connection_superhack() (freeing each list entry), asserts
 * that imp_sec is already NULL, releases the "import" reference held on
 * the obd device and frees the import with OBD_FREE_RCU().
 */
861 void class_import_destroy(struct obd_import *import)
865 CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
866 import->imp_obd->obd_name);
868 LASSERT(atomic_read(&import->imp_refcount) == 0);
870 ptlrpc_put_connection_superhack(import->imp_connection);
/* Pop entries from the head until the list is empty. */
872 while (!list_empty(&import->imp_conn_list)) {
873 struct obd_import_conn *imp_conn;
875 imp_conn = list_entry(import->imp_conn_list.next,
876 struct obd_import_conn, oic_item);
877 list_del(&imp_conn->oic_item);
878 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
879 OBD_FREE(imp_conn, sizeof(*imp_conn));
882 LASSERT(import->imp_sec == NULL);
883 class_decref(import->imp_obd, "import", import);
884 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
/* Initialize the adaptive-timeout state of an import: the network latency
 * estimate with at_init(..., 0, 0) and one service estimate per portal
 * (IMP_AT_MAX_PORTALS entries) starting from INITIAL_CONNECT_TIMEOUT.
 * NOTE(review): the last argument of the per-portal at_init() call is not
 * visible in this listing. */
888 static void init_imp_at(struct imp_at *at) {
890 at_init(&at->iat_net_latency, 0, 0);
891 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
892 /* max service estimates are tracked on the server side, so
893 don't use the AT history here, just use the last reported
894 val. (But keep hist for proc histogram, worst_ever) */
895 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
/*
 * Allocate and initialize a new import for \a obd.
 * The import starts in state LUSTRE_IMP_NEW with a reference count of 2,
 * holds an "import" reference on the obd device (class_incref), has its
 * handle hashed with import_handle_addref as the add-reference callback,
 * and uses LUSTRE_MSG_MAGIC_V2 as its initial message magic.
 * NOTE(review): the allocation-failure check and the return statement are
 * not visible in this listing.
 */
900 struct obd_import *class_new_import(struct obd_device *obd)
902 struct obd_import *imp;
904 OBD_ALLOC(imp, sizeof(*imp));
908 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
909 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
910 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
911 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
912 spin_lock_init(&imp->imp_lock);
913 imp->imp_last_success_conn = 0;
914 imp->imp_state = LUSTRE_IMP_NEW;
915 imp->imp_obd = class_incref(obd, "import", imp);
916 sema_init(&imp->imp_sec_mutex, 1);
917 cfs_waitq_init(&imp->imp_recovery_waitq);
919 atomic_set(&imp->imp_refcount, 2);
920 atomic_set(&imp->imp_unregistering, 0);
921 atomic_set(&imp->imp_inflight, 0);
922 atomic_set(&imp->imp_replay_inflight, 0);
923 atomic_set(&imp->imp_inval_count, 0);
924 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
925 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
926 class_handle_hash(&imp->imp_handle, import_handle_addref);
927 init_imp_at(&imp->imp_at);
929 /* the default magic is V2, will be used in connect RPC, and
930 * then adjusted according to the flags in request/reply. */
931 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
935 EXPORT_SYMBOL(class_new_import);
/*
 * Start tearing down an import: unhash its handle so it can no longer be
 * looked up, bump imp_generation under imp_lock, and drop one reference.
 * The import is actually freed later, once the last reference is put
 * (see class_import_put()).
 */
937 void class_destroy_import(struct obd_import *import)
939 LASSERT(import != NULL);
940 LASSERT(import != LP_POISON);
942 class_handle_unhash(&import->imp_handle);
944 spin_lock(&import->imp_lock);
945 import->imp_generation++;
946 spin_unlock(&import->imp_lock);
947 class_import_put(import);
949 EXPORT_SYMBOL(class_destroy_import);
951 /* A connection defines an export context in which preallocation can
952 be managed. This releases the export pointer reference, and returns
953 the export handle, so the export refcount is 1 when this function
/* Creates an export for (obd, cluuid) with class_new_export(), stores the
 * export's handle cookie in conn->cookie and drops the pointer reference
 * again. A failure of class_new_export() is returned as PTR_ERR().
 * NOTE(review): the error test and the success return are not visible in
 * this listing. */
955 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
956 struct obd_uuid *cluuid)
958 struct obd_export *export;
959 LASSERT(conn != NULL);
960 LASSERT(obd != NULL);
961 LASSERT(cluuid != NULL);
964 export = class_new_export(obd, cluuid);
966 RETURN(PTR_ERR(export));
968 conn->cookie = export->exp_handle.h_cookie;
969 class_export_put(export);
971 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
972 cluuid->uuid, conn->cookie);
975 EXPORT_SYMBOL(class_connect);
977 /* if export is involved in recovery then clean up related things */
/* Runs under obd_processing_task_lock (bottom halves disabled). Only acts
 * when the device is recovering and the export is marked in recovery:
 * the export's recovery flags are cleared under exp_lock and the
 * corresponding per-device client counters are decremented. */
978 void class_export_recovery_cleanup(struct obd_export *exp)
980 struct obd_device *obd = exp->exp_obd;
982 spin_lock_bh(&obd->obd_processing_task_lock);
983 if (obd->obd_recovering && exp->exp_in_recovery) {
984 spin_lock(&exp->exp_lock);
985 exp->exp_in_recovery = 0;
986 spin_unlock(&exp->exp_lock);
987 obd->obd_connected_clients--;
988 /* each connected client is counted as recoverable */
989 obd->obd_recoverable_clients--;
990 if (exp->exp_req_replay_needed) {
991 spin_lock(&exp->exp_lock);
992 exp->exp_req_replay_needed = 0;
993 spin_unlock(&exp->exp_lock);
994 LASSERT(atomic_read(&obd->obd_req_replay_clients));
995 atomic_dec(&obd->obd_req_replay_clients);
997 if (exp->exp_lock_replay_needed) {
998 spin_lock(&exp->exp_lock);
999 exp->exp_lock_replay_needed = 0;
1000 spin_unlock(&exp->exp_lock);
1001 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1002 atomic_dec(&obd->obd_lock_replay_clients);
1005 spin_unlock_bh(&obd->obd_processing_task_lock);
1008 /* This function removes two references from the export: one for the
1009 * hash entry and one for the export pointer passed in. The export
1010 * pointer passed to this function is destroyed and should not be used
/* Marks the export disconnected under exp_lock (remembering whether it
 * already was), removes it from the device's nid hash if hashed, and, for
 * a first-time disconnect, runs recovery cleanup, unlinks the export and
 * drops the caller's reference.
 * NOTE(review): return values are not visible in this listing. */
1012 int class_disconnect(struct obd_export *export)
1014 int already_disconnected;
1017 if (export == NULL) {
1019 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1023 spin_lock(&export->exp_lock);
1024 already_disconnected = export->exp_disconnected;
1025 export->exp_disconnected = 1;
1027 if (!hlist_unhashed(&export->exp_nid_hash))
1028 lustre_hash_del(export->exp_obd->obd_nid_hash,
1029 &export->exp_connection->c_peer.nid,
1030 &export->exp_nid_hash);
1032 spin_unlock(&export->exp_lock);
1034 /* class_cleanup(), abort_recovery(), and class_fail_export()
1035 * all end up in here, and if any of them race we shouldn't
1036 * call extra class_export_puts(). */
1037 if (already_disconnected)
1040 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1041 export->exp_handle.h_cookie);
1043 class_export_recovery_cleanup(export);
/* class_unlink_export() drops one reference; the put below drops the
 * one that came with the pointer passed in. */
1044 class_unlink_export(export);
1045 class_export_put(export);
/*
 * Disconnect every export on \a list, storing \a flags in each export's
 * exp_flags first. The list is consumed from the head until it is empty.
 * An export whose client uuid equals its device's uuid is only removed
 * from the list and not disconnected. For the others a second reference
 * is obtained through the handle (class_conn2export) and passed to
 * obd_disconnect().
 * NOTE(review): some statements (handling of a failed handle lookup, loop
 * continues, use of rc) are not visible in this listing.
 */
1049 static void class_disconnect_export_list(struct list_head *list, int flags)
1052 struct lustre_handle fake_conn;
1053 struct obd_export *fake_exp, *exp;
1056 /* It's possible that an export may disconnect itself, but
1057 * nothing else will be added to this list. */
1058 while (!list_empty(list)) {
1059 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
/* Hold the export for the duration of this iteration. */
1060 class_export_get(exp);
1062 spin_lock(&exp->exp_lock);
1063 exp->exp_flags = flags;
1064 spin_unlock(&exp->exp_lock);
1066 if (obd_uuid_equals(&exp->exp_client_uuid,
1067 &exp->exp_obd->obd_uuid)) {
1069 "exp %p export uuid == obd uuid, don't discon\n",
1071 /* Need to delete this now so we don't end up pointing
1072 * to work_list later when this export is cleaned up. */
1073 list_del_init(&exp->exp_obd_chain);
1074 class_export_put(exp);
1078 fake_conn.cookie = exp->exp_handle.h_cookie;
1079 fake_exp = class_conn2export(&fake_conn);
1081 class_export_put(exp);
1085 spin_lock(&fake_exp->exp_lock);
1086 fake_exp->exp_flags = flags;
1087 spin_unlock(&fake_exp->exp_lock);
1089 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1090 "last request at "CFS_TIME_T"\n",
1091 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1092 exp, exp->exp_last_request_time);
1093 rc = obd_disconnect(fake_exp);
1094 class_export_put(exp);
/* Build export flags from the device state: OBD_OPT_FAILOVER when
 * obd_fail is set, OBD_OPT_FORCE when obd_force is set, OR-ed together. */
1099 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1101 return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1102 (obd->obd_force ? OBD_OPT_FORCE : 0));
/*
 * Disconnect all exports of \a obd. The exports are first moved to a
 * local work list under obd_dev_lock and then handed to
 * class_disconnect_export_list() with flags derived from the device.
 */
1105 void class_disconnect_exports(struct obd_device *obd)
1107 struct list_head work_list;
1110 /* Move all of the exports from obd_exports to a work list, en masse. */
1111 spin_lock(&obd->obd_dev_lock);
/* Splice trick: link work_list into the ring right after the obd_exports
 * head, then unlink (and re-initialize) the obd_exports head, leaving
 * work_list as the head of all former entries. */
1112 list_add(&work_list, &obd->obd_exports);
1113 list_del_init(&obd->obd_exports);
1114 spin_unlock(&obd->obd_dev_lock);
1116 if (!list_empty(&work_list)) {
1117 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1118 "disconnecting them\n", obd->obd_minor, obd);
1119 class_disconnect_export_list(&work_list,
1120 get_exp_flags_from_obd(obd));
1122 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1123 obd->obd_minor, obd);
1126 EXPORT_SYMBOL(class_disconnect_exports);
1128 /* Remove exports that have not completed recovery.
/* Walks obd_exports under obd_dev_lock, calling \a test_export on each
 * export; exports selected by that test are moved to a local work list,
 * which is then passed to class_disconnect_export_list(). The device's
 * self-export (client uuid equal to the device uuid) is moved as well but
 * gets separate handling for the count/log message.
 * NOTE(review): which result of test_export() selects an export, the
 * counter updates and the return value are not visible in this listing. */
1130 int class_disconnect_stale_exports(struct obd_device *obd,
1131 int (*test_export)(struct obd_export *))
1133 struct list_head work_list;
1134 struct list_head *pos, *n;
1135 struct obd_export *exp;
1139 CFS_INIT_LIST_HEAD(&work_list);
1140 spin_lock(&obd->obd_dev_lock);
/* The _safe iterator is needed because entries are unlinked in the loop. */
1141 list_for_each_safe(pos, n, &obd->obd_exports) {
1142 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1143 if (test_export(exp))
1146 list_del(&exp->exp_obd_chain);
1147 list_add(&exp->exp_obd_chain, &work_list);
1148 /* don't count self-export as client */
1149 if (obd_uuid_equals(&exp->exp_client_uuid,
1150 &exp->exp_obd->obd_uuid))
1154 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1155 obd->obd_name, exp->exp_client_uuid.uuid,
1156 exp->exp_connection == NULL ? "<unknown>" :
1157 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1159 spin_unlock(&obd->obd_dev_lock);
1161 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1162 obd->obd_name, cnt);
1163 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1166 EXPORT_SYMBOL(class_disconnect_stale_exports);
/*
 * Mark an export as failed and disconnect it. exp_failed is set under
 * exp_lock; if it was already set, the call only logs and skips the
 * disconnect. Otherwise the debug log is dumped when obd_dump_on_timeout
 * is set, an extra reference is taken and obd_disconnect() is called.
 */
1168 void class_fail_export(struct obd_export *exp)
1170 int rc, already_failed;
1172 spin_lock(&exp->exp_lock);
1173 already_failed = exp->exp_failed;
1174 exp->exp_failed = 1;
1175 spin_unlock(&exp->exp_lock);
1177 if (already_failed) {
1178 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1179 exp, exp->exp_client_uuid.uuid);
1183 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1184 exp, exp->exp_client_uuid.uuid);
1186 if (obd_dump_on_timeout)
1187 libcfs_debug_dumplog();
1189 /* Most callers into obd_disconnect are removing their own reference
1190 * (request, for example) in addition to the one from the hash table.
1191 * We don't have such a reference here, so make one. */
1192 class_export_get(exp);
1193 rc = obd_disconnect(exp);
1195 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1197 CDEBUG(D_HA, "disconnected export %p/%s\n",
1198 exp, exp->exp_client_uuid.uuid);
1200 EXPORT_SYMBOL(class_fail_export);
/* Return the peer NID of the export's connection as a string
 * (libcfs_nid2str) when the export has a connection.
 * NOTE(review): the value returned for an export without a connection is
 * not visible in this listing. */
1202 char *obd_export_nid2str(struct obd_export *exp)
1204 if (exp->exp_connection != NULL)
1205 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1209 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict exports of \a obd whose peer NID matches the string \a nid.
 * The NID is parsed with libcfs_str2nid() and looked up in obd_nid_hash;
 * a found export is failed with class_fail_export() and the lookup
 * reference is dropped. Returns the number of exports evicted.
 * NOTE(review): the loop construct and the counter increment are not
 * visible in this listing.
 */
1211 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1213 struct obd_export *doomed_exp = NULL;
1214 int exports_evicted = 0;
1216 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1219 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1220 if (doomed_exp == NULL)
/* Sanity: the hash must return an export for exactly this NID, and the
 * self-export must never be in the NID hash. */
1223 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1224 "nid %s found, wanted nid %s, requested nid %s\n",
1225 obd_export_nid2str(doomed_exp),
1226 libcfs_nid2str(nid_key), nid);
1227 LASSERTF(doomed_exp != obd->obd_self_export,
1228 "self-export is hashed by NID?\n");
1230 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1231 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1233 class_fail_export(doomed_exp);
1234 class_export_put(doomed_exp);
1237 if (!exports_evicted)
1238 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1239 obd->obd_name, nid);
1240 return exports_evicted;
1242 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of \a obd whose client uuid matches the string
 * \a uuid. The device's own uuid is refused. The export is looked up in
 * obd_uuid_hash, failed with class_fail_export() and the lookup reference
 * is dropped. Returns the number of exports evicted.
 * NOTE(review): the counter increment is not visible in this listing.
 */
1244 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1246 struct obd_export *doomed_exp = NULL;
1247 struct obd_uuid doomed_uuid;
1248 int exports_evicted = 0;
1250 obd_str2uuid(&doomed_uuid, uuid);
1251 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1252 CERROR("%s: can't evict myself\n", obd->obd_name);
1253 return exports_evicted;
1256 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1258 if (doomed_exp == NULL) {
1259 CERROR("%s: can't disconnect %s: no exports found\n",
1260 obd->obd_name, uuid);
1262 CWARN("%s: evicting %s at adminstrative request\n",
1263 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1264 class_fail_export(doomed_exp);
1265 class_export_put(doomed_exp);
1269 return exports_evicted;
1271 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1274 * kill zombie imports and exports
/* Repeatedly takes at most one import and one export off the head of the
 * zombie lists while holding obd_zombie_impexp_lock, then destroys them
 * with class_import_destroy() / class_export_destroy() after the lock has
 * been released. The loop ends when a pass finds neither.
 * NOTE(review): the per-pass reset of import/export to NULL and the NULL
 * guards around the destroy calls are not visible in this listing. */
1276 void obd_zombie_impexp_cull(void)
1278 struct obd_import *import;
1279 struct obd_export *export;
1283 spin_lock (&obd_zombie_impexp_lock);
1286 if (!list_empty(&obd_zombie_imports)) {
1287 import = list_entry(obd_zombie_imports.next,
1290 list_del(&import->imp_zombie_chain);
1294 if (!list_empty(&obd_zombie_exports)) {
1295 export = list_entry(obd_zombie_exports.next,
1298 list_del_init(&export->exp_obd_chain);
1301 spin_unlock(&obd_zombie_impexp_lock);
1304 class_import_destroy(import);
1307 class_export_destroy(export);
1309 } while (import != NULL || export != NULL);
/* State of the zombie import/export thread: completions signalled when
 * the thread has started / stopped, a flag word carrying the
 * OBD_ZOMBIE_STOP bit, and the wait queue the thread sleeps on. */
1313 static struct completion obd_zombie_start;
1314 static struct completion obd_zombie_stop;
1315 static unsigned long obd_zombie_flags;
1316 static cfs_waitq_t obd_zombie_waitq;
1323 * check for work for kill zombie import/export thread.
/* Computes, under obd_zombie_impexp_lock, whether there is nothing to do:
 * rc is non-zero when both zombie lists are empty and OBD_ZOMBIE_STOP is
 * not set. The zombie thread waits until this is false.
 * NOTE(review): the return statement is not visible in this listing. */
1325 static int obd_zombie_impexp_check(void *arg)
1329 spin_lock(&obd_zombie_impexp_lock);
1330 rc = list_empty(&obd_zombie_imports) &&
1331 list_empty(&obd_zombie_exports) &&
1332 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1334 spin_unlock(&obd_zombie_impexp_lock);
1340 * notify import/export destroy thread about new zombie.
/* Wake a waiter on obd_zombie_waitq (the zombie thread or a barrier
 * caller) via cfs_waitq_signal(). */
1342 static void obd_zombie_impexp_notify(void)
1344 cfs_waitq_signal(&obd_zombie_waitq);
1348 * check whether obd_zombie is idle
/* Computes, under obd_zombie_impexp_lock, whether both zombie lists are
 * empty. Must not be called once OBD_ZOMBIE_STOP has been set (asserted).
 * NOTE(review): the return statement is not visible in this listing. */
1350 static int obd_zombie_is_idle(void)
1354 LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1355 spin_lock(&obd_zombie_impexp_lock);
1356 rc = list_empty(&obd_zombie_imports) &&
1357 list_empty(&obd_zombie_exports);
1358 spin_unlock(&obd_zombie_impexp_lock);
1363 * wait until the obd_zombie import/export queues become empty
/* Blocks on obd_zombie_waitq until obd_zombie_is_idle() is true, using a
 * zero-initialized l_wait_info. */
1365 void obd_zombie_barrier(void)
1367 struct l_wait_info lwi = { 0 };
1369 l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1371 EXPORT_SYMBOL(obd_zombie_barrier);
1376 * destroy zombie export/import thread.
/* Thread body: daemonizes as "obd_zombid", signals obd_zombie_start
 * (also when daemonizing fails), then loops until OBD_ZOMBIE_STOP is set,
 * sleeping until there is work and culling the zombie lists. Signals
 * obd_zombie_stop on exit.
 * NOTE(review): return statements are not visible in this listing. */
1378 static int obd_zombie_impexp_thread(void *unused)
1382 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1383 complete(&obd_zombie_start);
1387 complete(&obd_zombie_start);
1389 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1390 struct l_wait_info lwi = { 0 };
/* obd_zombie_impexp_check() is non-zero while idle, hence the negation. */
1392 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1394 obd_zombie_impexp_cull();
1395 /* Notify obd_zombie_barrier callers that queues may be empty */
1396 cfs_waitq_signal(&obd_zombie_waitq);
1399 complete(&obd_zombie_stop);
1404 #else /* ! KERNEL */
/* Non-kernel build: recursion guard for obd_zombie_impexp_kill() and the
 * handles of the registered liblustre wait/idle callbacks. */
1406 static atomic_t zombie_recur = ATOMIC_INIT(0);
1407 static void *obd_zombie_impexp_work_cb;
1408 static void *obd_zombie_impexp_idle_cb;
/* Callback that culls the zombie lists. zombie_recur is incremented on
 * entry and decremented on exit; the cull only runs when this call is the
 * outermost one (counter went from 0 to 1).
 * NOTE(review): the return value is not visible in this listing. */
1410 int obd_zombie_impexp_kill(void *arg)
1414 if (atomic_inc_return(&zombie_recur) == 1) {
1415 obd_zombie_impexp_cull();
1418 atomic_dec(&zombie_recur);
1425 * start destroy zombie import/export thread
/* Initializes the zombie lists, their lock, the start/stop completions
 * and the wait queue. The code then contains two alternatives: starting
 * obd_zombie_impexp_thread with cfs_kernel_thread() and waiting for
 * obd_zombie_start, or registering obd_zombie_impexp_kill /
 * obd_zombie_impexp_check as liblustre wait / idle callbacks.
 * NOTE(review): the preprocessor conditionals selecting between the two
 * and the return value are not visible in this listing. */
1427 int obd_zombie_impexp_init(void)
1431 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1432 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1433 spin_lock_init(&obd_zombie_impexp_lock);
1434 init_completion(&obd_zombie_start);
1435 init_completion(&obd_zombie_stop);
1436 cfs_waitq_init(&obd_zombie_waitq);
1439 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1443 wait_for_completion(&obd_zombie_start);
1446 obd_zombie_impexp_work_cb =
1447 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1448 &obd_zombie_impexp_kill, NULL);
1450 obd_zombie_impexp_idle_cb =
1451 liblustre_register_idle_callback("obd_zombi_impexp_check",
1452 &obd_zombie_impexp_check, NULL);
1459 * stop destroy zombie import/export thread
/* Sets OBD_ZOMBIE_STOP and wakes the zombie handler. The code then
 * contains two alternatives: waiting for the thread to signal
 * obd_zombie_stop, or deregistering the liblustre wait / idle callbacks.
 * NOTE(review): the preprocessor conditionals selecting between the two
 * are not visible in this listing. */
1461 void obd_zombie_impexp_stop(void)
1463 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1464 obd_zombie_impexp_notify();
1466 wait_for_completion(&obd_zombie_stop);
1468 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1469 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);