1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
80 EXPORT_SYMBOL(obd_device_alloc);
82 static void obd_device_free(struct obd_device *obd)
85 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87 if (obd->obd_namespace != NULL) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd, obd->obd_namespace, obd->obd_force);
92 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
94 EXPORT_SYMBOL(obd_device_free);
96 struct obd_type *class_search_type(const char *name)
98 struct list_head *tmp;
99 struct obd_type *type;
101 spin_lock(&obd_types_lock);
102 list_for_each(tmp, &obd_types) {
103 type = list_entry(tmp, struct obd_type, typ_chain);
104 if (strcmp(type->typ_name, name) == 0) {
105 spin_unlock(&obd_types_lock);
109 spin_unlock(&obd_types_lock);
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
119 const char *modname = name;
120 if (!request_module(modname)) {
121 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122 type = class_search_type(name);
124 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130 spin_lock(&type->obd_type_lock);
132 try_module_get(type->typ_dt_ops->o_owner);
133 spin_unlock(&type->obd_type_lock);
138 void class_put_type(struct obd_type *type)
141 spin_lock(&type->obd_type_lock);
143 module_put(type->typ_dt_ops->o_owner);
144 spin_unlock(&type->obd_type_lock);
147 #define CLASS_MAX_NAME 1024
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150 struct lprocfs_vars *vars, const char *name,
151 struct lu_device_type *ldt)
153 struct obd_type *type;
158 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
160 if (class_search_type(name)) {
161 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166 OBD_ALLOC(type, sizeof(*type));
170 OBD_ALLOC_PTR(type->typ_dt_ops);
171 OBD_ALLOC_PTR(type->typ_md_ops);
172 OBD_ALLOC(type->typ_name, strlen(name) + 1);
174 if (type->typ_dt_ops == NULL ||
175 type->typ_md_ops == NULL ||
176 type->typ_name == NULL)
179 *(type->typ_dt_ops) = *dt_ops;
180 /* md_ops is optional */
182 *(type->typ_md_ops) = *md_ops;
183 strcpy(type->typ_name, name);
184 spin_lock_init(&type->obd_type_lock);
187 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
189 if (IS_ERR(type->typ_procroot)) {
190 rc = PTR_ERR(type->typ_procroot);
191 type->typ_procroot = NULL;
197 rc = ldt->ldt_ops->ldto_init(ldt);
202 spin_lock(&obd_types_lock);
203 list_add(&type->typ_chain, &obd_types);
204 spin_unlock(&obd_types_lock);
209 if (type->typ_name != NULL)
210 OBD_FREE(type->typ_name, strlen(name) + 1);
211 if (type->typ_md_ops != NULL)
212 OBD_FREE_PTR(type->typ_md_ops);
213 if (type->typ_dt_ops != NULL)
214 OBD_FREE_PTR(type->typ_dt_ops);
215 OBD_FREE(type, sizeof(*type));
219 int class_unregister_type(const char *name)
221 struct obd_type *type = class_search_type(name);
225 CERROR("unknown obd type\n");
229 if (type->typ_refcnt) {
230 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231 /* This is a bad situation, let's make the best of it */
232 /* Remove ops, but leave the name for debugging */
233 OBD_FREE_PTR(type->typ_dt_ops);
234 OBD_FREE_PTR(type->typ_md_ops);
238 if (type->typ_procroot) {
239 lprocfs_remove(&type->typ_procroot);
243 type->typ_lu->ldt_ops->ldto_fini(type->typ_lu);
245 spin_lock(&obd_types_lock);
246 list_del(&type->typ_chain);
247 spin_unlock(&obd_types_lock);
248 OBD_FREE(type->typ_name, strlen(name) + 1);
249 if (type->typ_dt_ops != NULL)
250 OBD_FREE_PTR(type->typ_dt_ops);
251 if (type->typ_md_ops != NULL)
252 OBD_FREE_PTR(type->typ_md_ops);
253 OBD_FREE(type, sizeof(*type));
255 } /* class_unregister_type */
258 * Create a new obd device.
260 * Find an empty slot in ::obd_devs[], create a new obd device in it.
262 * \param type_name [in] obd device type string.
263 * \param name [in] obd device name.
265 * \retval ERR_PTR(-errno) if create fails, otherwise return the obd device
268 struct obd_device *class_newdev(const char *type_name, const char *name)
270 struct obd_device *result = NULL;
271 struct obd_device *newdev;
272 struct obd_type *type = NULL;
274 int new_obd_minor = 0;
276 if (strlen(name) >= MAX_OBD_NAME) {
277 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278 RETURN(ERR_PTR(-EINVAL));
281 type = class_get_type(type_name);
283 CERROR("OBD: unknown type: %s\n", type_name);
284 RETURN(ERR_PTR(-ENODEV));
287 newdev = obd_device_alloc();
288 if (newdev == NULL) {
289 class_put_type(type);
290 RETURN(ERR_PTR(-ENOMEM));
292 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
294 spin_lock(&obd_dev_lock);
295 for (i = 0; i < class_devno_max(); i++) {
296 struct obd_device *obd = class_num2obd(i);
297 if (obd && obd->obd_name &&
298 (strcmp(name, obd->obd_name) == 0)) {
299 CERROR("Device %s already exists, won't add\n", name);
301 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302 "%p obd_magic %08x != %08x\n", result,
303 result->obd_magic, OBD_DEVICE_MAGIC);
304 LASSERTF(result->obd_minor == new_obd_minor,
305 "%p obd_minor %d != %d\n", result,
306 result->obd_minor, new_obd_minor);
308 obd_devs[result->obd_minor] = NULL;
309 result->obd_name[0]='\0';
311 result = ERR_PTR(-EEXIST);
314 if (!result && !obd) {
316 result->obd_minor = i;
318 result->obd_type = type;
319 strncpy(result->obd_name, name,
320 sizeof(result->obd_name) - 1);
321 obd_devs[i] = result;
324 spin_unlock(&obd_dev_lock);
326 if (result == NULL && i >= class_devno_max()) {
327 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
329 result = ERR_PTR(-EOVERFLOW);
332 if (IS_ERR(result)) {
333 obd_device_free(newdev);
334 class_put_type(type);
336 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337 result->obd_name, result);
342 void class_release_dev(struct obd_device *obd)
344 struct obd_type *obd_type = obd->obd_type;
346 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350 LASSERT(obd_type != NULL);
352 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353 obd->obd_name,obd->obd_type->typ_name);
355 spin_lock(&obd_dev_lock);
356 obd_devs[obd->obd_minor] = NULL;
357 spin_unlock(&obd_dev_lock);
358 obd_device_free(obd);
360 class_put_type(obd_type);
363 int class_name2dev(const char *name)
370 spin_lock(&obd_dev_lock);
371 for (i = 0; i < class_devno_max(); i++) {
372 struct obd_device *obd = class_num2obd(i);
373 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374 /* Make sure we finished attaching before we give
375 out any references */
376 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377 if (obd->obd_attached) {
378 spin_unlock(&obd_dev_lock);
384 spin_unlock(&obd_dev_lock);
389 struct obd_device *class_name2obd(const char *name)
391 int dev = class_name2dev(name);
393 if (dev < 0 || dev > class_devno_max())
395 return class_num2obd(dev);
398 int class_uuid2dev(struct obd_uuid *uuid)
402 spin_lock(&obd_dev_lock);
403 for (i = 0; i < class_devno_max(); i++) {
404 struct obd_device *obd = class_num2obd(i);
405 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407 spin_unlock(&obd_dev_lock);
411 spin_unlock(&obd_dev_lock);
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
418 int dev = class_uuid2dev(uuid);
421 return class_num2obd(dev);
425 * Get obd device from ::obd_devs[]
427 * \param num [in] array index
429 * \retval NULL if ::obd_devs[\a num] does not contain an obd device
430 * otherwise return the obd device there.
432 struct obd_device *class_num2obd(int num)
434 struct obd_device *obd = NULL;
436 if (num < class_devno_max()) {
441 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442 "%p obd_magic %08x != %08x\n",
443 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444 LASSERTF(obd->obd_minor == num,
445 "%p obd_minor %0d != %0d\n",
446 obd, obd->obd_minor, num);
452 void class_obd_list(void)
457 spin_lock(&obd_dev_lock);
458 for (i = 0; i < class_devno_max(); i++) {
459 struct obd_device *obd = class_num2obd(i);
462 if (obd->obd_stopping)
464 else if (obd->obd_set_up)
466 else if (obd->obd_attached)
470 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471 i, status, obd->obd_type->typ_name,
472 obd->obd_name, obd->obd_uuid.uuid,
473 atomic_read(&obd->obd_refcount));
475 spin_unlock(&obd_dev_lock);
479 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
480 specified, then only the client with that uuid is returned,
481 otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483 const char * typ_name,
484 struct obd_uuid *grp_uuid)
488 spin_lock(&obd_dev_lock);
489 for (i = 0; i < class_devno_max(); i++) {
490 struct obd_device *obd = class_num2obd(i);
493 if ((strncmp(obd->obd_type->typ_name, typ_name,
494 strlen(typ_name)) == 0)) {
495 if (obd_uuid_equals(tgt_uuid,
496 &obd->u.cli.cl_target_uuid) &&
497 ((grp_uuid)? obd_uuid_equals(grp_uuid,
498 &obd->obd_uuid) : 1)) {
499 spin_unlock(&obd_dev_lock);
504 spin_unlock(&obd_dev_lock);
509 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
510 struct obd_uuid *grp_uuid)
512 struct obd_device *obd;
514 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
516 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
521 /* Iterate the obd_device list looking for devices that have grp_uuid. Start
522 searching at *next, and if a device is found, the next index to look
523 at is saved in *next. If next is NULL, then the first matching device
524 will always be returned. */
525 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
531 else if (*next >= 0 && *next < class_devno_max())
536 spin_lock(&obd_dev_lock);
537 for (; i < class_devno_max(); i++) {
538 struct obd_device *obd = class_num2obd(i);
541 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
544 spin_unlock(&obd_dev_lock);
548 spin_unlock(&obd_dev_lock);
554 void obd_cleanup_caches(void)
559 if (obd_device_cachep) {
560 rc = cfs_mem_cache_destroy(obd_device_cachep);
561 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
562 obd_device_cachep = NULL;
565 rc = cfs_mem_cache_destroy(obdo_cachep);
566 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
570 rc = cfs_mem_cache_destroy(import_cachep);
571 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
572 import_cachep = NULL;
575 rc = cfs_mem_cache_destroy(capa_cachep);
576 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
582 int obd_init_caches(void)
586 LASSERT(obd_device_cachep == NULL);
587 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
588 sizeof(struct obd_device),
590 if (!obd_device_cachep)
593 LASSERT(obdo_cachep == NULL);
594 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
599 LASSERT(import_cachep == NULL);
600 import_cachep = cfs_mem_cache_create("ll_import_cache",
601 sizeof(struct obd_import),
606 LASSERT(capa_cachep == NULL);
607 capa_cachep = cfs_mem_cache_create("capa_cache",
608 sizeof(struct obd_capa), 0, 0);
614 obd_cleanup_caches();
619 /* map connection to client */
620 struct obd_export *class_conn2export(struct lustre_handle *conn)
622 struct obd_export *export;
626 CDEBUG(D_CACHE, "looking for null handle\n");
630 if (conn->cookie == -1) { /* this means assign a new connection */
631 CDEBUG(D_CACHE, "want a new connection\n");
635 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
636 export = class_handle2object(conn->cookie);
640 struct obd_device *class_exp2obd(struct obd_export *exp)
647 struct obd_device *class_conn2obd(struct lustre_handle *conn)
649 struct obd_export *export;
650 export = class_conn2export(conn);
652 struct obd_device *obd = export->exp_obd;
653 class_export_put(export);
659 struct obd_import *class_exp2cliimp(struct obd_export *exp)
661 struct obd_device *obd = exp->exp_obd;
664 return obd->u.cli.cl_import;
667 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
669 struct obd_device *obd = class_conn2obd(conn);
672 return obd->u.cli.cl_import;
675 /* Export management functions */
/* Addref callback registered with class_handle_hash() for export handles:
 * takes one reference on the export behind the opaque pointer. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
681 void __class_export_put(struct obd_export *exp)
683 if (atomic_dec_and_test(&exp->exp_refcount)) {
684 LASSERT (list_empty(&exp->exp_obd_chain));
686 CDEBUG(D_IOCTL, "final put %p/%s\n",
687 exp, exp->exp_client_uuid.uuid);
689 spin_lock(&obd_zombie_impexp_lock);
690 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
691 spin_unlock(&obd_zombie_impexp_lock);
693 if (obd_zombie_impexp_notify != NULL)
694 obd_zombie_impexp_notify();
697 EXPORT_SYMBOL(__class_export_put);
699 void class_export_destroy(struct obd_export *exp)
701 struct obd_device *obd = exp->exp_obd;
704 LASSERT (atomic_read(&exp->exp_refcount) == 0);
706 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
707 exp->exp_client_uuid.uuid, obd->obd_name);
709 LASSERT(obd != NULL);
711 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
712 if (exp->exp_connection)
713 ptlrpc_put_connection_superhack(exp->exp_connection);
715 LASSERT(list_empty(&exp->exp_outstanding_replies));
716 LASSERT(list_empty(&exp->exp_req_replay_queue));
717 obd_destroy_export(exp);
719 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
724 /* Creates a new export, adds it to the hash table, and returns a
725 * pointer to it. The refcount is 2: one for the hash reference, and
726 * one for the pointer returned by this function. */
727 struct obd_export *class_new_export(struct obd_device *obd,
728 struct obd_uuid *cluuid)
730 struct obd_export *export;
733 OBD_ALLOC_PTR(export);
735 return ERR_PTR(-ENOMEM);
737 export->exp_conn_cnt = 0;
738 export->exp_lock_hash = NULL;
739 atomic_set(&export->exp_refcount, 2);
740 atomic_set(&export->exp_rpc_count, 0);
741 export->exp_obd = obd;
742 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
743 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
744 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
745 class_handle_hash(&export->exp_handle, export_handle_addref);
746 export->exp_last_request_time = cfs_time_current_sec();
747 spin_lock_init(&export->exp_lock);
748 INIT_HLIST_NODE(&export->exp_uuid_hash);
749 INIT_HLIST_NODE(&export->exp_nid_hash);
751 export->exp_sp_peer = LUSTRE_SP_ANY;
752 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
753 export->exp_client_uuid = *cluuid;
754 obd_init_export(export);
756 spin_lock(&obd->obd_dev_lock);
757 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
758 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
759 &export->exp_uuid_hash);
761 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
762 obd->obd_name, cluuid->uuid, rc);
763 spin_unlock(&obd->obd_dev_lock);
764 class_handle_unhash(&export->exp_handle);
765 OBD_FREE_PTR(export);
766 return ERR_PTR(-EALREADY);
770 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
772 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
773 list_add_tail(&export->exp_obd_chain_timed,
774 &export->exp_obd->obd_exports_timed);
775 export->exp_obd->obd_num_exports++;
776 spin_unlock(&obd->obd_dev_lock);
780 EXPORT_SYMBOL(class_new_export);
782 void class_unlink_export(struct obd_export *exp)
784 class_handle_unhash(&exp->exp_handle);
786 spin_lock(&exp->exp_obd->obd_dev_lock);
787 /* delete an uuid-export hashitem from hashtables */
788 if (!hlist_unhashed(&exp->exp_uuid_hash))
789 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
790 &exp->exp_client_uuid,
791 &exp->exp_uuid_hash);
793 list_del_init(&exp->exp_obd_chain);
794 list_del_init(&exp->exp_obd_chain_timed);
795 exp->exp_obd->obd_num_exports--;
796 spin_unlock(&exp->exp_obd->obd_dev_lock);
798 class_export_put(exp);
800 EXPORT_SYMBOL(class_unlink_export);
802 /* Import management functions */
/* Addref callback registered with class_handle_hash() for import handles:
 * takes one reference on the import behind the opaque pointer. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
808 struct obd_import *class_import_get(struct obd_import *import)
810 LASSERT(atomic_read(&import->imp_refcount) >= 0);
811 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
812 atomic_inc(&import->imp_refcount);
813 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
814 atomic_read(&import->imp_refcount));
817 EXPORT_SYMBOL(class_import_get);
819 void class_import_put(struct obd_import *import)
823 LASSERT(atomic_read(&import->imp_refcount) > 0);
824 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
825 LASSERT(list_empty(&import->imp_zombie_chain));
827 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
828 atomic_read(&import->imp_refcount) - 1);
830 if (atomic_dec_and_test(&import->imp_refcount)) {
832 CDEBUG(D_INFO, "final put import %p\n", import);
834 spin_lock(&obd_zombie_impexp_lock);
835 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
836 spin_unlock(&obd_zombie_impexp_lock);
838 if (obd_zombie_impexp_notify != NULL)
839 obd_zombie_impexp_notify();
844 EXPORT_SYMBOL(class_import_put);
846 void class_import_destroy(struct obd_import *import)
850 CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
851 import->imp_obd->obd_name);
853 LASSERT(atomic_read(&import->imp_refcount) == 0);
855 ptlrpc_put_connection_superhack(import->imp_connection);
857 while (!list_empty(&import->imp_conn_list)) {
858 struct obd_import_conn *imp_conn;
860 imp_conn = list_entry(import->imp_conn_list.next,
861 struct obd_import_conn, oic_item);
862 list_del(&imp_conn->oic_item);
863 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
864 OBD_FREE(imp_conn, sizeof(*imp_conn));
867 LASSERT(import->imp_sec == NULL);
868 class_decref(import->imp_obd);
869 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
873 static void init_imp_at(struct imp_at *at) {
875 at_init(&at->iat_net_latency, 0, 0);
876 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
877 /* max service estimates are tracked on the server side, so
878 don't use the AT history here, just use the last reported
879 val. (But keep hist for proc histogram, worst_ever) */
880 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
885 struct obd_import *class_new_import(struct obd_device *obd)
887 struct obd_import *imp;
889 OBD_ALLOC(imp, sizeof(*imp));
893 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
894 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
895 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
896 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
897 spin_lock_init(&imp->imp_lock);
898 imp->imp_last_success_conn = 0;
899 imp->imp_state = LUSTRE_IMP_NEW;
900 imp->imp_obd = class_incref(obd);
901 sema_init(&imp->imp_sec_mutex, 1);
902 cfs_waitq_init(&imp->imp_recovery_waitq);
904 atomic_set(&imp->imp_refcount, 2);
905 atomic_set(&imp->imp_inflight, 0);
906 atomic_set(&imp->imp_replay_inflight, 0);
907 atomic_set(&imp->imp_inval_count, 0);
908 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
909 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
910 class_handle_hash(&imp->imp_handle, import_handle_addref);
911 init_imp_at(&imp->imp_at);
913 /* the default magic is V2, will be used in connect RPC, and
914 * then adjusted according to the flags in request/reply. */
915 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
919 EXPORT_SYMBOL(class_new_import);
921 void class_destroy_import(struct obd_import *import)
923 LASSERT(import != NULL);
924 LASSERT(import != LP_POISON);
926 class_handle_unhash(&import->imp_handle);
928 spin_lock(&import->imp_lock);
929 import->imp_generation++;
930 spin_unlock(&import->imp_lock);
931 class_import_put(import);
933 EXPORT_SYMBOL(class_destroy_import);
935 /* A connection defines an export context in which preallocation can
936 be managed. This releases the export pointer reference, and returns
937 the export handle, so the export refcount is 1 when this function
939 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
940 struct obd_uuid *cluuid)
942 struct obd_export *export;
943 LASSERT(conn != NULL);
944 LASSERT(obd != NULL);
945 LASSERT(cluuid != NULL);
948 export = class_new_export(obd, cluuid);
950 RETURN(PTR_ERR(export));
952 conn->cookie = export->exp_handle.h_cookie;
953 class_export_put(export);
955 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
956 cluuid->uuid, conn->cookie);
959 EXPORT_SYMBOL(class_connect);
961 /* if export is involved in recovery then clean up related things */
962 void class_export_recovery_cleanup(struct obd_export *exp)
964 struct obd_device *obd = exp->exp_obd;
966 spin_lock_bh(&obd->obd_processing_task_lock);
967 if (obd->obd_recovering && exp->exp_in_recovery) {
968 spin_lock(&exp->exp_lock);
969 exp->exp_in_recovery = 0;
970 spin_unlock(&exp->exp_lock);
971 obd->obd_connected_clients--;
972 /* each connected client is counted as recoverable */
973 obd->obd_recoverable_clients--;
974 if (exp->exp_req_replay_needed) {
975 spin_lock(&exp->exp_lock);
976 exp->exp_req_replay_needed = 0;
977 spin_unlock(&exp->exp_lock);
978 LASSERT(atomic_read(&obd->obd_req_replay_clients));
979 atomic_dec(&obd->obd_req_replay_clients);
981 if (exp->exp_lock_replay_needed) {
982 spin_lock(&exp->exp_lock);
983 exp->exp_lock_replay_needed = 0;
984 spin_unlock(&exp->exp_lock);
985 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
986 atomic_dec(&obd->obd_lock_replay_clients);
989 spin_unlock_bh(&obd->obd_processing_task_lock);
992 /* This function removes two references from the export: one for the
993 * hash entry and one for the export pointer passed in. The export
994 * pointer passed to this function is destroyed and should not be used
996 int class_disconnect(struct obd_export *export)
998 int already_disconnected;
1001 if (export == NULL) {
1003 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1007 spin_lock(&export->exp_lock);
1008 already_disconnected = export->exp_disconnected;
1009 export->exp_disconnected = 1;
1011 if (!hlist_unhashed(&export->exp_nid_hash))
1012 lustre_hash_del(export->exp_obd->obd_nid_hash,
1013 &export->exp_connection->c_peer.nid,
1014 &export->exp_nid_hash);
1016 spin_unlock(&export->exp_lock);
1018 /* class_cleanup(), abort_recovery(), and class_fail_export()
1019 * all end up in here, and if any of them race we shouldn't
1020 * call extra class_export_puts(). */
1021 if (already_disconnected)
1024 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1025 export->exp_handle.h_cookie);
1027 class_export_recovery_cleanup(export);
1028 class_unlink_export(export);
1029 class_export_put(export);
1033 static void class_disconnect_export_list(struct list_head *list, int flags)
1036 struct lustre_handle fake_conn;
1037 struct obd_export *fake_exp, *exp;
1040 /* It's possible that an export may disconnect itself, but
1041 * nothing else will be added to this list. */
1042 while (!list_empty(list)) {
1043 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1044 class_export_get(exp);
1046 spin_lock(&exp->exp_lock);
1047 exp->exp_flags = flags;
1048 spin_unlock(&exp->exp_lock);
1050 if (obd_uuid_equals(&exp->exp_client_uuid,
1051 &exp->exp_obd->obd_uuid)) {
1053 "exp %p export uuid == obd uuid, don't discon\n",
1055 /* Need to delete this now so we don't end up pointing
1056 * to work_list later when this export is cleaned up. */
1057 list_del_init(&exp->exp_obd_chain);
1058 class_export_put(exp);
1062 fake_conn.cookie = exp->exp_handle.h_cookie;
1063 fake_exp = class_conn2export(&fake_conn);
1065 class_export_put(exp);
1069 spin_lock(&fake_exp->exp_lock);
1070 fake_exp->exp_flags = flags;
1071 spin_unlock(&fake_exp->exp_lock);
1073 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1074 "last request at "CFS_TIME_T"\n",
1075 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1076 exp, exp->exp_last_request_time);
1077 rc = obd_disconnect(fake_exp);
1078 class_export_put(exp);
1083 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1085 return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1086 (obd->obd_force ? OBD_OPT_FORCE : 0));
1089 void class_disconnect_exports(struct obd_device *obd)
1091 struct list_head work_list;
1094 /* Move all of the exports from obd_exports to a work list, en masse. */
1095 spin_lock(&obd->obd_dev_lock);
1096 list_add(&work_list, &obd->obd_exports);
1097 list_del_init(&obd->obd_exports);
1098 spin_unlock(&obd->obd_dev_lock);
1100 if (!list_empty(&work_list)) {
1101 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1102 "disconnecting them\n", obd->obd_minor, obd);
1103 class_disconnect_export_list(&work_list,
1104 get_exp_flags_from_obd(obd));
1106 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1107 obd->obd_minor, obd);
1110 EXPORT_SYMBOL(class_disconnect_exports);
1112 /* Remove exports that have not completed recovery.
1114 int class_disconnect_stale_exports(struct obd_device *obd,
1115 int (*test_export)(struct obd_export *))
1117 struct list_head work_list;
1118 struct list_head *pos, *n;
1119 struct obd_export *exp;
1123 CFS_INIT_LIST_HEAD(&work_list);
1124 spin_lock(&obd->obd_dev_lock);
1125 list_for_each_safe(pos, n, &obd->obd_exports) {
1126 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1127 if (test_export(exp))
1130 list_del(&exp->exp_obd_chain);
1131 list_add(&exp->exp_obd_chain, &work_list);
1132 /* don't count self-export as client */
1133 if (obd_uuid_equals(&exp->exp_client_uuid,
1134 &exp->exp_obd->obd_uuid))
1138 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1139 obd->obd_name, exp->exp_client_uuid.uuid,
1140 exp->exp_connection == NULL ? "<unknown>" :
1141 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1143 spin_unlock(&obd->obd_dev_lock);
1145 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1146 obd->obd_name, cnt);
1147 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1150 EXPORT_SYMBOL(class_disconnect_stale_exports);
1152 int oig_init(struct obd_io_group **oig_out)
1154 struct obd_io_group *oig;
1157 OBD_ALLOC(oig, sizeof(*oig));
1161 spin_lock_init(&oig->oig_lock);
1163 oig->oig_pending = 0;
1164 atomic_set(&oig->oig_refcount, 1);
1165 cfs_waitq_init(&oig->oig_waitq);
1166 CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1171 EXPORT_SYMBOL(oig_init);
1173 static inline void oig_grab(struct obd_io_group *oig)
1175 atomic_inc(&oig->oig_refcount);
1178 void oig_release(struct obd_io_group *oig)
1180 if (atomic_dec_and_test(&oig->oig_refcount))
1181 OBD_FREE(oig, sizeof(*oig));
1183 EXPORT_SYMBOL(oig_release);
1185 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1188 CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1189 spin_lock(&oig->oig_lock);
1195 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1197 spin_unlock(&oig->oig_lock);
1202 EXPORT_SYMBOL(oig_add_one);
1204 void oig_complete_one(struct obd_io_group *oig,
1205 struct oig_callback_context *occ, int rc)
1207 cfs_waitq_t *wake = NULL;
1210 spin_lock(&oig->oig_lock);
1213 list_del_init(&occ->occ_oig_item);
1215 old_rc = oig->oig_rc;
1216 if (oig->oig_rc == 0 && rc != 0)
1219 if (--oig->oig_pending <= 0)
1220 wake = &oig->oig_waitq;
1222 spin_unlock(&oig->oig_lock);
1224 CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1225 "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1228 cfs_waitq_signal(wake);
1231 EXPORT_SYMBOL(oig_complete_one);
1233 static int oig_done(struct obd_io_group *oig)
1236 spin_lock(&oig->oig_lock);
1237 if (oig->oig_pending <= 0)
1239 spin_unlock(&oig->oig_lock);
1243 static void interrupted_oig(void *data)
1245 struct obd_io_group *oig = data;
1246 struct oig_callback_context *occ;
1248 spin_lock(&oig->oig_lock);
1249 /* We need to restart the processing each time we drop the lock, as
1250 * it is possible other threads called oig_complete_one() to remove
1251 * an entry elsewhere in the list while we dropped lock. We need to
1252 * drop the lock because osc_ap_completion() calls oig_complete_one()
1253 * which re-gets this lock ;-) as well as a lock ordering issue. */
1255 list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1256 if (occ->interrupted)
1258 occ->interrupted = 1;
1259 spin_unlock(&oig->oig_lock);
1260 occ->occ_interrupted(occ);
1261 spin_lock(&oig->oig_lock);
1264 spin_unlock(&oig->oig_lock);
1267 int oig_wait(struct obd_io_group *oig)
1269 struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1272 CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1275 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1276 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1277 /* we can't continue until the oig has emptied and stopped
1278 * referencing state that the caller will free upon return */
1280 lwi = (struct l_wait_info){ 0, };
1281 } while (rc == -EINTR);
1283 LASSERTF(oig->oig_pending == 0,
1284 "exiting oig_wait(oig = %p) with %d pending\n", oig,
1287 CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1290 EXPORT_SYMBOL(oig_wait);
1292 void class_fail_export(struct obd_export *exp)
1294 int rc, already_failed;
1296 spin_lock(&exp->exp_lock);
1297 already_failed = exp->exp_failed;
1298 exp->exp_failed = 1;
1299 spin_unlock(&exp->exp_lock);
1301 if (already_failed) {
1302 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1303 exp, exp->exp_client_uuid.uuid);
1307 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1308 exp, exp->exp_client_uuid.uuid);
1310 if (obd_dump_on_timeout)
1311 libcfs_debug_dumplog();
1313 /* Most callers into obd_disconnect are removing their own reference
1314 * (request, for example) in addition to the one from the hash table.
1315 * We don't have such a reference here, so make one. */
1316 class_export_get(exp);
1317 rc = obd_disconnect(exp);
1319 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1321 CDEBUG(D_HA, "disconnected export %p/%s\n",
1322 exp, exp->exp_client_uuid.uuid);
1324 EXPORT_SYMBOL(class_fail_export);
1326 char *obd_export_nid2str(struct obd_export *exp)
1328 if (exp->exp_connection != NULL)
1329 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1333 EXPORT_SYMBOL(obd_export_nid2str);
1335 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1337 struct obd_export *doomed_exp = NULL;
1338 int exports_evicted = 0;
1340 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1343 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1344 if (doomed_exp == NULL)
1347 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1348 "nid %s found, wanted nid %s, requested nid %s\n",
1349 obd_export_nid2str(doomed_exp),
1350 libcfs_nid2str(nid_key), nid);
1351 LASSERTF(doomed_exp != obd->obd_self_export,
1352 "self-export is hashed by NID?\n");
1354 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1355 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1357 class_fail_export(doomed_exp);
1358 class_export_put(doomed_exp);
1361 if (!exports_evicted)
1362 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1363 obd->obd_name, nid);
1364 return exports_evicted;
1366 EXPORT_SYMBOL(obd_export_evict_by_nid);
1368 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1370 struct obd_export *doomed_exp = NULL;
1371 struct obd_uuid doomed_uuid;
1372 int exports_evicted = 0;
1374 obd_str2uuid(&doomed_uuid, uuid);
1375 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1376 CERROR("%s: can't evict myself\n", obd->obd_name);
1377 return exports_evicted;
1380 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1382 if (doomed_exp == NULL) {
1383 CERROR("%s: can't disconnect %s: no exports found\n",
1384 obd->obd_name, uuid);
1386 CWARN("%s: evicting %s at adminstrative request\n",
1387 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1388 class_fail_export(doomed_exp);
1389 class_export_put(doomed_exp);
1393 return exports_evicted;
1395 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1398 * kill zombie imports and exports
1400 void obd_zombie_impexp_cull(void)
1402 struct obd_import *import;
1403 struct obd_export *export;
1407 spin_lock (&obd_zombie_impexp_lock);
1410 if (!list_empty(&obd_zombie_imports)) {
1411 import = list_entry(obd_zombie_imports.next,
1414 list_del(&import->imp_zombie_chain);
1418 if (!list_empty(&obd_zombie_exports)) {
1419 export = list_entry(obd_zombie_exports.next,
1422 list_del_init(&export->exp_obd_chain);
1425 spin_unlock(&obd_zombie_impexp_lock);
1428 class_import_destroy(import);
1431 class_export_destroy(export);
1433 } while (import != NULL || export != NULL);
1437 static struct completion obd_zombie_start;
1438 static struct completion obd_zombie_stop;
1439 static unsigned long obd_zombie_flags;
1440 static cfs_waitq_t obd_zombie_waitq;
1447 * check for work for kill zombie import/export thread.
1449 int obd_zombie_impexp_check(void *arg)
1453 spin_lock(&obd_zombie_impexp_lock);
1454 rc = list_empty(&obd_zombie_imports) &&
1455 list_empty(&obd_zombie_exports) &&
1456 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1458 spin_unlock(&obd_zombie_impexp_lock);
1464 * notify import/export destroy thread about new zombie.
1466 static void obd_zombie_impexp_notify(void)
1468 cfs_waitq_signal(&obd_zombie_waitq);
1474 * destroy zombie export/import thread.
1476 static int obd_zombie_impexp_thread(void *unused)
1480 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1481 complete(&obd_zombie_start);
1485 complete(&obd_zombie_start);
1487 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1488 struct l_wait_info lwi = { 0 };
1490 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1492 obd_zombie_impexp_cull();
1495 complete(&obd_zombie_stop);
1500 #else /* ! KERNEL */
1502 static atomic_t zombie_recur = ATOMIC_INIT(0);
1503 static void *obd_zombie_impexp_work_cb;
1504 static void *obd_zombie_impexp_idle_cb;
1506 int obd_zombie_impexp_kill(void *arg)
1510 if (atomic_inc_return(&zombie_recur) == 1) {
1511 obd_zombie_impexp_cull();
1514 atomic_dec(&zombie_recur);
1521 * start destroy zombie import/export thread
1523 int obd_zombie_impexp_init(void)
1527 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1528 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1529 spin_lock_init(&obd_zombie_impexp_lock);
1530 init_completion(&obd_zombie_start);
1531 init_completion(&obd_zombie_stop);
1532 cfs_waitq_init(&obd_zombie_waitq);
1535 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1539 wait_for_completion(&obd_zombie_start);
1542 obd_zombie_impexp_work_cb =
1543 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1544 &obd_zombie_impexp_kill, NULL);
1546 obd_zombie_impexp_idle_cb =
1547 liblustre_register_idle_callback("obd_zombi_impexp_check",
1548 &obd_zombie_impexp_check, NULL);
1555 * stop destroy zombie import/export thread
1557 void obd_zombie_impexp_stop(void)
1559 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1560 obd_zombie_impexp_notify();
1562 wait_for_completion(&obd_zombie_stop);
1564 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1565 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);