1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
80 EXPORT_SYMBOL(obd_device_alloc);
82 static void obd_device_free(struct obd_device *obd)
85 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87 if (obd->obd_namespace != NULL) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd, obd->obd_namespace, obd->obd_force);
92 lu_ref_fini(&obd->obd_reference);
93 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 struct obd_type *class_search_type(const char *name)
98 struct list_head *tmp;
99 struct obd_type *type;
101 spin_lock(&obd_types_lock);
102 list_for_each(tmp, &obd_types) {
103 type = list_entry(tmp, struct obd_type, typ_chain);
104 if (strcmp(type->typ_name, name) == 0) {
105 spin_unlock(&obd_types_lock);
109 spin_unlock(&obd_types_lock);
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
119 const char *modname = name;
120 if (!request_module(modname)) {
121 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122 type = class_search_type(name);
124 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130 spin_lock(&type->obd_type_lock);
132 try_module_get(type->typ_dt_ops->o_owner);
133 spin_unlock(&type->obd_type_lock);
138 void class_put_type(struct obd_type *type)
141 spin_lock(&type->obd_type_lock);
143 module_put(type->typ_dt_ops->o_owner);
144 spin_unlock(&type->obd_type_lock);
147 #define CLASS_MAX_NAME 1024
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150 struct lprocfs_vars *vars, const char *name,
151 struct lu_device_type *ldt)
153 struct obd_type *type;
158 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
160 if (class_search_type(name)) {
161 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166 OBD_ALLOC(type, sizeof(*type));
170 OBD_ALLOC_PTR(type->typ_dt_ops);
171 OBD_ALLOC_PTR(type->typ_md_ops);
172 OBD_ALLOC(type->typ_name, strlen(name) + 1);
174 if (type->typ_dt_ops == NULL ||
175 type->typ_md_ops == NULL ||
176 type->typ_name == NULL)
179 *(type->typ_dt_ops) = *dt_ops;
180 /* md_ops is optional */
182 *(type->typ_md_ops) = *md_ops;
183 strcpy(type->typ_name, name);
184 spin_lock_init(&type->obd_type_lock);
187 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
189 if (IS_ERR(type->typ_procroot)) {
190 rc = PTR_ERR(type->typ_procroot);
191 type->typ_procroot = NULL;
197 rc = lu_device_type_init(ldt);
202 spin_lock(&obd_types_lock);
203 list_add(&type->typ_chain, &obd_types);
204 spin_unlock(&obd_types_lock);
209 if (type->typ_name != NULL)
210 OBD_FREE(type->typ_name, strlen(name) + 1);
211 if (type->typ_md_ops != NULL)
212 OBD_FREE_PTR(type->typ_md_ops);
213 if (type->typ_dt_ops != NULL)
214 OBD_FREE_PTR(type->typ_dt_ops);
215 OBD_FREE(type, sizeof(*type));
219 int class_unregister_type(const char *name)
221 struct obd_type *type = class_search_type(name);
225 CERROR("unknown obd type\n");
229 if (type->typ_refcnt) {
230 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231 /* This is a bad situation, let's make the best of it */
232 /* Remove ops, but leave the name for debugging */
233 OBD_FREE_PTR(type->typ_dt_ops);
234 OBD_FREE_PTR(type->typ_md_ops);
238 if (type->typ_procroot) {
239 lprocfs_remove(&type->typ_procroot);
243 lu_device_type_fini(type->typ_lu);
245 spin_lock(&obd_types_lock);
246 list_del(&type->typ_chain);
247 spin_unlock(&obd_types_lock);
248 OBD_FREE(type->typ_name, strlen(name) + 1);
249 if (type->typ_dt_ops != NULL)
250 OBD_FREE_PTR(type->typ_dt_ops);
251 if (type->typ_md_ops != NULL)
252 OBD_FREE_PTR(type->typ_md_ops);
253 OBD_FREE(type, sizeof(*type));
255 } /* class_unregister_type */
258 * Create a new obd device.
260 * Find an empty slot in ::obd_devs[], create a new obd device in it.
262 * \param typename [in] obd device type string.
263 * \param name [in] obd device name.
265 * \retval NULL if create fails, otherwise return the obd device
268 struct obd_device *class_newdev(const char *type_name, const char *name)
270 struct obd_device *result = NULL;
271 struct obd_device *newdev;
272 struct obd_type *type = NULL;
274 int new_obd_minor = 0;
276 if (strlen(name) >= MAX_OBD_NAME) {
277 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278 RETURN(ERR_PTR(-EINVAL));
281 type = class_get_type(type_name);
283 CERROR("OBD: unknown type: %s\n", type_name);
284 RETURN(ERR_PTR(-ENODEV));
287 newdev = obd_device_alloc();
288 if (newdev == NULL) {
289 class_put_type(type);
290 RETURN(ERR_PTR(-ENOMEM));
292 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
294 spin_lock(&obd_dev_lock);
295 for (i = 0; i < class_devno_max(); i++) {
296 struct obd_device *obd = class_num2obd(i);
297 if (obd && obd->obd_name &&
298 (strcmp(name, obd->obd_name) == 0)) {
299 CERROR("Device %s already exists, won't add\n", name);
301 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302 "%p obd_magic %08x != %08x\n", result,
303 result->obd_magic, OBD_DEVICE_MAGIC);
304 LASSERTF(result->obd_minor == new_obd_minor,
305 "%p obd_minor %d != %d\n", result,
306 result->obd_minor, new_obd_minor);
308 obd_devs[result->obd_minor] = NULL;
309 result->obd_name[0]='\0';
311 result = ERR_PTR(-EEXIST);
314 if (!result && !obd) {
316 result->obd_minor = i;
318 result->obd_type = type;
319 strncpy(result->obd_name, name,
320 sizeof(result->obd_name) - 1);
321 obd_devs[i] = result;
324 spin_unlock(&obd_dev_lock);
326 if (result == NULL && i >= class_devno_max()) {
327 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
329 result = ERR_PTR(-EOVERFLOW);
332 if (IS_ERR(result)) {
333 obd_device_free(newdev);
334 class_put_type(type);
336 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337 result->obd_name, result);
342 void class_release_dev(struct obd_device *obd)
344 struct obd_type *obd_type = obd->obd_type;
346 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350 LASSERT(obd_type != NULL);
352 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353 obd->obd_name,obd->obd_type->typ_name);
355 spin_lock(&obd_dev_lock);
356 obd_devs[obd->obd_minor] = NULL;
357 spin_unlock(&obd_dev_lock);
358 obd_device_free(obd);
360 class_put_type(obd_type);
363 int class_name2dev(const char *name)
370 spin_lock(&obd_dev_lock);
371 for (i = 0; i < class_devno_max(); i++) {
372 struct obd_device *obd = class_num2obd(i);
373 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374 /* Make sure we finished attaching before we give
375 out any references */
376 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377 if (obd->obd_attached) {
378 spin_unlock(&obd_dev_lock);
384 spin_unlock(&obd_dev_lock);
389 struct obd_device *class_name2obd(const char *name)
391 int dev = class_name2dev(name);
393 if (dev < 0 || dev > class_devno_max())
395 return class_num2obd(dev);
398 int class_uuid2dev(struct obd_uuid *uuid)
402 spin_lock(&obd_dev_lock);
403 for (i = 0; i < class_devno_max(); i++) {
404 struct obd_device *obd = class_num2obd(i);
405 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407 spin_unlock(&obd_dev_lock);
411 spin_unlock(&obd_dev_lock);
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
418 int dev = class_uuid2dev(uuid);
421 return class_num2obd(dev);
425 * Get obd device from ::obd_devs[]
427 * \param num [in] array index
429 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
430 * otherwise return the obd device there.
432 struct obd_device *class_num2obd(int num)
434 struct obd_device *obd = NULL;
436 if (num < class_devno_max()) {
441 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442 "%p obd_magic %08x != %08x\n",
443 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444 LASSERTF(obd->obd_minor == num,
445 "%p obd_minor %0d != %0d\n",
446 obd, obd->obd_minor, num);
452 void class_obd_list(void)
457 spin_lock(&obd_dev_lock);
458 for (i = 0; i < class_devno_max(); i++) {
459 struct obd_device *obd = class_num2obd(i);
462 if (obd->obd_stopping)
464 else if (obd->obd_set_up)
466 else if (obd->obd_attached)
470 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471 i, status, obd->obd_type->typ_name,
472 obd->obd_name, obd->obd_uuid.uuid,
473 atomic_read(&obd->obd_refcount));
475 spin_unlock(&obd_dev_lock);
479 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
480 specified, then only the client with that uuid is returned,
481 otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483 const char * typ_name,
484 struct obd_uuid *grp_uuid)
488 spin_lock(&obd_dev_lock);
489 for (i = 0; i < class_devno_max(); i++) {
490 struct obd_device *obd = class_num2obd(i);
493 if ((strncmp(obd->obd_type->typ_name, typ_name,
494 strlen(typ_name)) == 0)) {
495 if (obd_uuid_equals(tgt_uuid,
496 &obd->u.cli.cl_target_uuid) &&
497 ((grp_uuid)? obd_uuid_equals(grp_uuid,
498 &obd->obd_uuid) : 1)) {
499 spin_unlock(&obd_dev_lock);
504 spin_unlock(&obd_dev_lock);
509 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
510 struct obd_uuid *grp_uuid)
512 struct obd_device *obd;
514 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
516 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
521 /* Iterate the obd_device list looking devices have grp_uuid. Start
522 searching at *next, and if a device is found, the next index to look
523 at is saved in *next. If next is NULL, then the first matching device
524 will always be returned. */
525 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
531 else if (*next >= 0 && *next < class_devno_max())
536 spin_lock(&obd_dev_lock);
537 for (; i < class_devno_max(); i++) {
538 struct obd_device *obd = class_num2obd(i);
541 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
544 spin_unlock(&obd_dev_lock);
548 spin_unlock(&obd_dev_lock);
554 void obd_cleanup_caches(void)
559 if (obd_device_cachep) {
560 rc = cfs_mem_cache_destroy(obd_device_cachep);
561 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
562 obd_device_cachep = NULL;
565 rc = cfs_mem_cache_destroy(obdo_cachep);
566 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
570 rc = cfs_mem_cache_destroy(import_cachep);
571 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
572 import_cachep = NULL;
575 rc = cfs_mem_cache_destroy(capa_cachep);
576 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
582 int obd_init_caches(void)
586 LASSERT(obd_device_cachep == NULL);
587 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
588 sizeof(struct obd_device),
590 if (!obd_device_cachep)
593 LASSERT(obdo_cachep == NULL);
594 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
599 LASSERT(import_cachep == NULL);
600 import_cachep = cfs_mem_cache_create("ll_import_cache",
601 sizeof(struct obd_import),
606 LASSERT(capa_cachep == NULL);
607 capa_cachep = cfs_mem_cache_create("capa_cache",
608 sizeof(struct obd_capa), 0, 0);
614 obd_cleanup_caches();
619 /* map connection to client */
620 struct obd_export *class_conn2export(struct lustre_handle *conn)
622 struct obd_export *export;
626 CDEBUG(D_CACHE, "looking for null handle\n");
630 if (conn->cookie == -1) { /* this means assign a new connection */
631 CDEBUG(D_CACHE, "want a new connection\n");
635 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
636 export = class_handle2object(conn->cookie);
640 struct obd_device *class_exp2obd(struct obd_export *exp)
647 struct obd_device *class_conn2obd(struct lustre_handle *conn)
649 struct obd_export *export;
650 export = class_conn2export(conn);
652 struct obd_device *obd = export->exp_obd;
653 class_export_put(export);
659 struct obd_import *class_exp2cliimp(struct obd_export *exp)
661 struct obd_device *obd = exp->exp_obd;
664 return obd->u.cli.cl_import;
667 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
669 struct obd_device *obd = class_conn2obd(conn);
672 return obd->u.cli.cl_import;
675 /* Export management functions */
676 static void export_handle_addref(void *export)
678 class_export_get(export);
681 struct obd_export *class_export_get(struct obd_export *exp)
683 atomic_inc(&exp->exp_refcount);
684 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
685 atomic_read(&exp->exp_refcount));
688 EXPORT_SYMBOL(class_export_get);
690 void class_export_put(struct obd_export *exp)
692 LASSERT(exp != NULL);
693 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
694 atomic_read(&exp->exp_refcount) - 1);
695 LASSERT(atomic_read(&exp->exp_refcount) > 0);
696 LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
698 if (atomic_dec_and_test(&exp->exp_refcount)) {
699 LASSERT (list_empty(&exp->exp_obd_chain));
701 CDEBUG(D_IOCTL, "final put %p/%s\n",
702 exp, exp->exp_client_uuid.uuid);
704 spin_lock(&obd_zombie_impexp_lock);
705 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
706 spin_unlock(&obd_zombie_impexp_lock);
708 if (obd_zombie_impexp_notify != NULL)
709 obd_zombie_impexp_notify();
712 EXPORT_SYMBOL(class_export_put);
714 static void class_export_destroy(struct obd_export *exp)
716 struct obd_device *obd = exp->exp_obd;
719 LASSERT (atomic_read(&exp->exp_refcount) == 0);
721 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
722 exp->exp_client_uuid.uuid, obd->obd_name);
724 LASSERT(obd != NULL);
726 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
727 if (exp->exp_connection)
728 ptlrpc_put_connection_superhack(exp->exp_connection);
730 LASSERT(list_empty(&exp->exp_outstanding_replies));
731 LASSERT(list_empty(&exp->exp_req_replay_queue));
732 obd_destroy_export(exp);
733 class_decref(obd, "export", exp);
735 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
739 /* Creates a new export, adds it to the hash table, and returns a
740 * pointer to it. The refcount is 2: one for the hash reference, and
741 * one for the pointer returned by this function. */
742 struct obd_export *class_new_export(struct obd_device *obd,
743 struct obd_uuid *cluuid)
745 struct obd_export *export;
748 OBD_ALLOC_PTR(export);
750 return ERR_PTR(-ENOMEM);
752 export->exp_conn_cnt = 0;
753 export->exp_lock_hash = NULL;
754 atomic_set(&export->exp_refcount, 2);
755 atomic_set(&export->exp_rpc_count, 0);
756 export->exp_obd = obd;
757 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
758 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
759 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
760 class_handle_hash(&export->exp_handle, export_handle_addref);
761 export->exp_last_request_time = cfs_time_current_sec();
762 spin_lock_init(&export->exp_lock);
763 INIT_HLIST_NODE(&export->exp_uuid_hash);
764 INIT_HLIST_NODE(&export->exp_nid_hash);
766 export->exp_sp_peer = LUSTRE_SP_ANY;
767 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
768 export->exp_client_uuid = *cluuid;
769 obd_init_export(export);
771 spin_lock(&obd->obd_dev_lock);
772 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
773 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
774 &export->exp_uuid_hash);
776 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
777 obd->obd_name, cluuid->uuid, rc);
778 spin_unlock(&obd->obd_dev_lock);
779 class_handle_unhash(&export->exp_handle);
780 OBD_FREE_PTR(export);
781 return ERR_PTR(-EALREADY);
785 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
786 class_incref(obd, "export", export);
787 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
788 list_add_tail(&export->exp_obd_chain_timed,
789 &export->exp_obd->obd_exports_timed);
790 export->exp_obd->obd_num_exports++;
791 spin_unlock(&obd->obd_dev_lock);
795 EXPORT_SYMBOL(class_new_export);
797 void class_unlink_export(struct obd_export *exp)
799 class_handle_unhash(&exp->exp_handle);
801 spin_lock(&exp->exp_obd->obd_dev_lock);
802 /* delete an uuid-export hashitem from hashtables */
803 if (!hlist_unhashed(&exp->exp_uuid_hash))
804 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
805 &exp->exp_client_uuid,
806 &exp->exp_uuid_hash);
808 list_del_init(&exp->exp_obd_chain);
809 list_del_init(&exp->exp_obd_chain_timed);
810 exp->exp_obd->obd_num_exports--;
811 spin_unlock(&exp->exp_obd->obd_dev_lock);
813 class_export_put(exp);
815 EXPORT_SYMBOL(class_unlink_export);
817 /* Import management functions */
818 static void import_handle_addref(void *import)
820 class_import_get(import);
823 struct obd_import *class_import_get(struct obd_import *import)
825 LASSERT(atomic_read(&import->imp_refcount) >= 0);
826 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
827 atomic_inc(&import->imp_refcount);
828 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
829 atomic_read(&import->imp_refcount));
832 EXPORT_SYMBOL(class_import_get);
834 void class_import_put(struct obd_import *import)
838 LASSERT(atomic_read(&import->imp_refcount) > 0);
839 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
840 LASSERT(list_empty(&import->imp_zombie_chain));
842 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
843 atomic_read(&import->imp_refcount) - 1);
845 if (atomic_dec_and_test(&import->imp_refcount)) {
847 CDEBUG(D_INFO, "final put import %p\n", import);
849 spin_lock(&obd_zombie_impexp_lock);
850 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
851 spin_unlock(&obd_zombie_impexp_lock);
853 if (obd_zombie_impexp_notify != NULL)
854 obd_zombie_impexp_notify();
859 EXPORT_SYMBOL(class_import_put);
861 void class_import_destroy(struct obd_import *import)
865 CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
866 import->imp_obd->obd_name);
868 LASSERT(atomic_read(&import->imp_refcount) == 0);
870 ptlrpc_put_connection_superhack(import->imp_connection);
872 while (!list_empty(&import->imp_conn_list)) {
873 struct obd_import_conn *imp_conn;
875 imp_conn = list_entry(import->imp_conn_list.next,
876 struct obd_import_conn, oic_item);
877 list_del(&imp_conn->oic_item);
878 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
879 OBD_FREE(imp_conn, sizeof(*imp_conn));
882 LASSERT(import->imp_sec == NULL);
883 class_decref(import->imp_obd, "import", import);
884 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
888 static void init_imp_at(struct imp_at *at) {
890 at_init(&at->iat_net_latency, 0, 0);
891 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
892 /* max service estimates are tracked on the server side, so
893 don't use the AT history here, just use the last reported
894 val. (But keep hist for proc histogram, worst_ever) */
895 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
900 struct obd_import *class_new_import(struct obd_device *obd)
902 struct obd_import *imp;
904 OBD_ALLOC(imp, sizeof(*imp));
908 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
909 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
910 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
911 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
912 spin_lock_init(&imp->imp_lock);
913 imp->imp_last_success_conn = 0;
914 imp->imp_state = LUSTRE_IMP_NEW;
915 imp->imp_obd = class_incref(obd, "import", imp);
916 sema_init(&imp->imp_sec_mutex, 1);
917 cfs_waitq_init(&imp->imp_recovery_waitq);
919 atomic_set(&imp->imp_refcount, 2);
920 atomic_set(&imp->imp_inflight, 0);
921 atomic_set(&imp->imp_replay_inflight, 0);
922 atomic_set(&imp->imp_inval_count, 0);
923 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
924 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
925 class_handle_hash(&imp->imp_handle, import_handle_addref);
926 init_imp_at(&imp->imp_at);
928 /* the default magic is V2, will be used in connect RPC, and
929 * then adjusted according to the flags in request/reply. */
930 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
934 EXPORT_SYMBOL(class_new_import);
936 void class_destroy_import(struct obd_import *import)
938 LASSERT(import != NULL);
939 LASSERT(import != LP_POISON);
941 class_handle_unhash(&import->imp_handle);
943 spin_lock(&import->imp_lock);
944 import->imp_generation++;
945 spin_unlock(&import->imp_lock);
946 class_import_put(import);
948 EXPORT_SYMBOL(class_destroy_import);
950 /* A connection defines an export context in which preallocation can
951 be managed. This releases the export pointer reference, and returns
952 the export handle, so the export refcount is 1 when this function
954 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
955 struct obd_uuid *cluuid)
957 struct obd_export *export;
958 LASSERT(conn != NULL);
959 LASSERT(obd != NULL);
960 LASSERT(cluuid != NULL);
963 export = class_new_export(obd, cluuid);
965 RETURN(PTR_ERR(export));
967 conn->cookie = export->exp_handle.h_cookie;
968 class_export_put(export);
970 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
971 cluuid->uuid, conn->cookie);
974 EXPORT_SYMBOL(class_connect);
976 /* if export is involved in recovery then clean up related things */
977 void class_export_recovery_cleanup(struct obd_export *exp)
979 struct obd_device *obd = exp->exp_obd;
981 spin_lock_bh(&obd->obd_processing_task_lock);
982 if (obd->obd_recovering && exp->exp_in_recovery) {
983 spin_lock(&exp->exp_lock);
984 exp->exp_in_recovery = 0;
985 spin_unlock(&exp->exp_lock);
986 obd->obd_connected_clients--;
987 /* each connected client is counted as recoverable */
988 obd->obd_recoverable_clients--;
989 if (exp->exp_req_replay_needed) {
990 spin_lock(&exp->exp_lock);
991 exp->exp_req_replay_needed = 0;
992 spin_unlock(&exp->exp_lock);
993 LASSERT(atomic_read(&obd->obd_req_replay_clients));
994 atomic_dec(&obd->obd_req_replay_clients);
996 if (exp->exp_lock_replay_needed) {
997 spin_lock(&exp->exp_lock);
998 exp->exp_lock_replay_needed = 0;
999 spin_unlock(&exp->exp_lock);
1000 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1001 atomic_dec(&obd->obd_lock_replay_clients);
1004 spin_unlock_bh(&obd->obd_processing_task_lock);
1007 /* This function removes two references from the export: one for the
1008 * hash entry and one for the export pointer passed in. The export
1009 * pointer passed to this function is destroyed should not be used
1011 int class_disconnect(struct obd_export *export)
1013 int already_disconnected;
1016 if (export == NULL) {
1018 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1022 spin_lock(&export->exp_lock);
1023 already_disconnected = export->exp_disconnected;
1024 export->exp_disconnected = 1;
1026 if (!hlist_unhashed(&export->exp_nid_hash))
1027 lustre_hash_del(export->exp_obd->obd_nid_hash,
1028 &export->exp_connection->c_peer.nid,
1029 &export->exp_nid_hash);
1031 spin_unlock(&export->exp_lock);
1033 /* class_cleanup(), abort_recovery(), and class_fail_export()
1034 * all end up in here, and if any of them race we shouldn't
1035 * call extra class_export_puts(). */
1036 if (already_disconnected)
1039 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1040 export->exp_handle.h_cookie);
1042 class_export_recovery_cleanup(export);
1043 class_unlink_export(export);
1044 class_export_put(export);
1048 static void class_disconnect_export_list(struct list_head *list, int flags)
1051 struct lustre_handle fake_conn;
1052 struct obd_export *fake_exp, *exp;
1055 /* It's possible that an export may disconnect itself, but
1056 * nothing else will be added to this list. */
1057 while (!list_empty(list)) {
1058 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1059 class_export_get(exp);
1061 spin_lock(&exp->exp_lock);
1062 exp->exp_flags = flags;
1063 spin_unlock(&exp->exp_lock);
1065 if (obd_uuid_equals(&exp->exp_client_uuid,
1066 &exp->exp_obd->obd_uuid)) {
1068 "exp %p export uuid == obd uuid, don't discon\n",
1070 /* Need to delete this now so we don't end up pointing
1071 * to work_list later when this export is cleaned up. */
1072 list_del_init(&exp->exp_obd_chain);
1073 class_export_put(exp);
1077 fake_conn.cookie = exp->exp_handle.h_cookie;
1078 fake_exp = class_conn2export(&fake_conn);
1080 class_export_put(exp);
1084 spin_lock(&fake_exp->exp_lock);
1085 fake_exp->exp_flags = flags;
1086 spin_unlock(&fake_exp->exp_lock);
1088 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1089 "last request at "CFS_TIME_T"\n",
1090 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1091 exp, exp->exp_last_request_time);
1092 rc = obd_disconnect(fake_exp);
1093 class_export_put(exp);
1098 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1100 return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1101 (obd->obd_force ? OBD_OPT_FORCE : 0));
1104 void class_disconnect_exports(struct obd_device *obd)
1106 struct list_head work_list;
1109 /* Move all of the exports from obd_exports to a work list, en masse. */
1110 spin_lock(&obd->obd_dev_lock);
1111 list_add(&work_list, &obd->obd_exports);
1112 list_del_init(&obd->obd_exports);
1113 spin_unlock(&obd->obd_dev_lock);
1115 if (!list_empty(&work_list)) {
1116 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1117 "disconnecting them\n", obd->obd_minor, obd);
1118 class_disconnect_export_list(&work_list,
1119 get_exp_flags_from_obd(obd));
1121 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1122 obd->obd_minor, obd);
1125 EXPORT_SYMBOL(class_disconnect_exports);
1127 /* Remove exports that have not completed recovery.
1129 int class_disconnect_stale_exports(struct obd_device *obd,
1130 int (*test_export)(struct obd_export *))
1132 struct list_head work_list;
1133 struct list_head *pos, *n;
1134 struct obd_export *exp;
1138 CFS_INIT_LIST_HEAD(&work_list);
1139 spin_lock(&obd->obd_dev_lock);
1140 list_for_each_safe(pos, n, &obd->obd_exports) {
1141 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1142 if (test_export(exp))
1145 list_del(&exp->exp_obd_chain);
1146 list_add(&exp->exp_obd_chain, &work_list);
1147 /* don't count self-export as client */
1148 if (obd_uuid_equals(&exp->exp_client_uuid,
1149 &exp->exp_obd->obd_uuid))
1153 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1154 obd->obd_name, exp->exp_client_uuid.uuid,
1155 exp->exp_connection == NULL ? "<unknown>" :
1156 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1158 spin_unlock(&obd->obd_dev_lock);
1160 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1161 obd->obd_name, cnt);
1162 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1165 EXPORT_SYMBOL(class_disconnect_stale_exports);
1167 int oig_init(struct obd_io_group **oig_out)
1169 struct obd_io_group *oig;
1172 OBD_ALLOC(oig, sizeof(*oig));
1176 spin_lock_init(&oig->oig_lock);
1178 oig->oig_pending = 0;
1179 atomic_set(&oig->oig_refcount, 1);
1180 cfs_waitq_init(&oig->oig_waitq);
1181 CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1186 EXPORT_SYMBOL(oig_init);
1188 static inline void oig_grab(struct obd_io_group *oig)
1190 atomic_inc(&oig->oig_refcount);
1193 void oig_release(struct obd_io_group *oig)
1195 if (atomic_dec_and_test(&oig->oig_refcount))
1196 OBD_FREE(oig, sizeof(*oig));
1198 EXPORT_SYMBOL(oig_release);
1200 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1203 CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1204 spin_lock(&oig->oig_lock);
1210 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1212 spin_unlock(&oig->oig_lock);
1217 EXPORT_SYMBOL(oig_add_one);
1219 void oig_complete_one(struct obd_io_group *oig,
1220 struct oig_callback_context *occ, int rc)
1222 cfs_waitq_t *wake = NULL;
1225 spin_lock(&oig->oig_lock);
1228 list_del_init(&occ->occ_oig_item);
1230 old_rc = oig->oig_rc;
1231 if (oig->oig_rc == 0 && rc != 0)
1234 if (--oig->oig_pending <= 0)
1235 wake = &oig->oig_waitq;
1237 spin_unlock(&oig->oig_lock);
1239 CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1240 "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1243 cfs_waitq_signal(wake);
1246 EXPORT_SYMBOL(oig_complete_one);
1248 static int oig_done(struct obd_io_group *oig)
1251 spin_lock(&oig->oig_lock);
1252 if (oig->oig_pending <= 0)
1254 spin_unlock(&oig->oig_lock);
1258 static void interrupted_oig(void *data)
1260 struct obd_io_group *oig = data;
1261 struct oig_callback_context *occ;
1263 spin_lock(&oig->oig_lock);
1264 /* We need to restart the processing each time we drop the lock, as
1265 * it is possible other threads called oig_complete_one() to remove
1266 * an entry elsewhere in the list while we dropped lock. We need to
1267 * drop the lock because osc_ap_completion() calls oig_complete_one()
1268 * which re-gets this lock ;-) as well as a lock ordering issue. */
1270 list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1271 if (occ->interrupted)
1273 occ->interrupted = 1;
1274 spin_unlock(&oig->oig_lock);
1275 occ->occ_interrupted(occ);
1276 spin_lock(&oig->oig_lock);
1279 spin_unlock(&oig->oig_lock);
1282 int oig_wait(struct obd_io_group *oig)
1284 struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1287 CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1290 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1291 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1292 /* we can't continue until the oig has emptied and stopped
1293 * referencing state that the caller will free upon return */
1295 lwi = (struct l_wait_info){ 0, };
1296 } while (rc == -EINTR);
1298 LASSERTF(oig->oig_pending == 0,
1299 "exiting oig_wait(oig = %p) with %d pending\n", oig,
1302 CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1305 EXPORT_SYMBOL(oig_wait);
1307 void class_fail_export(struct obd_export *exp)
1309 int rc, already_failed;
1311 spin_lock(&exp->exp_lock);
1312 already_failed = exp->exp_failed;
1313 exp->exp_failed = 1;
1314 spin_unlock(&exp->exp_lock);
1316 if (already_failed) {
1317 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1318 exp, exp->exp_client_uuid.uuid);
1322 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1323 exp, exp->exp_client_uuid.uuid);
1325 if (obd_dump_on_timeout)
1326 libcfs_debug_dumplog();
1328 /* Most callers into obd_disconnect are removing their own reference
1329 * (request, for example) in addition to the one from the hash table.
1330 * We don't have such a reference here, so make one. */
1331 class_export_get(exp);
1332 rc = obd_disconnect(exp);
1334 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1336 CDEBUG(D_HA, "disconnected export %p/%s\n",
1337 exp, exp->exp_client_uuid.uuid);
1339 EXPORT_SYMBOL(class_fail_export);
1341 char *obd_export_nid2str(struct obd_export *exp)
1343 if (exp->exp_connection != NULL)
1344 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1348 EXPORT_SYMBOL(obd_export_nid2str);
1350 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1352 struct obd_export *doomed_exp = NULL;
1353 int exports_evicted = 0;
1355 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1358 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1359 if (doomed_exp == NULL)
1362 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1363 "nid %s found, wanted nid %s, requested nid %s\n",
1364 obd_export_nid2str(doomed_exp),
1365 libcfs_nid2str(nid_key), nid);
1366 LASSERTF(doomed_exp != obd->obd_self_export,
1367 "self-export is hashed by NID?\n");
1369 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1370 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1372 class_fail_export(doomed_exp);
1373 class_export_put(doomed_exp);
1376 if (!exports_evicted)
1377 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1378 obd->obd_name, nid);
1379 return exports_evicted;
1381 EXPORT_SYMBOL(obd_export_evict_by_nid);
1383 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1385 struct obd_export *doomed_exp = NULL;
1386 struct obd_uuid doomed_uuid;
1387 int exports_evicted = 0;
1389 obd_str2uuid(&doomed_uuid, uuid);
1390 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1391 CERROR("%s: can't evict myself\n", obd->obd_name);
1392 return exports_evicted;
1395 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1397 if (doomed_exp == NULL) {
1398 CERROR("%s: can't disconnect %s: no exports found\n",
1399 obd->obd_name, uuid);
1401 CWARN("%s: evicting %s at adminstrative request\n",
1402 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1403 class_fail_export(doomed_exp);
1404 class_export_put(doomed_exp);
1408 return exports_evicted;
1410 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1413 * kill zombie imports and exports
1415 void obd_zombie_impexp_cull(void)
1417 struct obd_import *import;
1418 struct obd_export *export;
1422 spin_lock (&obd_zombie_impexp_lock);
1425 if (!list_empty(&obd_zombie_imports)) {
1426 import = list_entry(obd_zombie_imports.next,
1429 list_del(&import->imp_zombie_chain);
1433 if (!list_empty(&obd_zombie_exports)) {
1434 export = list_entry(obd_zombie_exports.next,
1437 list_del_init(&export->exp_obd_chain);
1440 spin_unlock(&obd_zombie_impexp_lock);
1443 class_import_destroy(import);
1446 class_export_destroy(export);
1448 } while (import != NULL || export != NULL);
1452 static struct completion obd_zombie_start;
1453 static struct completion obd_zombie_stop;
1454 static unsigned long obd_zombie_flags;
1455 static cfs_waitq_t obd_zombie_waitq;
1462 * check for work for kill zombie import/export thread.
1464 int obd_zombie_impexp_check(void *arg)
1468 spin_lock(&obd_zombie_impexp_lock);
1469 rc = list_empty(&obd_zombie_imports) &&
1470 list_empty(&obd_zombie_exports) &&
1471 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1473 spin_unlock(&obd_zombie_impexp_lock);
1479 * notify import/export destroy thread about new zombie.
1481 static void obd_zombie_impexp_notify(void)
1483 cfs_waitq_signal(&obd_zombie_waitq);
1489 * destroy zombie export/import thread.
1491 static int obd_zombie_impexp_thread(void *unused)
1495 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1496 complete(&obd_zombie_start);
1500 complete(&obd_zombie_start);
1502 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1503 struct l_wait_info lwi = { 0 };
1505 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1507 obd_zombie_impexp_cull();
1510 complete(&obd_zombie_stop);
1515 #else /* ! KERNEL */
1517 static atomic_t zombie_recur = ATOMIC_INIT(0);
1518 static void *obd_zombie_impexp_work_cb;
1519 static void *obd_zombie_impexp_idle_cb;
1521 int obd_zombie_impexp_kill(void *arg)
1525 if (atomic_inc_return(&zombie_recur) == 1) {
1526 obd_zombie_impexp_cull();
1529 atomic_dec(&zombie_recur);
1536 * start destroy zombie import/export thread
1538 int obd_zombie_impexp_init(void)
1542 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1543 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1544 spin_lock_init(&obd_zombie_impexp_lock);
1545 init_completion(&obd_zombie_start);
1546 init_completion(&obd_zombie_stop);
1547 cfs_waitq_init(&obd_zombie_waitq);
1550 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1554 wait_for_completion(&obd_zombie_start);
1557 obd_zombie_impexp_work_cb =
1558 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1559 &obd_zombie_impexp_kill, NULL);
1561 obd_zombie_impexp_idle_cb =
1562 liblustre_register_idle_callback("obd_zombi_impexp_check",
1563 &obd_zombie_impexp_check, NULL);
1570 * stop destroy zombie import/export thread
1572 void obd_zombie_impexp_stop(void)
1574 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1575 obd_zombie_impexp_notify();
1577 wait_for_completion(&obd_zombie_stop);
1579 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1580 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);