1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see [sun.com URL with a
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
80 EXPORT_SYMBOL(obd_device_alloc);
82 static void obd_device_free(struct obd_device *obd)
85 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87 if (obd->obd_namespace != NULL) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd, obd->obd_namespace, obd->obd_force);
92 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 EXPORT_SYMBOL(obd_device_free);
96 struct obd_type *class_search_type(const char *name)
98 struct list_head *tmp;
99 struct obd_type *type;
101 spin_lock(&obd_types_lock);
102 list_for_each(tmp, &obd_types) {
103 type = list_entry(tmp, struct obd_type, typ_chain);
104 if (strcmp(type->typ_name, name) == 0) {
105 spin_unlock(&obd_types_lock);
109 spin_unlock(&obd_types_lock);
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
119 const char *modname = name;
120 if (!request_module(modname)) {
121 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122 type = class_search_type(name);
124 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130 spin_lock(&type->obd_type_lock);
132 try_module_get(type->typ_dt_ops->o_owner);
133 spin_unlock(&type->obd_type_lock);
138 void class_put_type(struct obd_type *type)
141 spin_lock(&type->obd_type_lock);
143 module_put(type->typ_dt_ops->o_owner);
144 spin_unlock(&type->obd_type_lock);
147 #define CLASS_MAX_NAME 1024
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150 struct lprocfs_vars *vars, const char *name,
151 struct lu_device_type *ldt)
153 struct obd_type *type;
158 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
160 if (class_search_type(name)) {
161 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166 OBD_ALLOC(type, sizeof(*type));
170 OBD_ALLOC_PTR(type->typ_dt_ops);
171 OBD_ALLOC_PTR(type->typ_md_ops);
172 OBD_ALLOC(type->typ_name, strlen(name) + 1);
174 if (type->typ_dt_ops == NULL ||
175 type->typ_md_ops == NULL ||
176 type->typ_name == NULL)
179 *(type->typ_dt_ops) = *dt_ops;
180 /* md_ops is optional */
182 *(type->typ_md_ops) = *md_ops;
183 strcpy(type->typ_name, name);
184 spin_lock_init(&type->obd_type_lock);
187 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
189 if (IS_ERR(type->typ_procroot)) {
190 rc = PTR_ERR(type->typ_procroot);
191 type->typ_procroot = NULL;
197 rc = ldt->ldt_ops->ldto_init(ldt);
202 spin_lock(&obd_types_lock);
203 list_add(&type->typ_chain, &obd_types);
204 spin_unlock(&obd_types_lock);
209 if (type->typ_name != NULL)
210 OBD_FREE(type->typ_name, strlen(name) + 1);
211 if (type->typ_md_ops != NULL)
212 OBD_FREE_PTR(type->typ_md_ops);
213 if (type->typ_dt_ops != NULL)
214 OBD_FREE_PTR(type->typ_dt_ops);
215 OBD_FREE(type, sizeof(*type));
219 int class_unregister_type(const char *name)
221 struct obd_type *type = class_search_type(name);
225 CERROR("unknown obd type\n");
229 if (type->typ_refcnt) {
230 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231 /* This is a bad situation, let's make the best of it */
232 /* Remove ops, but leave the name for debugging */
233 OBD_FREE_PTR(type->typ_dt_ops);
234 OBD_FREE_PTR(type->typ_md_ops);
238 if (type->typ_procroot) {
239 lprocfs_remove(&type->typ_procroot);
243 type->typ_lu->ldt_ops->ldto_fini(type->typ_lu);
245 spin_lock(&obd_types_lock);
246 list_del(&type->typ_chain);
247 spin_unlock(&obd_types_lock);
248 OBD_FREE(type->typ_name, strlen(name) + 1);
249 if (type->typ_dt_ops != NULL)
250 OBD_FREE_PTR(type->typ_dt_ops);
251 if (type->typ_md_ops != NULL)
252 OBD_FREE_PTR(type->typ_md_ops);
253 OBD_FREE(type, sizeof(*type));
255 } /* class_unregister_type */
258 * Create a new obd device.
260 * Find an empty slot in ::obd_devs[], create a new obd device in it.
262 * \param typename [in] obd device type string.
263 * \param name [in] obd device name.
265 * \retval NULL if create fails, otherwise return the obd device
268 struct obd_device *class_newdev(const char *type_name, const char *name)
270 struct obd_device *result = NULL;
271 struct obd_device *newdev;
272 struct obd_type *type = NULL;
274 int new_obd_minor = 0;
276 if (strlen(name) >= MAX_OBD_NAME) {
277 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278 RETURN(ERR_PTR(-EINVAL));
281 type = class_get_type(type_name);
283 CERROR("OBD: unknown type: %s\n", type_name);
284 RETURN(ERR_PTR(-ENODEV));
287 newdev = obd_device_alloc();
288 if (newdev == NULL) {
289 class_put_type(type);
290 RETURN(ERR_PTR(-ENOMEM));
292 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
294 spin_lock(&obd_dev_lock);
295 for (i = 0; i < class_devno_max(); i++) {
296 struct obd_device *obd = class_num2obd(i);
297 if (obd && obd->obd_name &&
298 (strcmp(name, obd->obd_name) == 0)) {
299 CERROR("Device %s already exists, won't add\n", name);
301 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302 "%p obd_magic %08x != %08x\n", result,
303 result->obd_magic, OBD_DEVICE_MAGIC);
304 LASSERTF(result->obd_minor == new_obd_minor,
305 "%p obd_minor %d != %d\n", result,
306 result->obd_minor, new_obd_minor);
308 obd_devs[result->obd_minor] = NULL;
309 result->obd_name[0]='\0';
311 result = ERR_PTR(-EEXIST);
314 if (!result && !obd) {
316 result->obd_minor = i;
318 result->obd_type = type;
319 strncpy(result->obd_name, name,
320 sizeof(result->obd_name) - 1);
321 obd_devs[i] = result;
324 spin_unlock(&obd_dev_lock);
326 if (result == NULL && i >= class_devno_max()) {
327 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
329 result = ERR_PTR(-EOVERFLOW);
332 if (IS_ERR(result)) {
333 obd_device_free(newdev);
334 class_put_type(type);
336 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337 result->obd_name, result);
342 void class_release_dev(struct obd_device *obd)
344 struct obd_type *obd_type = obd->obd_type;
346 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350 LASSERT(obd_type != NULL);
352 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353 obd->obd_name,obd->obd_type->typ_name);
355 spin_lock(&obd_dev_lock);
356 obd_devs[obd->obd_minor] = NULL;
357 spin_unlock(&obd_dev_lock);
358 obd_device_free(obd);
360 class_put_type(obd_type);
363 int class_name2dev(const char *name)
370 spin_lock(&obd_dev_lock);
371 for (i = 0; i < class_devno_max(); i++) {
372 struct obd_device *obd = class_num2obd(i);
373 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374 /* Make sure we finished attaching before we give
375 out any references */
376 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377 if (obd->obd_attached) {
378 spin_unlock(&obd_dev_lock);
384 spin_unlock(&obd_dev_lock);
389 struct obd_device *class_name2obd(const char *name)
391 int dev = class_name2dev(name);
393 if (dev < 0 || dev > class_devno_max())
395 return class_num2obd(dev);
398 int class_uuid2dev(struct obd_uuid *uuid)
402 spin_lock(&obd_dev_lock);
403 for (i = 0; i < class_devno_max(); i++) {
404 struct obd_device *obd = class_num2obd(i);
405 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407 spin_unlock(&obd_dev_lock);
411 spin_unlock(&obd_dev_lock);
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
418 int dev = class_uuid2dev(uuid);
421 return class_num2obd(dev);
425 * Get obd device from ::obd_devs[]
427 * \param num [in] array index
429 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
430 * otherwise return the obd device there.
432 struct obd_device *class_num2obd(int num)
434 struct obd_device *obd = NULL;
436 if (num < class_devno_max()) {
441 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442 "%p obd_magic %08x != %08x\n",
443 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444 LASSERTF(obd->obd_minor == num,
445 "%p obd_minor %0d != %0d\n",
446 obd, obd->obd_minor, num);
452 void class_obd_list(void)
457 spin_lock(&obd_dev_lock);
458 for (i = 0; i < class_devno_max(); i++) {
459 struct obd_device *obd = class_num2obd(i);
462 if (obd->obd_stopping)
464 else if (obd->obd_set_up)
466 else if (obd->obd_attached)
470 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471 i, status, obd->obd_type->typ_name,
472 obd->obd_name, obd->obd_uuid.uuid,
473 atomic_read(&obd->obd_refcount));
475 spin_unlock(&obd_dev_lock);
479 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
480 specified, then only the client with that uuid is returned,
481 otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483 const char * typ_name,
484 struct obd_uuid *grp_uuid)
488 spin_lock(&obd_dev_lock);
489 for (i = 0; i < class_devno_max(); i++) {
490 struct obd_device *obd = class_num2obd(i);
493 if ((strncmp(obd->obd_type->typ_name, typ_name,
494 strlen(typ_name)) == 0)) {
495 if (obd_uuid_equals(tgt_uuid,
496 &obd->u.cli.cl_target_uuid) &&
497 ((grp_uuid)? obd_uuid_equals(grp_uuid,
498 &obd->obd_uuid) : 1)) {
499 spin_unlock(&obd_dev_lock);
504 spin_unlock(&obd_dev_lock);
509 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
510 struct obd_uuid *grp_uuid)
512 struct obd_device *obd;
514 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
516 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
/* Iterate over the obd_device list looking for devices that have grp_uuid.
   Searching starts at *next, and if a device is found, the next index to
   look at is saved in *next. If next is NULL, then the first matching
   device will always be returned. */
525 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
531 else if (*next >= 0 && *next < class_devno_max())
536 spin_lock(&obd_dev_lock);
537 for (; i < class_devno_max(); i++) {
538 struct obd_device *obd = class_num2obd(i);
541 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
544 spin_unlock(&obd_dev_lock);
548 spin_unlock(&obd_dev_lock);
554 void obd_cleanup_caches(void)
559 if (obd_device_cachep) {
560 rc = cfs_mem_cache_destroy(obd_device_cachep);
561 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
562 obd_device_cachep = NULL;
565 rc = cfs_mem_cache_destroy(obdo_cachep);
566 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
570 rc = cfs_mem_cache_destroy(import_cachep);
571 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
572 import_cachep = NULL;
575 rc = cfs_mem_cache_destroy(capa_cachep);
576 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
582 int obd_init_caches(void)
586 LASSERT(obd_device_cachep == NULL);
587 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
588 sizeof(struct obd_device),
590 if (!obd_device_cachep)
593 LASSERT(obdo_cachep == NULL);
594 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
599 LASSERT(import_cachep == NULL);
600 import_cachep = cfs_mem_cache_create("ll_import_cache",
601 sizeof(struct obd_import),
606 LASSERT(capa_cachep == NULL);
607 capa_cachep = cfs_mem_cache_create("capa_cache",
608 sizeof(struct obd_capa), 0, 0);
614 obd_cleanup_caches();
619 /* map connection to client */
620 struct obd_export *class_conn2export(struct lustre_handle *conn)
622 struct obd_export *export;
626 CDEBUG(D_CACHE, "looking for null handle\n");
630 if (conn->cookie == -1) { /* this means assign a new connection */
631 CDEBUG(D_CACHE, "want a new connection\n");
635 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
636 export = class_handle2object(conn->cookie);
640 struct obd_device *class_exp2obd(struct obd_export *exp)
647 struct obd_device *class_conn2obd(struct lustre_handle *conn)
649 struct obd_export *export;
650 export = class_conn2export(conn);
652 struct obd_device *obd = export->exp_obd;
653 class_export_put(export);
659 struct obd_import *class_exp2cliimp(struct obd_export *exp)
661 struct obd_device *obd = exp->exp_obd;
664 return obd->u.cli.cl_import;
667 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
669 struct obd_device *obd = class_conn2obd(conn);
672 return obd->u.cli.cl_import;
675 /* Export management functions */
/* Addref callback registered with class_handle_hash() for export handles:
 * takes one reference on the export through class_export_get(). */
static void export_handle_addref(void *export)
{
        class_export_get(export);
}
681 void __class_export_put(struct obd_export *exp)
683 if (atomic_dec_and_test(&exp->exp_refcount)) {
684 LASSERT (list_empty(&exp->exp_obd_chain));
686 CDEBUG(D_IOCTL, "final put %p/%s\n",
687 exp, exp->exp_client_uuid.uuid);
689 spin_lock(&obd_zombie_impexp_lock);
690 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
691 spin_unlock(&obd_zombie_impexp_lock);
693 if (obd_zombie_impexp_notify != NULL)
694 obd_zombie_impexp_notify();
697 EXPORT_SYMBOL(__class_export_put);
699 void class_export_destroy(struct obd_export *exp)
701 struct obd_device *obd = exp->exp_obd;
704 LASSERT (atomic_read(&exp->exp_refcount) == 0);
706 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
707 exp->exp_client_uuid.uuid, obd->obd_name);
709 LASSERT(obd != NULL);
711 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
712 if (exp->exp_connection)
713 ptlrpc_put_connection_superhack(exp->exp_connection);
715 LASSERT(list_empty(&exp->exp_outstanding_replies));
716 LASSERT(list_empty(&exp->exp_req_replay_queue));
717 obd_destroy_export(exp);
719 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
724 /* Creates a new export, adds it to the hash table, and returns a
725 * pointer to it. The refcount is 2: one for the hash reference, and
726 * one for the pointer returned by this function. */
727 struct obd_export *class_new_export(struct obd_device *obd,
728 struct obd_uuid *cluuid)
730 struct obd_export *export;
733 OBD_ALLOC_PTR(export);
735 return ERR_PTR(-ENOMEM);
737 export->exp_conn_cnt = 0;
738 atomic_set(&export->exp_refcount, 2);
739 atomic_set(&export->exp_rpc_count, 0);
740 export->exp_obd = obd;
741 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
742 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
743 /* XXX this should be in LDLM init */
744 CFS_INIT_LIST_HEAD(&export->exp_ldlm_data.led_held_locks);
745 spin_lock_init(&export->exp_ldlm_data.led_lock);
747 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
748 class_handle_hash(&export->exp_handle, export_handle_addref);
749 export->exp_last_request_time = cfs_time_current_sec();
750 spin_lock_init(&export->exp_lock);
751 INIT_HLIST_NODE(&export->exp_uuid_hash);
752 INIT_HLIST_NODE(&export->exp_nid_hash);
754 export->exp_sp_peer = LUSTRE_SP_ANY;
755 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
756 export->exp_client_uuid = *cluuid;
757 obd_init_export(export);
759 spin_lock(&obd->obd_dev_lock);
760 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
761 rc = lustre_hash_additem_unique(obd->obd_uuid_hash_body, cluuid,
762 &export->exp_uuid_hash);
764 CWARN("%s: denying duplicate export for %s\n",
765 obd->obd_name, cluuid->uuid);
766 spin_unlock(&obd->obd_dev_lock);
767 class_handle_unhash(&export->exp_handle);
768 OBD_FREE_PTR(export);
769 return ERR_PTR(-EALREADY);
773 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
775 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
776 list_add_tail(&export->exp_obd_chain_timed,
777 &export->exp_obd->obd_exports_timed);
778 export->exp_obd->obd_num_exports++;
779 spin_unlock(&obd->obd_dev_lock);
783 EXPORT_SYMBOL(class_new_export);
785 void class_unlink_export(struct obd_export *exp)
787 class_handle_unhash(&exp->exp_handle);
789 spin_lock(&exp->exp_obd->obd_dev_lock);
790 /* delete an uuid-export hashitem from hashtables */
791 if (!hlist_unhashed(&exp->exp_uuid_hash)) {
792 lustre_hash_delitem(exp->exp_obd->obd_uuid_hash_body,
793 &exp->exp_client_uuid, &exp->exp_uuid_hash);
795 list_del_init(&exp->exp_obd_chain);
796 list_del_init(&exp->exp_obd_chain_timed);
797 exp->exp_obd->obd_num_exports--;
798 spin_unlock(&exp->exp_obd->obd_dev_lock);
800 class_export_put(exp);
802 EXPORT_SYMBOL(class_unlink_export);
804 /* Import management functions */
/* Addref callback registered with class_handle_hash() for import handles:
 * takes one reference on the import through class_import_get(). */
static void import_handle_addref(void *import)
{
        class_import_get(import);
}
810 struct obd_import *class_import_get(struct obd_import *import)
812 LASSERT(atomic_read(&import->imp_refcount) >= 0);
813 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
814 atomic_inc(&import->imp_refcount);
815 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
816 atomic_read(&import->imp_refcount));
819 EXPORT_SYMBOL(class_import_get);
821 void class_import_put(struct obd_import *import)
825 LASSERT(atomic_read(&import->imp_refcount) > 0);
826 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
827 LASSERT(list_empty(&import->imp_zombie_chain));
829 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
830 atomic_read(&import->imp_refcount) - 1);
832 if (atomic_dec_and_test(&import->imp_refcount)) {
834 CDEBUG(D_INFO, "final put import %p\n", import);
836 spin_lock(&obd_zombie_impexp_lock);
837 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
838 spin_unlock(&obd_zombie_impexp_lock);
840 if (obd_zombie_impexp_notify != NULL)
841 obd_zombie_impexp_notify();
846 EXPORT_SYMBOL(class_import_put);
848 void class_import_destroy(struct obd_import *import)
852 CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
853 import->imp_obd->obd_name);
855 LASSERT(atomic_read(&import->imp_refcount) == 0);
857 ptlrpc_put_connection_superhack(import->imp_connection);
859 while (!list_empty(&import->imp_conn_list)) {
860 struct obd_import_conn *imp_conn;
862 imp_conn = list_entry(import->imp_conn_list.next,
863 struct obd_import_conn, oic_item);
864 list_del(&imp_conn->oic_item);
865 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
866 OBD_FREE(imp_conn, sizeof(*imp_conn));
869 LASSERT(import->imp_sec == NULL);
870 class_decref(import->imp_obd);
871 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
875 static void init_imp_at(struct imp_at *at) {
877 at_init(&at->iat_net_latency, 0, 0);
878 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
879 /* max service estimates are tracked on the server side, so
880 don't use the AT history here, just use the last reported
881 val. (But keep hist for proc histogram, worst_ever) */
882 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
887 struct obd_import *class_new_import(struct obd_device *obd)
889 struct obd_import *imp;
891 OBD_ALLOC(imp, sizeof(*imp));
895 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
896 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
897 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
898 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
899 spin_lock_init(&imp->imp_lock);
900 imp->imp_last_success_conn = 0;
901 imp->imp_state = LUSTRE_IMP_NEW;
902 imp->imp_obd = class_incref(obd);
903 sema_init(&imp->imp_sec_mutex, 1);
904 cfs_waitq_init(&imp->imp_recovery_waitq);
906 atomic_set(&imp->imp_refcount, 2);
907 atomic_set(&imp->imp_inflight, 0);
908 atomic_set(&imp->imp_replay_inflight, 0);
909 atomic_set(&imp->imp_inval_count, 0);
910 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
911 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
912 class_handle_hash(&imp->imp_handle, import_handle_addref);
913 init_imp_at(&imp->imp_at);
915 /* the default magic is V2, will be used in connect RPC, and
916 * then adjusted according to the flags in request/reply. */
917 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
921 EXPORT_SYMBOL(class_new_import);
923 void class_destroy_import(struct obd_import *import)
925 LASSERT(import != NULL);
926 LASSERT(import != LP_POISON);
928 class_handle_unhash(&import->imp_handle);
930 spin_lock(&import->imp_lock);
931 import->imp_generation++;
932 spin_unlock(&import->imp_lock);
933 class_import_put(import);
935 EXPORT_SYMBOL(class_destroy_import);
937 /* A connection defines an export context in which preallocation can
938 be managed. This releases the export pointer reference, and returns
939 the export handle, so the export refcount is 1 when this function
941 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
942 struct obd_uuid *cluuid)
944 struct obd_export *export;
945 LASSERT(conn != NULL);
946 LASSERT(obd != NULL);
947 LASSERT(cluuid != NULL);
950 export = class_new_export(obd, cluuid);
952 RETURN(PTR_ERR(export));
954 conn->cookie = export->exp_handle.h_cookie;
955 class_export_put(export);
957 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
958 cluuid->uuid, conn->cookie);
961 EXPORT_SYMBOL(class_connect);
963 /* if export is involved in recovery then clean up related things */
964 void class_export_recovery_cleanup(struct obd_export *exp)
966 struct obd_device *obd = exp->exp_obd;
968 spin_lock_bh(&obd->obd_processing_task_lock);
969 if (obd->obd_recovering && exp->exp_in_recovery) {
970 spin_lock(&exp->exp_lock);
971 exp->exp_in_recovery = 0;
972 spin_unlock(&exp->exp_lock);
973 obd->obd_connected_clients--;
974 /* each connected client is counted as recoverable */
975 obd->obd_recoverable_clients--;
976 if (exp->exp_req_replay_needed) {
977 spin_lock(&exp->exp_lock);
978 exp->exp_req_replay_needed = 0;
979 spin_unlock(&exp->exp_lock);
980 LASSERT(atomic_read(&obd->obd_req_replay_clients));
981 atomic_dec(&obd->obd_req_replay_clients);
983 if (exp->exp_lock_replay_needed) {
984 spin_lock(&exp->exp_lock);
985 exp->exp_lock_replay_needed = 0;
986 spin_unlock(&exp->exp_lock);
987 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
988 atomic_dec(&obd->obd_lock_replay_clients);
991 spin_unlock_bh(&obd->obd_processing_task_lock);
994 /* This function removes two references from the export: one for the
995 * hash entry and one for the export pointer passed in. The export
996 * pointer passed to this function is destroyed should not be used
998 int class_disconnect(struct obd_export *export)
1000 int already_disconnected;
1003 if (export == NULL) {
1005 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1009 spin_lock(&export->exp_lock);
1010 already_disconnected = export->exp_disconnected;
1011 export->exp_disconnected = 1;
1013 if (!hlist_unhashed(&export->exp_nid_hash)) {
1014 lustre_hash_delitem(export->exp_obd->obd_nid_hash_body,
1015 &export->exp_connection->c_peer.nid, &export->exp_nid_hash);
1017 spin_unlock(&export->exp_lock);
1019 /* class_cleanup(), abort_recovery(), and class_fail_export()
1020 * all end up in here, and if any of them race we shouldn't
1021 * call extra class_export_puts(). */
1022 if (already_disconnected)
1025 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1026 export->exp_handle.h_cookie);
1028 class_export_recovery_cleanup(export);
1029 class_unlink_export(export);
1030 class_export_put(export);
1034 static void class_disconnect_export_list(struct list_head *list, int flags)
1037 struct lustre_handle fake_conn;
1038 struct obd_export *fake_exp, *exp;
1041 /* It's possible that an export may disconnect itself, but
1042 * nothing else will be added to this list. */
1043 while (!list_empty(list)) {
1044 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1045 class_export_get(exp);
1047 spin_lock(&exp->exp_lock);
1048 exp->exp_flags = flags;
1049 spin_unlock(&exp->exp_lock);
1051 if (obd_uuid_equals(&exp->exp_client_uuid,
1052 &exp->exp_obd->obd_uuid)) {
1054 "exp %p export uuid == obd uuid, don't discon\n",
1056 /* Need to delete this now so we don't end up pointing
1057 * to work_list later when this export is cleaned up. */
1058 list_del_init(&exp->exp_obd_chain);
1059 class_export_put(exp);
1063 fake_conn.cookie = exp->exp_handle.h_cookie;
1064 fake_exp = class_conn2export(&fake_conn);
1066 class_export_put(exp);
1070 spin_lock(&fake_exp->exp_lock);
1071 fake_exp->exp_flags = flags;
1072 spin_unlock(&fake_exp->exp_lock);
1074 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1075 "last request at %ld\n",
1076 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1077 exp, exp->exp_last_request_time);
1078 rc = obd_disconnect(fake_exp);
1079 class_export_put(exp);
1084 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1086 return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1087 (obd->obd_force ? OBD_OPT_FORCE : 0));
1090 void class_disconnect_exports(struct obd_device *obd)
1092 struct list_head work_list;
1095 /* Move all of the exports from obd_exports to a work list, en masse. */
1096 spin_lock(&obd->obd_dev_lock);
1097 list_add(&work_list, &obd->obd_exports);
1098 list_del_init(&obd->obd_exports);
1099 spin_unlock(&obd->obd_dev_lock);
1101 if (!list_empty(&work_list)) {
1102 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1103 "disconnecting them\n", obd->obd_minor, obd);
1104 class_disconnect_export_list(&work_list,
1105 get_exp_flags_from_obd(obd));
1107 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1108 obd->obd_minor, obd);
1111 EXPORT_SYMBOL(class_disconnect_exports);
1113 /* Remove exports that have not completed recovery.
1115 int class_disconnect_stale_exports(struct obd_device *obd,
1116 int (*test_export)(struct obd_export *))
1118 struct list_head work_list;
1119 struct list_head *pos, *n;
1120 struct obd_export *exp;
1124 CFS_INIT_LIST_HEAD(&work_list);
1125 spin_lock(&obd->obd_dev_lock);
1126 list_for_each_safe(pos, n, &obd->obd_exports) {
1127 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1128 if (test_export(exp))
1131 list_del(&exp->exp_obd_chain);
1132 list_add(&exp->exp_obd_chain, &work_list);
1133 /* don't count self-export as client */
1134 if (obd_uuid_equals(&exp->exp_client_uuid,
1135 &exp->exp_obd->obd_uuid))
1139 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1140 obd->obd_name, exp->exp_client_uuid.uuid,
1141 exp->exp_connection == NULL ? "<unknown>" :
1142 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1144 spin_unlock(&obd->obd_dev_lock);
1146 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1147 obd->obd_name, cnt);
1148 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1151 EXPORT_SYMBOL(class_disconnect_stale_exports);
1153 int oig_init(struct obd_io_group **oig_out)
1155 struct obd_io_group *oig;
1158 OBD_ALLOC(oig, sizeof(*oig));
1162 spin_lock_init(&oig->oig_lock);
1164 oig->oig_pending = 0;
1165 atomic_set(&oig->oig_refcount, 1);
1166 cfs_waitq_init(&oig->oig_waitq);
1167 CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1172 EXPORT_SYMBOL(oig_init);
1174 static inline void oig_grab(struct obd_io_group *oig)
1176 atomic_inc(&oig->oig_refcount);
1179 void oig_release(struct obd_io_group *oig)
1181 if (atomic_dec_and_test(&oig->oig_refcount))
1182 OBD_FREE(oig, sizeof(*oig));
1184 EXPORT_SYMBOL(oig_release);
1186 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1189 CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1190 spin_lock(&oig->oig_lock);
1196 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1198 spin_unlock(&oig->oig_lock);
1203 EXPORT_SYMBOL(oig_add_one);
1205 void oig_complete_one(struct obd_io_group *oig,
1206 struct oig_callback_context *occ, int rc)
1208 cfs_waitq_t *wake = NULL;
1211 spin_lock(&oig->oig_lock);
1214 list_del_init(&occ->occ_oig_item);
1216 old_rc = oig->oig_rc;
1217 if (oig->oig_rc == 0 && rc != 0)
1220 if (--oig->oig_pending <= 0)
1221 wake = &oig->oig_waitq;
1223 spin_unlock(&oig->oig_lock);
1225 CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1226 "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1229 cfs_waitq_signal(wake);
1232 EXPORT_SYMBOL(oig_complete_one);
1234 static int oig_done(struct obd_io_group *oig)
1237 spin_lock(&oig->oig_lock);
1238 if (oig->oig_pending <= 0)
1240 spin_unlock(&oig->oig_lock);
1244 static void interrupted_oig(void *data)
1246 struct obd_io_group *oig = data;
1247 struct oig_callback_context *occ;
1249 spin_lock(&oig->oig_lock);
1250 /* We need to restart the processing each time we drop the lock, as
1251 * it is possible other threads called oig_complete_one() to remove
1252 * an entry elsewhere in the list while we dropped lock. We need to
1253 * drop the lock because osc_ap_completion() calls oig_complete_one()
1254 * which re-gets this lock ;-) as well as a lock ordering issue. */
1256 list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1257 if (occ->interrupted)
1259 occ->interrupted = 1;
1260 spin_unlock(&oig->oig_lock);
1261 occ->occ_interrupted(occ);
1262 spin_lock(&oig->oig_lock);
1265 spin_unlock(&oig->oig_lock);
1268 int oig_wait(struct obd_io_group *oig)
1270 struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1273 CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1276 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1277 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1278 /* we can't continue until the oig has emptied and stopped
1279 * referencing state that the caller will free upon return */
1281 lwi = (struct l_wait_info){ 0, };
1282 } while (rc == -EINTR);
1284 LASSERTF(oig->oig_pending == 0,
1285 "exiting oig_wait(oig = %p) with %d pending\n", oig,
1288 CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1291 EXPORT_SYMBOL(oig_wait);
/*
 * Mark an export as failed and disconnect it.
 *
 * The exp_failed flag is set under exp->exp_lock and its previous value
 * is remembered, so an export that was already failed is only logged
 * (with a "skipping" message) instead of being disconnected again.
 *
 * exp: the export to fail.
 */
1293 void class_fail_export(struct obd_export *exp)
1295 int rc, already_failed;
/* Read the old flag and set the new one in a single critical section. */
1297 spin_lock(&exp->exp_lock);
1298 already_failed = exp->exp_failed;
1299 exp->exp_failed = 1;
1300 spin_unlock(&exp->exp_lock);
/* NOTE(review): the statement that leaves the function on this path is
 * not visible in this excerpt; the message text implies an early exit. */
1302 if (already_failed) {
1303 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1304 exp, exp->exp_client_uuid.uuid);
1308 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1309 exp, exp->exp_client_uuid.uuid);
/* Optionally capture the debug log before tearing the export down. */
1311 if (obd_dump_on_timeout)
1312 libcfs_debug_dumplog();
1314 /* Most callers into obd_disconnect are removing their own reference
1315 * (request, for example) in addition to the one from the hash table.
1316 * We don't have such a reference here, so make one. */
1317 class_export_get(exp);
1318 rc = obd_disconnect(exp);
/* The outcome of the disconnect is only logged, never propagated: the
 * function returns void. */
1320 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1322 CDEBUG(D_HA, "disconnected export %p/%s\n",
1323 exp, exp->exp_client_uuid.uuid);
1325 EXPORT_SYMBOL(class_fail_export);
/*
 * Return a printable form of the peer NID of an export.
 *
 * When exp->exp_connection is set, the result is
 * libcfs_nid2str(exp->exp_connection->c_peer.nid).
 *
 * NOTE(review): the value returned when exp_connection is NULL is not
 * visible in this excerpt.
 */
1327 char *obd_export_nid2str(struct obd_export *exp)
1329 if (exp->exp_connection != NULL)
1330 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1334 EXPORT_SYMBOL(obd_export_nid2str);
/*
 * Evict exports of a device that are found under a given NID.
 *
 * obd: device whose obd_nid_hash_body is searched.
 * nid: NID in string form; converted with libcfs_str2nid().
 *
 * Returns exports_evicted.
 *
 * NOTE(review): the loop construct and the statement that updates
 * exports_evicted are not visible in this excerpt; the "#%d" in the
 * warning suggests the counter is bumped once per evicted export.
 */
1336 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1338 struct obd_export *doomed_exp = NULL;
1339 int exports_evicted = 0;
/* The cast drops const because libcfs_str2nid() is called with char *. */
1341 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1344 doomed_exp = lustre_hash_get_object_by_key(obd->obd_nid_hash_body,
1346 if (doomed_exp == NULL)
/* Sanity checks: the export found must carry the requested NID and must
 * not be the device self-export. */
1349 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1350 "nid %s found, wanted nid %s, requested nid %s\n",
1351 obd_export_nid2str(doomed_exp),
1352 libcfs_nid2str(nid_key), nid);
1353 LASSERTF(doomed_exp != obd->obd_self_export,
1354 "self-export is hashed by NID?\n");
/* NOTE(review): "adminstrative" in the message below is misspelled; it
 * is runtime text and is deliberately left unchanged here. */
1356 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1357 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
/* NOTE(review): class_export_put() presumably drops the reference taken
 * by the hash lookup -- confirm. */
1359 class_fail_export(doomed_exp);
1360 class_export_put(doomed_exp);
1363 if (!exports_evicted)
1364 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1365 obd->obd_name, nid);
1366 return exports_evicted;
1368 EXPORT_SYMBOL(obd_export_evict_by_nid);
/*
 * Evict the export of a device that is found under a given client UUID.
 *
 * obd:  device whose obd_uuid_hash_body is searched.
 * uuid: UUID in string form; converted with obd_str2uuid().
 *
 * Returns exports_evicted.  A request naming the device own UUID is
 * rejected with an error message and returns the counter untouched.
 *
 * NOTE(review): the statement that updates exports_evicted after a
 * successful eviction is not visible in this excerpt.
 */
1370 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1372 struct obd_export *doomed_exp = NULL;
1373 struct obd_uuid doomed;
1374 int exports_evicted = 0;
1376 obd_str2uuid(&doomed, uuid);
/* Refuse to evict the device itself. */
1377 if (obd_uuid_equals(&doomed, &obd->obd_uuid)) {
1378 CERROR("%s: can't evict myself\n", obd->obd_name);
1379 return exports_evicted;
1382 doomed_exp = lustre_hash_get_object_by_key(obd->obd_uuid_hash_body,
1385 if (doomed_exp == NULL) {
1386 CERROR("%s: can't disconnect %s: no exports found\n",
1387 obd->obd_name, uuid);
/* NOTE(review): "adminstrative" in the message below is misspelled; it
 * is runtime text and is deliberately left unchanged here. */
1389 CWARN("%s: evicting %s at adminstrative request\n",
1390 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
/* NOTE(review): class_export_put() presumably drops the reference taken
 * by the hash lookup -- confirm. */
1391 class_fail_export(doomed_exp);
1392 class_export_put(doomed_exp);
1396 return exports_evicted;
1398 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1401 * kill zombie imports and exports
/*
 * Drain the zombie lists.
 *
 * Each pass takes obd_zombie_impexp_lock, unlinks at most one entry from
 * the head of obd_zombie_imports and at most one from the head of
 * obd_zombie_exports, releases the lock, and then calls
 * class_import_destroy() / class_export_destroy().  Passes repeat while
 * either pointer is non-NULL.
 *
 * NOTE(review): the start of the loop, the per-pass reset of import and
 * export, and any NULL guards around the destroy calls are not visible
 * in this excerpt.
 */
1403 void obd_zombie_impexp_cull(void)
1405 struct obd_import *import;
1406 struct obd_export *export;
1410 spin_lock (&obd_zombie_impexp_lock);
/* Unlink the first zombie import, if any. */
1413 if (!list_empty(&obd_zombie_imports)) {
1414 import = list_entry(obd_zombie_imports.next,
1417 list_del(&import->imp_zombie_chain);
/* Unlink the first zombie export, if any. */
1421 if (!list_empty(&obd_zombie_exports)) {
1422 export = list_entry(obd_zombie_exports.next,
1425 list_del_init(&export->exp_obd_chain);
1428 spin_unlock(&obd_zombie_impexp_lock);
/* The destroy calls come after the unlock, i.e. outside the spinlock. */
1431 class_import_destroy(import);
1434 class_export_destroy(export);
1436 } while (import != NULL || export != NULL);
/* Completed by obd_zombie_impexp_thread() once it has started (or failed
 * to daemonize); obd_zombie_impexp_init() waits on it. */
1440 static struct completion obd_zombie_start;
/* Completed by obd_zombie_impexp_thread() after its loop ends;
 * obd_zombie_impexp_stop() waits on it. */
1441 static struct completion obd_zombie_stop;
/* Bit flags for the zombie thread; the OBD_ZOMBIE_STOP bit is set by
 * obd_zombie_impexp_stop() and tested by the thread loop. */
1442 static unsigned long obd_zombie_flags;
/* Wait queue the zombie thread sleeps on; signalled by
 * obd_zombie_impexp_notify(). */
1443 static cfs_waitq_t obd_zombie_waitq;
1450 * check whether the zombie import/export kill thread has work to do.
/*
 * Compute, under obd_zombie_impexp_lock, whether the zombie thread is
 * idle: rc is non-zero only when both zombie lists are empty and the
 * OBD_ZOMBIE_STOP bit is not set.
 *
 * arg: not referenced by any visible line.
 *
 * NOTE(review): the return statement is not visible in this excerpt;
 * the thread loop waits for !obd_zombie_impexp_check(NULL), which is
 * consistent with rc being the return value -- confirm.
 */
1452 int obd_zombie_impexp_check(void *arg)
1456 spin_lock(&obd_zombie_impexp_lock);
1457 rc = list_empty(&obd_zombie_imports) &&
1458 list_empty(&obd_zombie_exports) &&
1459 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1461 spin_unlock(&obd_zombie_impexp_lock);
1467 * notify import/export destroy thread about new zombie.
/* Signal obd_zombie_waitq, the queue the zombie thread waits on. */
1469 static void obd_zombie_impexp_notify(void)
1471 cfs_waitq_signal(&obd_zombie_waitq);
1477 * thread that destroys zombie exports/imports.
/*
 * Body of the zombie reaper thread.
 *
 * Daemonizes as "obd_zombid", signals obd_zombie_start, then loops until
 * the OBD_ZOMBIE_STOP bit is set: sleep on obd_zombie_waitq until
 * obd_zombie_impexp_check() reports zero, then call
 * obd_zombie_impexp_cull().  obd_zombie_stop is completed after the loop.
 *
 * unused: not referenced by any visible line.
 *
 * NOTE(review): the return statements are not visible in this excerpt.
 */
1479 static int obd_zombie_impexp_thread(void *unused)
/* On daemonize failure the start completion is still signalled, so the
 * waiter in obd_zombie_impexp_init() is released. */
1483 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1484 complete(&obd_zombie_start);
1488 complete(&obd_zombie_start);
1490 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
/* Zeroed wait descriptor for the wait below. */
1491 struct l_wait_info lwi = { 0 };
1493 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1495 obd_zombie_impexp_cull();
/* Tell obd_zombie_impexp_stop() that the loop has finished. */
1498 complete(&obd_zombie_stop);
1503 #else /* ! KERNEL */
/* Counter used by obd_zombie_impexp_kill() so that only the call that
 * raises it to 1 runs the cull. */
1505 static atomic_t zombie_recur = ATOMIC_INIT(0);
/* Handle returned by liblustre_register_wait_callback(); passed back to
 * liblustre_deregister_wait_callback() on stop. */
1506 static void *obd_zombie_impexp_work_cb;
/* Handle returned by liblustre_register_idle_callback(); passed back to
 * liblustre_deregister_idle_callback() on stop. */
1507 static void *obd_zombie_impexp_idle_cb;
/*
 * Callback that culls zombies (registered under the name
 * "obd_zombi_impexp_kill" by obd_zombie_impexp_init()).
 *
 * zombie_recur is incremented on entry and decremented again afterwards;
 * obd_zombie_impexp_cull() runs only when the increment yields 1, so a
 * nested invocation does not start a second cull.
 *
 * arg: not referenced by any visible line.
 *
 * NOTE(review): the return value is not visible in this excerpt.
 */
1509 int obd_zombie_impexp_kill(void *arg)
1513 if (atomic_inc_return(&zombie_recur) == 1) {
1514 obd_zombie_impexp_cull();
1517 atomic_dec(&zombie_recur);
1524 * start the thread that destroys zombie imports/exports
/*
 * Set up zombie import/export handling.
 *
 * Initialises both zombie lists, the lock protecting them, the start and
 * stop completions and the wait queue.  It then starts
 * obd_zombie_impexp_thread() and waits for obd_zombie_start, and
 * registers obd_zombie_impexp_kill() / obd_zombie_impexp_check() as
 * liblustre wait / idle callbacks.
 *
 * NOTE(review): the preprocessor conditionals are not visible in this
 * excerpt; the thread start and the callback registration are presumably
 * the kernel and non-kernel variants respectively -- confirm.  The
 * handling of rc and the return statement are not visible either.
 */
1526 int obd_zombie_impexp_init(void)
1530 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1531 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1532 spin_lock_init(&obd_zombie_impexp_lock);
1533 init_completion(&obd_zombie_start);
1534 init_completion(&obd_zombie_stop);
1535 cfs_waitq_init(&obd_zombie_waitq);
1538 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
/* Block until the thread has signalled that it is up (or has failed to
 * daemonize; it completes obd_zombie_start in both cases). */
1542 wait_for_completion(&obd_zombie_start);
/* Keep the returned handles so that obd_zombie_impexp_stop() can
 * deregister the callbacks. */
1545 obd_zombie_impexp_work_cb =
1546 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1547 &obd_zombie_impexp_kill, NULL);
1549 obd_zombie_impexp_idle_cb =
1550 liblustre_register_idle_callback("obd_zombi_impexp_check",
1551 &obd_zombie_impexp_check, NULL);
1558 * stop the thread that destroys zombie imports/exports
/*
 * Shut down zombie import/export handling.
 *
 * Sets the OBD_ZOMBIE_STOP bit and signals the wait queue so the thread
 * loop notices it, then waits for obd_zombie_stop; the liblustre wait
 * and idle callbacks are deregistered using the handles saved at init.
 *
 * NOTE(review): the preprocessor conditionals are not visible in this
 * excerpt; the completion wait and the callback deregistration are
 * presumably the kernel and non-kernel variants respectively -- confirm.
 */
1560 void obd_zombie_impexp_stop(void)
1562 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1563 obd_zombie_impexp_notify();
1565 wait_for_completion(&obd_zombie_stop);
1567 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1568 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);