1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lustre/obdclass/genops.c
38 * These are the only exported functions, they provide some generic
39 * infrastructure for managing object devices
42 #define DEBUG_SUBSYSTEM S_CLASS
44 #include <liblustre.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
59 struct list_head obd_zombie_imports;
60 struct list_head obd_zombie_exports;
61 spinlock_t obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
64 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
67 * support functions: we could use inter-module communication, but this
68 * is more portable to other OS's
70 static struct obd_device *obd_device_alloc(void)
72 struct obd_device *obd;
74 OBD_SLAB_ALLOC_PTR(obd, obd_device_cachep);
76 obd->obd_magic = OBD_DEVICE_MAGIC;
80 EXPORT_SYMBOL(obd_device_alloc);
82 static void obd_device_free(struct obd_device *obd)
85 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
86 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
87 if (obd->obd_namespace != NULL) {
88 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
89 obd, obd->obd_namespace, obd->obd_force);
92 lu_ref_fini(&obd->obd_reference);
93 OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 struct obd_type *class_search_type(const char *name)
98 struct list_head *tmp;
99 struct obd_type *type;
101 spin_lock(&obd_types_lock);
102 list_for_each(tmp, &obd_types) {
103 type = list_entry(tmp, struct obd_type, typ_chain);
104 if (strcmp(type->typ_name, name) == 0) {
105 spin_unlock(&obd_types_lock);
109 spin_unlock(&obd_types_lock);
113 struct obd_type *class_get_type(const char *name)
115 struct obd_type *type = class_search_type(name);
119 const char *modname = name;
120 if (!request_module(modname)) {
121 CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
122 type = class_search_type(name);
124 LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130 spin_lock(&type->obd_type_lock);
132 try_module_get(type->typ_dt_ops->o_owner);
133 spin_unlock(&type->obd_type_lock);
138 void class_put_type(struct obd_type *type)
141 spin_lock(&type->obd_type_lock);
143 module_put(type->typ_dt_ops->o_owner);
144 spin_unlock(&type->obd_type_lock);
147 #define CLASS_MAX_NAME 1024
149 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
150 struct lprocfs_vars *vars, const char *name,
151 struct lu_device_type *ldt)
153 struct obd_type *type;
158 LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
160 if (class_search_type(name)) {
161 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166 OBD_ALLOC(type, sizeof(*type));
170 OBD_ALLOC_PTR(type->typ_dt_ops);
171 OBD_ALLOC_PTR(type->typ_md_ops);
172 OBD_ALLOC(type->typ_name, strlen(name) + 1);
174 if (type->typ_dt_ops == NULL ||
175 type->typ_md_ops == NULL ||
176 type->typ_name == NULL)
179 *(type->typ_dt_ops) = *dt_ops;
180 /* md_ops is optional */
182 *(type->typ_md_ops) = *md_ops;
183 strcpy(type->typ_name, name);
184 spin_lock_init(&type->obd_type_lock);
187 type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
189 if (IS_ERR(type->typ_procroot)) {
190 rc = PTR_ERR(type->typ_procroot);
191 type->typ_procroot = NULL;
197 rc = lu_device_type_init(ldt);
202 spin_lock(&obd_types_lock);
203 list_add(&type->typ_chain, &obd_types);
204 spin_unlock(&obd_types_lock);
209 if (type->typ_name != NULL)
210 OBD_FREE(type->typ_name, strlen(name) + 1);
211 if (type->typ_md_ops != NULL)
212 OBD_FREE_PTR(type->typ_md_ops);
213 if (type->typ_dt_ops != NULL)
214 OBD_FREE_PTR(type->typ_dt_ops);
215 OBD_FREE(type, sizeof(*type));
219 int class_unregister_type(const char *name)
221 struct obd_type *type = class_search_type(name);
225 CERROR("unknown obd type\n");
229 if (type->typ_refcnt) {
230 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
231 /* This is a bad situation, let's make the best of it */
232 /* Remove ops, but leave the name for debugging */
233 OBD_FREE_PTR(type->typ_dt_ops);
234 OBD_FREE_PTR(type->typ_md_ops);
238 if (type->typ_procroot) {
239 lprocfs_remove(&type->typ_procroot);
243 lu_device_type_fini(type->typ_lu);
245 spin_lock(&obd_types_lock);
246 list_del(&type->typ_chain);
247 spin_unlock(&obd_types_lock);
248 OBD_FREE(type->typ_name, strlen(name) + 1);
249 if (type->typ_dt_ops != NULL)
250 OBD_FREE_PTR(type->typ_dt_ops);
251 if (type->typ_md_ops != NULL)
252 OBD_FREE_PTR(type->typ_md_ops);
253 OBD_FREE(type, sizeof(*type));
255 } /* class_unregister_type */
258 * Create a new obd device.
260 * Find an empty slot in ::obd_devs[], create a new obd device in it.
262 * \param typename [in] obd device type string.
263 * \param name [in] obd device name.
265 * \retval NULL if create fails, otherwise return the obd device
268 struct obd_device *class_newdev(const char *type_name, const char *name)
270 struct obd_device *result = NULL;
271 struct obd_device *newdev;
272 struct obd_type *type = NULL;
274 int new_obd_minor = 0;
276 if (strlen(name) >= MAX_OBD_NAME) {
277 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
278 RETURN(ERR_PTR(-EINVAL));
281 type = class_get_type(type_name);
283 CERROR("OBD: unknown type: %s\n", type_name);
284 RETURN(ERR_PTR(-ENODEV));
287 newdev = obd_device_alloc();
288 if (newdev == NULL) {
289 class_put_type(type);
290 RETURN(ERR_PTR(-ENOMEM));
292 LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
294 spin_lock(&obd_dev_lock);
295 for (i = 0; i < class_devno_max(); i++) {
296 struct obd_device *obd = class_num2obd(i);
297 if (obd && obd->obd_name &&
298 (strcmp(name, obd->obd_name) == 0)) {
299 CERROR("Device %s already exists, won't add\n", name);
301 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
302 "%p obd_magic %08x != %08x\n", result,
303 result->obd_magic, OBD_DEVICE_MAGIC);
304 LASSERTF(result->obd_minor == new_obd_minor,
305 "%p obd_minor %d != %d\n", result,
306 result->obd_minor, new_obd_minor);
308 obd_devs[result->obd_minor] = NULL;
309 result->obd_name[0]='\0';
311 result = ERR_PTR(-EEXIST);
314 if (!result && !obd) {
316 result->obd_minor = i;
318 result->obd_type = type;
319 strncpy(result->obd_name, name,
320 sizeof(result->obd_name) - 1);
321 obd_devs[i] = result;
324 spin_unlock(&obd_dev_lock);
326 if (result == NULL && i >= class_devno_max()) {
327 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
329 result = ERR_PTR(-EOVERFLOW);
332 if (IS_ERR(result)) {
333 obd_device_free(newdev);
334 class_put_type(type);
336 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
337 result->obd_name, result);
342 void class_release_dev(struct obd_device *obd)
344 struct obd_type *obd_type = obd->obd_type;
346 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
347 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
348 LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
349 obd, obd->obd_minor, obd_devs[obd->obd_minor]);
350 LASSERT(obd_type != NULL);
352 CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
353 obd->obd_name,obd->obd_type->typ_name);
355 spin_lock(&obd_dev_lock);
356 obd_devs[obd->obd_minor] = NULL;
357 spin_unlock(&obd_dev_lock);
358 obd_device_free(obd);
360 class_put_type(obd_type);
363 int class_name2dev(const char *name)
370 spin_lock(&obd_dev_lock);
371 for (i = 0; i < class_devno_max(); i++) {
372 struct obd_device *obd = class_num2obd(i);
373 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
374 /* Make sure we finished attaching before we give
375 out any references */
376 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
377 if (obd->obd_attached) {
378 spin_unlock(&obd_dev_lock);
384 spin_unlock(&obd_dev_lock);
389 struct obd_device *class_name2obd(const char *name)
391 int dev = class_name2dev(name);
393 if (dev < 0 || dev > class_devno_max())
395 return class_num2obd(dev);
398 int class_uuid2dev(struct obd_uuid *uuid)
402 spin_lock(&obd_dev_lock);
403 for (i = 0; i < class_devno_max(); i++) {
404 struct obd_device *obd = class_num2obd(i);
405 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
406 LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
407 spin_unlock(&obd_dev_lock);
411 spin_unlock(&obd_dev_lock);
416 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
418 int dev = class_uuid2dev(uuid);
421 return class_num2obd(dev);
425 * Get obd device from ::obd_devs[]
427 * \param num [in] array index
429 * \retval NULL if ::obd_devs[\a num] does not contains an obd device
430 * otherwise return the obd device there.
432 struct obd_device *class_num2obd(int num)
434 struct obd_device *obd = NULL;
436 if (num < class_devno_max()) {
441 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
442 "%p obd_magic %08x != %08x\n",
443 obd, obd->obd_magic, OBD_DEVICE_MAGIC);
444 LASSERTF(obd->obd_minor == num,
445 "%p obd_minor %0d != %0d\n",
446 obd, obd->obd_minor, num);
452 void class_obd_list(void)
457 spin_lock(&obd_dev_lock);
458 for (i = 0; i < class_devno_max(); i++) {
459 struct obd_device *obd = class_num2obd(i);
462 if (obd->obd_stopping)
464 else if (obd->obd_set_up)
466 else if (obd->obd_attached)
470 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
471 i, status, obd->obd_type->typ_name,
472 obd->obd_name, obd->obd_uuid.uuid,
473 atomic_read(&obd->obd_refcount));
475 spin_unlock(&obd_dev_lock);
479 /* Search for a client OBD connected to tgt_uuid. If grp_uuid is
480 specified, then only the client with that uuid is returned,
481 otherwise any client connected to the tgt is returned. */
482 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
483 const char * typ_name,
484 struct obd_uuid *grp_uuid)
488 spin_lock(&obd_dev_lock);
489 for (i = 0; i < class_devno_max(); i++) {
490 struct obd_device *obd = class_num2obd(i);
493 if ((strncmp(obd->obd_type->typ_name, typ_name,
494 strlen(typ_name)) == 0)) {
495 if (obd_uuid_equals(tgt_uuid,
496 &obd->u.cli.cl_target_uuid) &&
497 ((grp_uuid)? obd_uuid_equals(grp_uuid,
498 &obd->obd_uuid) : 1)) {
499 spin_unlock(&obd_dev_lock);
504 spin_unlock(&obd_dev_lock);
509 struct obd_device *class_find_client_notype(struct obd_uuid *tgt_uuid,
510 struct obd_uuid *grp_uuid)
512 struct obd_device *obd;
514 obd = class_find_client_obd(tgt_uuid, LUSTRE_MDC_NAME, NULL);
516 obd = class_find_client_obd(tgt_uuid, LUSTRE_OSC_NAME,
521 /* Iterate the obd_device list looking devices have grp_uuid. Start
522 searching at *next, and if a device is found, the next index to look
523 at is saved in *next. If next is NULL, then the first matching device
524 will always be returned. */
525 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
531 else if (*next >= 0 && *next < class_devno_max())
536 spin_lock(&obd_dev_lock);
537 for (; i < class_devno_max(); i++) {
538 struct obd_device *obd = class_num2obd(i);
541 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
544 spin_unlock(&obd_dev_lock);
548 spin_unlock(&obd_dev_lock);
554 void obd_cleanup_caches(void)
559 if (obd_device_cachep) {
560 rc = cfs_mem_cache_destroy(obd_device_cachep);
561 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
562 obd_device_cachep = NULL;
565 rc = cfs_mem_cache_destroy(obdo_cachep);
566 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
570 rc = cfs_mem_cache_destroy(import_cachep);
571 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
572 import_cachep = NULL;
575 rc = cfs_mem_cache_destroy(capa_cachep);
576 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
582 int obd_init_caches(void)
586 LASSERT(obd_device_cachep == NULL);
587 obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
588 sizeof(struct obd_device),
590 if (!obd_device_cachep)
593 LASSERT(obdo_cachep == NULL);
594 obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
599 LASSERT(import_cachep == NULL);
600 import_cachep = cfs_mem_cache_create("ll_import_cache",
601 sizeof(struct obd_import),
606 LASSERT(capa_cachep == NULL);
607 capa_cachep = cfs_mem_cache_create("capa_cache",
608 sizeof(struct obd_capa), 0, 0);
614 obd_cleanup_caches();
619 /* map connection to client */
620 struct obd_export *class_conn2export(struct lustre_handle *conn)
622 struct obd_export *export;
626 CDEBUG(D_CACHE, "looking for null handle\n");
630 if (conn->cookie == -1) { /* this means assign a new connection */
631 CDEBUG(D_CACHE, "want a new connection\n");
635 CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
636 export = class_handle2object(conn->cookie);
640 struct obd_device *class_exp2obd(struct obd_export *exp)
647 struct obd_device *class_conn2obd(struct lustre_handle *conn)
649 struct obd_export *export;
650 export = class_conn2export(conn);
652 struct obd_device *obd = export->exp_obd;
653 class_export_put(export);
659 struct obd_import *class_exp2cliimp(struct obd_export *exp)
661 struct obd_device *obd = exp->exp_obd;
664 return obd->u.cli.cl_import;
667 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
669 struct obd_device *obd = class_conn2obd(conn);
672 return obd->u.cli.cl_import;
675 /* Export management functions */
676 static void export_handle_addref(void *export)
678 class_export_get(export);
681 struct obd_export *class_export_get(struct obd_export *exp)
683 atomic_inc(&exp->exp_refcount);
684 CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
685 atomic_read(&exp->exp_refcount));
688 EXPORT_SYMBOL(class_export_get);
690 void class_export_put(struct obd_export *exp)
692 LASSERT(exp != NULL);
693 CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
694 atomic_read(&exp->exp_refcount) - 1);
695 LASSERT(atomic_read(&exp->exp_refcount) > 0);
696 LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
698 if (atomic_dec_and_test(&exp->exp_refcount)) {
699 LASSERT (list_empty(&exp->exp_obd_chain));
701 CDEBUG(D_IOCTL, "final put %p/%s\n",
702 exp, exp->exp_client_uuid.uuid);
704 spin_lock(&obd_zombie_impexp_lock);
705 list_add(&exp->exp_obd_chain, &obd_zombie_exports);
706 spin_unlock(&obd_zombie_impexp_lock);
708 if (obd_zombie_impexp_notify != NULL)
709 obd_zombie_impexp_notify();
712 EXPORT_SYMBOL(class_export_put);
714 static void class_export_destroy(struct obd_export *exp)
716 struct obd_device *obd = exp->exp_obd;
719 LASSERT (atomic_read(&exp->exp_refcount) == 0);
721 CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
722 exp->exp_client_uuid.uuid, obd->obd_name);
724 LASSERT(obd != NULL);
726 /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
727 if (exp->exp_connection)
728 ptlrpc_put_connection_superhack(exp->exp_connection);
730 LASSERT(list_empty(&exp->exp_outstanding_replies));
731 LASSERT(list_empty(&exp->exp_req_replay_queue));
732 obd_destroy_export(exp);
733 class_decref(obd, "export", exp);
735 OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
739 /* Creates a new export, adds it to the hash table, and returns a
740 * pointer to it. The refcount is 2: one for the hash reference, and
741 * one for the pointer returned by this function. */
742 struct obd_export *class_new_export(struct obd_device *obd,
743 struct obd_uuid *cluuid)
745 struct obd_export *export;
748 OBD_ALLOC_PTR(export);
750 return ERR_PTR(-ENOMEM);
752 export->exp_conn_cnt = 0;
753 export->exp_lock_hash = NULL;
754 atomic_set(&export->exp_refcount, 2);
755 atomic_set(&export->exp_rpc_count, 0);
756 export->exp_obd = obd;
757 CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
758 CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
759 CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
760 class_handle_hash(&export->exp_handle, export_handle_addref);
761 export->exp_last_request_time = cfs_time_current_sec();
762 spin_lock_init(&export->exp_lock);
763 INIT_HLIST_NODE(&export->exp_uuid_hash);
764 INIT_HLIST_NODE(&export->exp_nid_hash);
766 export->exp_sp_peer = LUSTRE_SP_ANY;
767 export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
768 export->exp_client_uuid = *cluuid;
769 obd_init_export(export);
771 spin_lock(&obd->obd_dev_lock);
772 if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
773 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
774 &export->exp_uuid_hash);
776 LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
777 obd->obd_name, cluuid->uuid, rc);
778 spin_unlock(&obd->obd_dev_lock);
779 class_handle_unhash(&export->exp_handle);
780 OBD_FREE_PTR(export);
781 return ERR_PTR(-EALREADY);
785 LASSERT(!obd->obd_stopping); /* shouldn't happen, but might race */
786 class_incref(obd, "export", export);
787 list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
788 list_add_tail(&export->exp_obd_chain_timed,
789 &export->exp_obd->obd_exports_timed);
790 export->exp_obd->obd_num_exports++;
791 spin_unlock(&obd->obd_dev_lock);
795 EXPORT_SYMBOL(class_new_export);
797 void class_unlink_export(struct obd_export *exp)
799 class_handle_unhash(&exp->exp_handle);
801 spin_lock(&exp->exp_obd->obd_dev_lock);
802 /* delete an uuid-export hashitem from hashtables */
803 if (!hlist_unhashed(&exp->exp_uuid_hash))
804 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
805 &exp->exp_client_uuid,
806 &exp->exp_uuid_hash);
808 list_del_init(&exp->exp_obd_chain);
809 list_del_init(&exp->exp_obd_chain_timed);
810 exp->exp_obd->obd_num_exports--;
811 spin_unlock(&exp->exp_obd->obd_dev_lock);
813 class_export_put(exp);
815 EXPORT_SYMBOL(class_unlink_export);
817 /* Import management functions */
818 static void import_handle_addref(void *import)
820 class_import_get(import);
823 struct obd_import *class_import_get(struct obd_import *import)
825 LASSERT(atomic_read(&import->imp_refcount) >= 0);
826 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
827 atomic_inc(&import->imp_refcount);
828 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
829 atomic_read(&import->imp_refcount));
832 EXPORT_SYMBOL(class_import_get);
834 void class_import_put(struct obd_import *import)
838 LASSERT(atomic_read(&import->imp_refcount) > 0);
839 LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
840 LASSERT(list_empty(&import->imp_zombie_chain));
842 CDEBUG(D_INFO, "import %p refcount=%d\n", import,
843 atomic_read(&import->imp_refcount) - 1);
845 if (atomic_dec_and_test(&import->imp_refcount)) {
847 CDEBUG(D_INFO, "final put import %p\n", import);
849 spin_lock(&obd_zombie_impexp_lock);
850 list_add(&import->imp_zombie_chain, &obd_zombie_imports);
851 spin_unlock(&obd_zombie_impexp_lock);
853 if (obd_zombie_impexp_notify != NULL)
854 obd_zombie_impexp_notify();
859 EXPORT_SYMBOL(class_import_put);
861 void class_import_destroy(struct obd_import *import)
865 CDEBUG(D_IOCTL, "destroying import %p for %s\n", import,
866 import->imp_obd->obd_name);
868 LASSERT(atomic_read(&import->imp_refcount) == 0);
870 ptlrpc_put_connection_superhack(import->imp_connection);
872 while (!list_empty(&import->imp_conn_list)) {
873 struct obd_import_conn *imp_conn;
875 imp_conn = list_entry(import->imp_conn_list.next,
876 struct obd_import_conn, oic_item);
877 list_del(&imp_conn->oic_item);
878 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
879 OBD_FREE(imp_conn, sizeof(*imp_conn));
882 LASSERT(import->imp_sec == NULL);
883 class_decref(import->imp_obd, "import", import);
884 OBD_FREE_RCU(import, sizeof(*import), &import->imp_handle);
888 static void init_imp_at(struct imp_at *at) {
890 at_init(&at->iat_net_latency, 0, 0);
891 for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
892 /* max service estimates are tracked on the server side, so
893 don't use the AT history here, just use the last reported
894 val. (But keep hist for proc histogram, worst_ever) */
895 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
900 struct obd_import *class_new_import(struct obd_device *obd)
902 struct obd_import *imp;
904 OBD_ALLOC(imp, sizeof(*imp));
908 CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
909 CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
910 CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
911 CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
912 spin_lock_init(&imp->imp_lock);
913 imp->imp_last_success_conn = 0;
914 imp->imp_state = LUSTRE_IMP_NEW;
915 imp->imp_obd = class_incref(obd, "import", imp);
916 sema_init(&imp->imp_sec_mutex, 1);
917 cfs_waitq_init(&imp->imp_recovery_waitq);
919 atomic_set(&imp->imp_refcount, 2);
920 atomic_set(&imp->imp_unregistering, 0);
921 atomic_set(&imp->imp_inflight, 0);
922 atomic_set(&imp->imp_replay_inflight, 0);
923 atomic_set(&imp->imp_inval_count, 0);
924 CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
925 CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
926 class_handle_hash(&imp->imp_handle, import_handle_addref);
927 init_imp_at(&imp->imp_at);
929 /* the default magic is V2, will be used in connect RPC, and
930 * then adjusted according to the flags in request/reply. */
931 imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
935 EXPORT_SYMBOL(class_new_import);
937 void class_destroy_import(struct obd_import *import)
939 LASSERT(import != NULL);
940 LASSERT(import != LP_POISON);
942 class_handle_unhash(&import->imp_handle);
944 spin_lock(&import->imp_lock);
945 import->imp_generation++;
946 spin_unlock(&import->imp_lock);
947 class_import_put(import);
949 EXPORT_SYMBOL(class_destroy_import);
951 /* A connection defines an export context in which preallocation can
952 be managed. This releases the export pointer reference, and returns
953 the export handle, so the export refcount is 1 when this function
955 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
956 struct obd_uuid *cluuid)
958 struct obd_export *export;
959 LASSERT(conn != NULL);
960 LASSERT(obd != NULL);
961 LASSERT(cluuid != NULL);
964 export = class_new_export(obd, cluuid);
966 RETURN(PTR_ERR(export));
968 conn->cookie = export->exp_handle.h_cookie;
969 class_export_put(export);
971 CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
972 cluuid->uuid, conn->cookie);
975 EXPORT_SYMBOL(class_connect);
977 /* if export is involved in recovery then clean up related things */
978 void class_export_recovery_cleanup(struct obd_export *exp)
980 struct obd_device *obd = exp->exp_obd;
982 spin_lock_bh(&obd->obd_processing_task_lock);
983 if (obd->obd_recovering && exp->exp_in_recovery) {
984 spin_lock(&exp->exp_lock);
985 exp->exp_in_recovery = 0;
986 spin_unlock(&exp->exp_lock);
987 obd->obd_connected_clients--;
988 /* each connected client is counted as recoverable */
989 obd->obd_recoverable_clients--;
990 if (exp->exp_req_replay_needed) {
991 spin_lock(&exp->exp_lock);
992 exp->exp_req_replay_needed = 0;
993 spin_unlock(&exp->exp_lock);
994 LASSERT(atomic_read(&obd->obd_req_replay_clients));
995 atomic_dec(&obd->obd_req_replay_clients);
997 if (exp->exp_lock_replay_needed) {
998 spin_lock(&exp->exp_lock);
999 exp->exp_lock_replay_needed = 0;
1000 spin_unlock(&exp->exp_lock);
1001 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1002 atomic_dec(&obd->obd_lock_replay_clients);
1005 spin_unlock_bh(&obd->obd_processing_task_lock);
1008 /* This function removes two references from the export: one for the
1009 * hash entry and one for the export pointer passed in. The export
1010 * pointer passed to this function is destroyed should not be used
1012 int class_disconnect(struct obd_export *export)
1014 int already_disconnected;
1017 if (export == NULL) {
1019 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1023 spin_lock(&export->exp_lock);
1024 already_disconnected = export->exp_disconnected;
1025 export->exp_disconnected = 1;
1027 if (!hlist_unhashed(&export->exp_nid_hash))
1028 lustre_hash_del(export->exp_obd->obd_nid_hash,
1029 &export->exp_connection->c_peer.nid,
1030 &export->exp_nid_hash);
1032 spin_unlock(&export->exp_lock);
1034 /* class_cleanup(), abort_recovery(), and class_fail_export()
1035 * all end up in here, and if any of them race we shouldn't
1036 * call extra class_export_puts(). */
1037 if (already_disconnected)
1040 CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1041 export->exp_handle.h_cookie);
1043 class_export_recovery_cleanup(export);
1044 class_unlink_export(export);
1045 class_export_put(export);
1049 static void class_disconnect_export_list(struct list_head *list, int flags)
1052 struct lustre_handle fake_conn;
1053 struct obd_export *fake_exp, *exp;
1056 /* It's possible that an export may disconnect itself, but
1057 * nothing else will be added to this list. */
1058 while (!list_empty(list)) {
1059 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1060 class_export_get(exp);
1062 spin_lock(&exp->exp_lock);
1063 exp->exp_flags = flags;
1064 spin_unlock(&exp->exp_lock);
1066 if (obd_uuid_equals(&exp->exp_client_uuid,
1067 &exp->exp_obd->obd_uuid)) {
1069 "exp %p export uuid == obd uuid, don't discon\n",
1071 /* Need to delete this now so we don't end up pointing
1072 * to work_list later when this export is cleaned up. */
1073 list_del_init(&exp->exp_obd_chain);
1074 class_export_put(exp);
1078 fake_conn.cookie = exp->exp_handle.h_cookie;
1079 fake_exp = class_conn2export(&fake_conn);
1081 class_export_put(exp);
1085 spin_lock(&fake_exp->exp_lock);
1086 fake_exp->exp_flags = flags;
1087 spin_unlock(&fake_exp->exp_lock);
1089 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1090 "last request at "CFS_TIME_T"\n",
1091 exp->exp_obd->obd_name, obd_export_nid2str(exp),
1092 exp, exp->exp_last_request_time);
1093 rc = obd_disconnect(fake_exp);
1094 class_export_put(exp);
1099 static inline int get_exp_flags_from_obd(struct obd_device *obd)
1101 return ((obd->obd_fail ? OBD_OPT_FAILOVER : 0) |
1102 (obd->obd_force ? OBD_OPT_FORCE : 0));
1105 void class_disconnect_exports(struct obd_device *obd)
1107 struct list_head work_list;
1110 /* Move all of the exports from obd_exports to a work list, en masse. */
1111 spin_lock(&obd->obd_dev_lock);
1112 list_add(&work_list, &obd->obd_exports);
1113 list_del_init(&obd->obd_exports);
1114 spin_unlock(&obd->obd_dev_lock);
1116 if (!list_empty(&work_list)) {
1117 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1118 "disconnecting them\n", obd->obd_minor, obd);
1119 class_disconnect_export_list(&work_list,
1120 get_exp_flags_from_obd(obd));
1122 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1123 obd->obd_minor, obd);
1126 EXPORT_SYMBOL(class_disconnect_exports);
1128 /* Remove exports that have not completed recovery.
1130 int class_disconnect_stale_exports(struct obd_device *obd,
1131 int (*test_export)(struct obd_export *))
1133 struct list_head work_list;
1134 struct list_head *pos, *n;
1135 struct obd_export *exp;
1139 CFS_INIT_LIST_HEAD(&work_list);
1140 spin_lock(&obd->obd_dev_lock);
1141 list_for_each_safe(pos, n, &obd->obd_exports) {
1142 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1143 if (test_export(exp))
1146 list_del(&exp->exp_obd_chain);
1147 list_add(&exp->exp_obd_chain, &work_list);
1148 /* don't count self-export as client */
1149 if (obd_uuid_equals(&exp->exp_client_uuid,
1150 &exp->exp_obd->obd_uuid))
1154 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1155 obd->obd_name, exp->exp_client_uuid.uuid,
1156 exp->exp_connection == NULL ? "<unknown>" :
1157 libcfs_nid2str(exp->exp_connection->c_peer.nid));
1159 spin_unlock(&obd->obd_dev_lock);
1161 CDEBUG(D_ERROR, "%s: disconnecting %d stale clients\n",
1162 obd->obd_name, cnt);
1163 class_disconnect_export_list(&work_list, get_exp_flags_from_obd(obd));
1166 EXPORT_SYMBOL(class_disconnect_stale_exports);
1168 int oig_init(struct obd_io_group **oig_out)
1170 struct obd_io_group *oig;
1173 OBD_ALLOC(oig, sizeof(*oig));
1177 spin_lock_init(&oig->oig_lock);
1179 oig->oig_pending = 0;
1180 atomic_set(&oig->oig_refcount, 1);
1181 cfs_waitq_init(&oig->oig_waitq);
1182 CFS_INIT_LIST_HEAD(&oig->oig_occ_list);
1187 EXPORT_SYMBOL(oig_init);
1189 static inline void oig_grab(struct obd_io_group *oig)
1191 atomic_inc(&oig->oig_refcount);
1194 void oig_release(struct obd_io_group *oig)
1196 if (atomic_dec_and_test(&oig->oig_refcount))
1197 OBD_FREE(oig, sizeof(*oig));
1199 EXPORT_SYMBOL(oig_release);
1201 int oig_add_one(struct obd_io_group *oig, struct oig_callback_context *occ)
1204 CDEBUG(D_CACHE, "oig %p ready to roll\n", oig);
1205 spin_lock(&oig->oig_lock);
1211 list_add_tail(&occ->occ_oig_item, &oig->oig_occ_list);
1213 spin_unlock(&oig->oig_lock);
1218 EXPORT_SYMBOL(oig_add_one);
1220 void oig_complete_one(struct obd_io_group *oig,
1221 struct oig_callback_context *occ, int rc)
1223 cfs_waitq_t *wake = NULL;
1226 spin_lock(&oig->oig_lock);
1229 list_del_init(&occ->occ_oig_item);
1231 old_rc = oig->oig_rc;
1232 if (oig->oig_rc == 0 && rc != 0)
1235 if (--oig->oig_pending <= 0)
1236 wake = &oig->oig_waitq;
1238 spin_unlock(&oig->oig_lock);
1240 CDEBUG(D_CACHE, "oig %p completed, rc %d -> %d via %d, %d now "
1241 "pending (racey)\n", oig, old_rc, oig->oig_rc, rc,
1244 cfs_waitq_signal(wake);
1247 EXPORT_SYMBOL(oig_complete_one);
1249 static int oig_done(struct obd_io_group *oig)
1252 spin_lock(&oig->oig_lock);
1253 if (oig->oig_pending <= 0)
1255 spin_unlock(&oig->oig_lock);
1259 static void interrupted_oig(void *data)
1261 struct obd_io_group *oig = data;
1262 struct oig_callback_context *occ;
1264 spin_lock(&oig->oig_lock);
1265 /* We need to restart the processing each time we drop the lock, as
1266 * it is possible other threads called oig_complete_one() to remove
1267 * an entry elsewhere in the list while we dropped lock. We need to
1268 * drop the lock because osc_ap_completion() calls oig_complete_one()
1269 * which re-gets this lock ;-) as well as a lock ordering issue. */
1271 list_for_each_entry(occ, &oig->oig_occ_list, occ_oig_item) {
1272 if (occ->interrupted)
1274 occ->interrupted = 1;
1275 spin_unlock(&oig->oig_lock);
1276 occ->occ_interrupted(occ);
1277 spin_lock(&oig->oig_lock);
1280 spin_unlock(&oig->oig_lock);
1283 int oig_wait(struct obd_io_group *oig)
1285 struct l_wait_info lwi = LWI_INTR(interrupted_oig, oig);
1288 CDEBUG(D_CACHE, "waiting for oig %p\n", oig);
1291 rc = l_wait_event(oig->oig_waitq, oig_done(oig), &lwi);
1292 LASSERTF(rc == 0 || rc == -EINTR, "rc: %d\n", rc);
1293 /* we can't continue until the oig has emptied and stopped
1294 * referencing state that the caller will free upon return */
1296 lwi = (struct l_wait_info){ 0, };
1297 } while (rc == -EINTR);
1299 LASSERTF(oig->oig_pending == 0,
1300 "exiting oig_wait(oig = %p) with %d pending\n", oig,
1303 CDEBUG(D_CACHE, "done waiting on oig %p rc %d\n", oig, oig->oig_rc);
1306 EXPORT_SYMBOL(oig_wait);
1308 void class_fail_export(struct obd_export *exp)
1310 int rc, already_failed;
1312 spin_lock(&exp->exp_lock);
1313 already_failed = exp->exp_failed;
1314 exp->exp_failed = 1;
1315 spin_unlock(&exp->exp_lock);
1317 if (already_failed) {
1318 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1319 exp, exp->exp_client_uuid.uuid);
1323 CDEBUG(D_HA, "disconnecting export %p/%s\n",
1324 exp, exp->exp_client_uuid.uuid);
1326 if (obd_dump_on_timeout)
1327 libcfs_debug_dumplog();
1329 /* Most callers into obd_disconnect are removing their own reference
1330 * (request, for example) in addition to the one from the hash table.
1331 * We don't have such a reference here, so make one. */
1332 class_export_get(exp);
1333 rc = obd_disconnect(exp);
1335 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1337 CDEBUG(D_HA, "disconnected export %p/%s\n",
1338 exp, exp->exp_client_uuid.uuid);
1340 EXPORT_SYMBOL(class_fail_export);
1342 char *obd_export_nid2str(struct obd_export *exp)
1344 if (exp->exp_connection != NULL)
1345 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1349 EXPORT_SYMBOL(obd_export_nid2str);
1351 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1353 struct obd_export *doomed_exp = NULL;
1354 int exports_evicted = 0;
1356 lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1359 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1360 if (doomed_exp == NULL)
1363 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1364 "nid %s found, wanted nid %s, requested nid %s\n",
1365 obd_export_nid2str(doomed_exp),
1366 libcfs_nid2str(nid_key), nid);
1367 LASSERTF(doomed_exp != obd->obd_self_export,
1368 "self-export is hashed by NID?\n");
1370 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1371 obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1373 class_fail_export(doomed_exp);
1374 class_export_put(doomed_exp);
1377 if (!exports_evicted)
1378 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1379 obd->obd_name, nid);
1380 return exports_evicted;
1382 EXPORT_SYMBOL(obd_export_evict_by_nid);
1384 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1386 struct obd_export *doomed_exp = NULL;
1387 struct obd_uuid doomed_uuid;
1388 int exports_evicted = 0;
1390 obd_str2uuid(&doomed_uuid, uuid);
1391 if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1392 CERROR("%s: can't evict myself\n", obd->obd_name);
1393 return exports_evicted;
1396 doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1398 if (doomed_exp == NULL) {
1399 CERROR("%s: can't disconnect %s: no exports found\n",
1400 obd->obd_name, uuid);
1402 CWARN("%s: evicting %s at adminstrative request\n",
1403 obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1404 class_fail_export(doomed_exp);
1405 class_export_put(doomed_exp);
1409 return exports_evicted;
1411 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1414 * kill zombie imports and exports
1416 void obd_zombie_impexp_cull(void)
1418 struct obd_import *import;
1419 struct obd_export *export;
1423 spin_lock (&obd_zombie_impexp_lock);
1426 if (!list_empty(&obd_zombie_imports)) {
1427 import = list_entry(obd_zombie_imports.next,
1430 list_del(&import->imp_zombie_chain);
1434 if (!list_empty(&obd_zombie_exports)) {
1435 export = list_entry(obd_zombie_exports.next,
1438 list_del_init(&export->exp_obd_chain);
1441 spin_unlock(&obd_zombie_impexp_lock);
1444 class_import_destroy(import);
1447 class_export_destroy(export);
1449 } while (import != NULL || export != NULL);
1453 static struct completion obd_zombie_start;
1454 static struct completion obd_zombie_stop;
1455 static unsigned long obd_zombie_flags;
1456 static cfs_waitq_t obd_zombie_waitq;
1463 * check for work for kill zombie import/export thread.
1465 int obd_zombie_impexp_check(void *arg)
1469 spin_lock(&obd_zombie_impexp_lock);
1470 rc = list_empty(&obd_zombie_imports) &&
1471 list_empty(&obd_zombie_exports) &&
1472 !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1474 spin_unlock(&obd_zombie_impexp_lock);
1480 * notify import/export destroy thread about new zombie.
1482 static void obd_zombie_impexp_notify(void)
1484 cfs_waitq_signal(&obd_zombie_waitq);
1490 * destroy zombie export/import thread.
1492 static int obd_zombie_impexp_thread(void *unused)
1496 if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1497 complete(&obd_zombie_start);
1501 complete(&obd_zombie_start);
1503 while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1504 struct l_wait_info lwi = { 0 };
1506 l_wait_event(obd_zombie_waitq, !obd_zombie_impexp_check(NULL), &lwi);
1508 obd_zombie_impexp_cull();
1511 complete(&obd_zombie_stop);
1516 #else /* ! KERNEL */
1518 static atomic_t zombie_recur = ATOMIC_INIT(0);
1519 static void *obd_zombie_impexp_work_cb;
1520 static void *obd_zombie_impexp_idle_cb;
1522 int obd_zombie_impexp_kill(void *arg)
1526 if (atomic_inc_return(&zombie_recur) == 1) {
1527 obd_zombie_impexp_cull();
1530 atomic_dec(&zombie_recur);
1537 * start destroy zombie import/export thread
1539 int obd_zombie_impexp_init(void)
1543 CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1544 CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1545 spin_lock_init(&obd_zombie_impexp_lock);
1546 init_completion(&obd_zombie_start);
1547 init_completion(&obd_zombie_stop);
1548 cfs_waitq_init(&obd_zombie_waitq);
1551 rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1555 wait_for_completion(&obd_zombie_start);
1558 obd_zombie_impexp_work_cb =
1559 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1560 &obd_zombie_impexp_kill, NULL);
1562 obd_zombie_impexp_idle_cb =
1563 liblustre_register_idle_callback("obd_zombi_impexp_check",
1564 &obd_zombie_impexp_check, NULL);
1571 * stop destroy zombie import/export thread
1573 void obd_zombie_impexp_stop(void)
1575 set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1576 obd_zombie_impexp_notify();
1578 wait_for_completion(&obd_zombie_stop);
1580 liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1581 liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);