Whamcloud - gitweb
0dfaff37bd69cde761e2b06fab77c3135c44d634
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/obdclass/genops.c
40  *
41  * These are the only exported functions, they provide some generic
42  * infrastructure for managing object devices
43  */
44
45 #define DEBUG_SUBSYSTEM S_CLASS
46 #ifndef __KERNEL__
47 #include <liblustre.h>
48 #endif
49 #include <obd_ost.h>
50 #include <obd_class.h>
51 #include <lprocfs_status.h>
52
53 extern cfs_list_t obd_types;
54 cfs_spinlock_t obd_types_lock;
55
56 cfs_mem_cache_t *obd_device_cachep;
57 cfs_mem_cache_t *obdo_cachep;
58 EXPORT_SYMBOL(obdo_cachep);
59 cfs_mem_cache_t *import_cachep;
60
61 cfs_list_t      obd_zombie_imports;
62 cfs_list_t      obd_zombie_exports;
63 cfs_spinlock_t  obd_zombie_impexp_lock;
64 static void obd_zombie_impexp_notify(void);
65 static void obd_zombie_export_add(struct obd_export *exp);
66 static void obd_zombie_import_add(struct obd_import *imp);
67 static void print_export_data(struct obd_export *exp,
68                               const char *status, int locks);
69
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
71
72 /*
73  * support functions: we could use inter-module communication, but this
74  * is more portable to other OS's
75  */
76 static struct obd_device *obd_device_alloc(void)
77 {
78         struct obd_device *obd;
79
80         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
81         if (obd != NULL) {
82                 obd->obd_magic = OBD_DEVICE_MAGIC;
83         }
84         return obd;
85 }
86
87 static void obd_device_free(struct obd_device *obd)
88 {
89         LASSERT(obd != NULL);
90         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
91                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
92         if (obd->obd_namespace != NULL) {
93                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
94                        obd, obd->obd_namespace, obd->obd_force);
95                 LBUG();
96         }
97         lu_ref_fini(&obd->obd_reference);
98         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 }
100
101 struct obd_type *class_search_type(const char *name)
102 {
103         cfs_list_t *tmp;
104         struct obd_type *type;
105
106         cfs_spin_lock(&obd_types_lock);
107         cfs_list_for_each(tmp, &obd_types) {
108                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
109                 if (strcmp(type->typ_name, name) == 0) {
110                         cfs_spin_unlock(&obd_types_lock);
111                         return type;
112                 }
113         }
114         cfs_spin_unlock(&obd_types_lock);
115         return NULL;
116 }
117
118 struct obd_type *class_get_type(const char *name)
119 {
120         struct obd_type *type = class_search_type(name);
121
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
123         if (!type) {
124                 const char *modname = name;
125                 if (!cfs_request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 cfs_spin_lock(&type->obd_type_lock);
136                 type->typ_refcnt++;
137                 cfs_try_module_get(type->typ_dt_ops->o_owner);
138                 cfs_spin_unlock(&type->obd_type_lock);
139         }
140         return type;
141 }
142 EXPORT_SYMBOL(class_get_type);
143
144 void class_put_type(struct obd_type *type)
145 {
146         LASSERT(type);
147         cfs_spin_lock(&type->obd_type_lock);
148         type->typ_refcnt--;
149         cfs_module_put(type->typ_dt_ops->o_owner);
150         cfs_spin_unlock(&type->obd_type_lock);
151 }
152 EXPORT_SYMBOL(class_put_type);
153
154 #define CLASS_MAX_NAME 1024
155
156 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
157                         struct lprocfs_vars *vars, const char *name,
158                         struct lu_device_type *ldt)
159 {
160         struct obd_type *type;
161         int rc = 0;
162         ENTRY;
163
164         /* sanity check */
165         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
166
167         if (class_search_type(name)) {
168                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
169                 RETURN(-EEXIST);
170         }
171
172         rc = -ENOMEM;
173         OBD_ALLOC(type, sizeof(*type));
174         if (type == NULL)
175                 RETURN(rc);
176
177         OBD_ALLOC_PTR(type->typ_dt_ops);
178         OBD_ALLOC_PTR(type->typ_md_ops);
179         OBD_ALLOC(type->typ_name, strlen(name) + 1);
180
181         if (type->typ_dt_ops == NULL ||
182             type->typ_md_ops == NULL ||
183             type->typ_name == NULL)
184                 GOTO (failed, rc);
185
186         *(type->typ_dt_ops) = *dt_ops;
187         /* md_ops is optional */
188         if (md_ops)
189                 *(type->typ_md_ops) = *md_ops;
190         strcpy(type->typ_name, name);
191         cfs_spin_lock_init(&type->obd_type_lock);
192
193 #ifdef LPROCFS
194         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
195                                               vars, type);
196         if (IS_ERR(type->typ_procroot)) {
197                 rc = PTR_ERR(type->typ_procroot);
198                 type->typ_procroot = NULL;
199                 GOTO (failed, rc);
200         }
201 #endif
202         if (ldt != NULL) {
203                 type->typ_lu = ldt;
204                 rc = lu_device_type_init(ldt);
205                 if (rc != 0)
206                         GOTO (failed, rc);
207         }
208
209         cfs_spin_lock(&obd_types_lock);
210         cfs_list_add(&type->typ_chain, &obd_types);
211         cfs_spin_unlock(&obd_types_lock);
212
213         RETURN (0);
214
215  failed:
216         if (type->typ_name != NULL)
217                 OBD_FREE(type->typ_name, strlen(name) + 1);
218         if (type->typ_md_ops != NULL)
219                 OBD_FREE_PTR(type->typ_md_ops);
220         if (type->typ_dt_ops != NULL)
221                 OBD_FREE_PTR(type->typ_dt_ops);
222         OBD_FREE(type, sizeof(*type));
223         RETURN(rc);
224 }
225 EXPORT_SYMBOL(class_register_type);
226
227 int class_unregister_type(const char *name)
228 {
229         struct obd_type *type = class_search_type(name);
230         ENTRY;
231
232         if (!type) {
233                 CERROR("unknown obd type\n");
234                 RETURN(-EINVAL);
235         }
236
237         if (type->typ_refcnt) {
238                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239                 /* This is a bad situation, let's make the best of it */
240                 /* Remove ops, but leave the name for debugging */
241                 OBD_FREE_PTR(type->typ_dt_ops);
242                 OBD_FREE_PTR(type->typ_md_ops);
243                 RETURN(-EBUSY);
244         }
245
246         if (type->typ_procroot) {
247                 lprocfs_remove(&type->typ_procroot);
248         }
249
250         if (type->typ_lu)
251                 lu_device_type_fini(type->typ_lu);
252
253         cfs_spin_lock(&obd_types_lock);
254         cfs_list_del(&type->typ_chain);
255         cfs_spin_unlock(&obd_types_lock);
256         OBD_FREE(type->typ_name, strlen(name) + 1);
257         if (type->typ_dt_ops != NULL)
258                 OBD_FREE_PTR(type->typ_dt_ops);
259         if (type->typ_md_ops != NULL)
260                 OBD_FREE_PTR(type->typ_md_ops);
261         OBD_FREE(type, sizeof(*type));
262         RETURN(0);
263 } /* class_unregister_type */
264 EXPORT_SYMBOL(class_unregister_type);
265
266 /**
267  * Create a new obd device.
268  *
269  * Find an empty slot in ::obd_devs[], create a new obd device in it.
270  *
271  * \param[in] type_name obd device type string.
272  * \param[in] name      obd device name.
273  *
274  * \retval NULL if create fails, otherwise return the obd device
275  *         pointer created.
276  */
277 struct obd_device *class_newdev(const char *type_name, const char *name)
278 {
279         struct obd_device *result = NULL;
280         struct obd_device *newdev;
281         struct obd_type *type = NULL;
282         int i;
283         int new_obd_minor = 0;
284         ENTRY;
285
286         if (strlen(name) >= MAX_OBD_NAME) {
287                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
288                 RETURN(ERR_PTR(-EINVAL));
289         }
290
291         type = class_get_type(type_name);
292         if (type == NULL){
293                 CERROR("OBD: unknown type: %s\n", type_name);
294                 RETURN(ERR_PTR(-ENODEV));
295         }
296
297         newdev = obd_device_alloc();
298         if (newdev == NULL) {
299                 class_put_type(type);
300                 RETURN(ERR_PTR(-ENOMEM));
301         }
302         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303
304         cfs_write_lock(&obd_dev_lock);
305         for (i = 0; i < class_devno_max(); i++) {
306                 struct obd_device *obd = class_num2obd(i);
307
308                 if (obd && obd->obd_name &&
309                     (strcmp(name, obd->obd_name) == 0)) {
310                         CERROR("Device %s already exists at %d, won't add\n",
311                                name, i);
312                         if (result) {
313                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
314                                          "%p obd_magic %08x != %08x\n", result,
315                                          result->obd_magic, OBD_DEVICE_MAGIC);
316                                 LASSERTF(result->obd_minor == new_obd_minor,
317                                          "%p obd_minor %d != %d\n", result,
318                                          result->obd_minor, new_obd_minor);
319
320                                 obd_devs[result->obd_minor] = NULL;
321                                 result->obd_name[0]='\0';
322                          }
323                         result = ERR_PTR(-EEXIST);
324                         break;
325                 }
326                 if (!result && !obd) {
327                         result = newdev;
328                         result->obd_minor = i;
329                         new_obd_minor = i;
330                         result->obd_type = type;
331                         strncpy(result->obd_name, name,
332                                 sizeof(result->obd_name) - 1);
333                         obd_devs[i] = result;
334                 }
335         }
336         cfs_write_unlock(&obd_dev_lock);
337
338         if (result == NULL && i >= class_devno_max()) {
339                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
340                        class_devno_max());
341                 RETURN(ERR_PTR(-EOVERFLOW));
342         }
343
344         if (IS_ERR(result)) {
345                 obd_device_free(newdev);
346                 class_put_type(type);
347         } else {
348                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
349                        result->obd_name, result);
350         }
351         RETURN(result);
352 }
353
354 void class_release_dev(struct obd_device *obd)
355 {
356         struct obd_type *obd_type = obd->obd_type;
357
358         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
359                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
360         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
361                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
362         LASSERT(obd_type != NULL);
363
364         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
365                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
366
367         cfs_write_lock(&obd_dev_lock);
368         obd_devs[obd->obd_minor] = NULL;
369         cfs_write_unlock(&obd_dev_lock);
370         obd_device_free(obd);
371
372         class_put_type(obd_type);
373 }
374
375 int class_name2dev(const char *name)
376 {
377         int i;
378
379         if (!name)
380                 return -1;
381
382         cfs_read_lock(&obd_dev_lock);
383         for (i = 0; i < class_devno_max(); i++) {
384                 struct obd_device *obd = class_num2obd(i);
385
386                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
387                         /* Make sure we finished attaching before we give
388                            out any references */
389                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
390                         if (obd->obd_attached) {
391                                 cfs_read_unlock(&obd_dev_lock);
392                                 return i;
393                         }
394                         break;
395                 }
396         }
397         cfs_read_unlock(&obd_dev_lock);
398
399         return -1;
400 }
401 EXPORT_SYMBOL(class_name2dev);
402
403 struct obd_device *class_name2obd(const char *name)
404 {
405         int dev = class_name2dev(name);
406
407         if (dev < 0 || dev > class_devno_max())
408                 return NULL;
409         return class_num2obd(dev);
410 }
411 EXPORT_SYMBOL(class_name2obd);
412
413 int class_uuid2dev(struct obd_uuid *uuid)
414 {
415         int i;
416
417         cfs_read_lock(&obd_dev_lock);
418         for (i = 0; i < class_devno_max(); i++) {
419                 struct obd_device *obd = class_num2obd(i);
420
421                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
422                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
423                         cfs_read_unlock(&obd_dev_lock);
424                         return i;
425                 }
426         }
427         cfs_read_unlock(&obd_dev_lock);
428
429         return -1;
430 }
431 EXPORT_SYMBOL(class_uuid2dev);
432
433 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
434 {
435         int dev = class_uuid2dev(uuid);
436         if (dev < 0)
437                 return NULL;
438         return class_num2obd(dev);
439 }
440 EXPORT_SYMBOL(class_uuid2obd);
441
442 /**
443  * Get obd device from ::obd_devs[]
444  *
445  * \param num [in] array index
446  *
447  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
448  *         otherwise return the obd device there.
449  */
450 struct obd_device *class_num2obd(int num)
451 {
452         struct obd_device *obd = NULL;
453
454         if (num < class_devno_max()) {
455                 obd = obd_devs[num];
456                 if (obd == NULL)
457                         return NULL;
458
459                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
460                          "%p obd_magic %08x != %08x\n",
461                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
462                 LASSERTF(obd->obd_minor == num,
463                          "%p obd_minor %0d != %0d\n",
464                          obd, obd->obd_minor, num);
465         }
466
467         return obd;
468 }
469 EXPORT_SYMBOL(class_num2obd);
470
471 void class_obd_list(void)
472 {
473         char *status;
474         int i;
475
476         cfs_read_lock(&obd_dev_lock);
477         for (i = 0; i < class_devno_max(); i++) {
478                 struct obd_device *obd = class_num2obd(i);
479
480                 if (obd == NULL)
481                         continue;
482                 if (obd->obd_stopping)
483                         status = "ST";
484                 else if (obd->obd_set_up)
485                         status = "UP";
486                 else if (obd->obd_attached)
487                         status = "AT";
488                 else
489                         status = "--";
490                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
491                          i, status, obd->obd_type->typ_name,
492                          obd->obd_name, obd->obd_uuid.uuid,
493                          cfs_atomic_read(&obd->obd_refcount));
494         }
495         cfs_read_unlock(&obd_dev_lock);
496         return;
497 }
498
499 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
500    specified, then only the client with that uuid is returned,
501    otherwise any client connected to the tgt is returned. */
502 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
503                                           const char * typ_name,
504                                           struct obd_uuid *grp_uuid)
505 {
506         int i;
507
508         cfs_read_lock(&obd_dev_lock);
509         for (i = 0; i < class_devno_max(); i++) {
510                 struct obd_device *obd = class_num2obd(i);
511
512                 if (obd == NULL)
513                         continue;
514                 if ((strncmp(obd->obd_type->typ_name, typ_name,
515                              strlen(typ_name)) == 0)) {
516                         if (obd_uuid_equals(tgt_uuid,
517                                             &obd->u.cli.cl_target_uuid) &&
518                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
519                                                          &obd->obd_uuid) : 1)) {
520                                 cfs_read_unlock(&obd_dev_lock);
521                                 return obd;
522                         }
523                 }
524         }
525         cfs_read_unlock(&obd_dev_lock);
526
527         return NULL;
528 }
529 EXPORT_SYMBOL(class_find_client_obd);
530
531 /* Iterate the obd_device list looking devices have grp_uuid. Start
532    searching at *next, and if a device is found, the next index to look
533    at is saved in *next. If next is NULL, then the first matching device
534    will always be returned. */
535 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
536 {
537         int i;
538
539         if (next == NULL)
540                 i = 0;
541         else if (*next >= 0 && *next < class_devno_max())
542                 i = *next;
543         else
544                 return NULL;
545
546         cfs_read_lock(&obd_dev_lock);
547         for (; i < class_devno_max(); i++) {
548                 struct obd_device *obd = class_num2obd(i);
549
550                 if (obd == NULL)
551                         continue;
552                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
553                         if (next != NULL)
554                                 *next = i+1;
555                         cfs_read_unlock(&obd_dev_lock);
556                         return obd;
557                 }
558         }
559         cfs_read_unlock(&obd_dev_lock);
560
561         return NULL;
562 }
563 EXPORT_SYMBOL(class_devices_in_group);
564
565 /**
566  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
567  * adjust sptlrpc settings accordingly.
568  */
569 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
570 {
571         struct obd_device  *obd;
572         const char         *type;
573         int                 i, rc = 0, rc2;
574
575         LASSERT(namelen > 0);
576
577         cfs_read_lock(&obd_dev_lock);
578         for (i = 0; i < class_devno_max(); i++) {
579                 obd = class_num2obd(i);
580
581                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
582                         continue;
583
584                 /* only notify mdc, osc, mdt, ost */
585                 type = obd->obd_type->typ_name;
586                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
587                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
588                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
589                     strcmp(type, LUSTRE_OST_NAME) != 0)
590                         continue;
591
592                 if (strncmp(obd->obd_name, fsname, namelen))
593                         continue;
594
595                 class_incref(obd, __FUNCTION__, obd);
596                 cfs_read_unlock(&obd_dev_lock);
597                 rc2 = obd_set_info_async(obd->obd_self_export,
598                                          sizeof(KEY_SPTLRPC_CONF),
599                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
600                 rc = rc ? rc : rc2;
601                 class_decref(obd, __FUNCTION__, obd);
602                 cfs_read_lock(&obd_dev_lock);
603         }
604         cfs_read_unlock(&obd_dev_lock);
605         return rc;
606 }
607 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
608
609 void obd_cleanup_caches(void)
610 {
611         int rc;
612
613         ENTRY;
614         if (obd_device_cachep) {
615                 rc = cfs_mem_cache_destroy(obd_device_cachep);
616                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
617                 obd_device_cachep = NULL;
618         }
619         if (obdo_cachep) {
620                 rc = cfs_mem_cache_destroy(obdo_cachep);
621                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
622                 obdo_cachep = NULL;
623         }
624         if (import_cachep) {
625                 rc = cfs_mem_cache_destroy(import_cachep);
626                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
627                 import_cachep = NULL;
628         }
629         if (capa_cachep) {
630                 rc = cfs_mem_cache_destroy(capa_cachep);
631                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
632                 capa_cachep = NULL;
633         }
634         EXIT;
635 }
636
637 int obd_init_caches(void)
638 {
639         ENTRY;
640
641         LASSERT(obd_device_cachep == NULL);
642         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
643                                                  sizeof(struct obd_device),
644                                                  0, 0);
645         if (!obd_device_cachep)
646                 GOTO(out, -ENOMEM);
647
648         LASSERT(obdo_cachep == NULL);
649         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
650                                            0, 0);
651         if (!obdo_cachep)
652                 GOTO(out, -ENOMEM);
653
654         LASSERT(import_cachep == NULL);
655         import_cachep = cfs_mem_cache_create("ll_import_cache",
656                                              sizeof(struct obd_import),
657                                              0, 0);
658         if (!import_cachep)
659                 GOTO(out, -ENOMEM);
660
661         LASSERT(capa_cachep == NULL);
662         capa_cachep = cfs_mem_cache_create("capa_cache",
663                                            sizeof(struct obd_capa), 0, 0);
664         if (!capa_cachep)
665                 GOTO(out, -ENOMEM);
666
667         RETURN(0);
668  out:
669         obd_cleanup_caches();
670         RETURN(-ENOMEM);
671
672 }
673
674 /* map connection to client */
675 struct obd_export *class_conn2export(struct lustre_handle *conn)
676 {
677         struct obd_export *export;
678         ENTRY;
679
680         if (!conn) {
681                 CDEBUG(D_CACHE, "looking for null handle\n");
682                 RETURN(NULL);
683         }
684
685         if (conn->cookie == -1) {  /* this means assign a new connection */
686                 CDEBUG(D_CACHE, "want a new connection\n");
687                 RETURN(NULL);
688         }
689
690         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
691         export = class_handle2object(conn->cookie);
692         RETURN(export);
693 }
694 EXPORT_SYMBOL(class_conn2export);
695
696 struct obd_device *class_exp2obd(struct obd_export *exp)
697 {
698         if (exp)
699                 return exp->exp_obd;
700         return NULL;
701 }
702 EXPORT_SYMBOL(class_exp2obd);
703
704 struct obd_device *class_conn2obd(struct lustre_handle *conn)
705 {
706         struct obd_export *export;
707         export = class_conn2export(conn);
708         if (export) {
709                 struct obd_device *obd = export->exp_obd;
710                 class_export_put(export);
711                 return obd;
712         }
713         return NULL;
714 }
715 EXPORT_SYMBOL(class_conn2obd);
716
717 struct obd_import *class_exp2cliimp(struct obd_export *exp)
718 {
719         struct obd_device *obd = exp->exp_obd;
720         if (obd == NULL)
721                 return NULL;
722         return obd->u.cli.cl_import;
723 }
724 EXPORT_SYMBOL(class_exp2cliimp);
725
726 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
727 {
728         struct obd_device *obd = class_conn2obd(conn);
729         if (obd == NULL)
730                 return NULL;
731         return obd->u.cli.cl_import;
732 }
733 EXPORT_SYMBOL(class_conn2cliimp);
734
735 /* Export management functions */
736 static void class_export_destroy(struct obd_export *exp)
737 {
738         struct obd_device *obd = exp->exp_obd;
739         ENTRY;
740
741         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
742
743         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
744                exp->exp_client_uuid.uuid, obd->obd_name);
745
746         LASSERT(obd != NULL);
747
748         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
749         if (exp->exp_connection)
750                 ptlrpc_put_connection_superhack(exp->exp_connection);
751
752         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
753         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
754         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
755         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
756         obd_destroy_export(exp);
757         class_decref(obd, "export", exp);
758
759         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
760         EXIT;
761 }
762
763 static void export_handle_addref(void *export)
764 {
765         class_export_get(export);
766 }
767
768 struct obd_export *class_export_get(struct obd_export *exp)
769 {
770         cfs_atomic_inc(&exp->exp_refcount);
771         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
772                cfs_atomic_read(&exp->exp_refcount));
773         return exp;
774 }
775 EXPORT_SYMBOL(class_export_get);
776
777 void class_export_put(struct obd_export *exp)
778 {
779         LASSERT(exp != NULL);
780         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
781         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
782                cfs_atomic_read(&exp->exp_refcount) - 1);
783
784         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
785                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
786                 CDEBUG(D_IOCTL, "final put %p/%s\n",
787                        exp, exp->exp_client_uuid.uuid);
788
789                 /* release nid stat refererence */
790                 lprocfs_exp_cleanup(exp);
791
792                 obd_zombie_export_add(exp);
793         }
794 }
795 EXPORT_SYMBOL(class_export_put);
796
797 /* Creates a new export, adds it to the hash table, and returns a
798  * pointer to it. The refcount is 2: one for the hash reference, and
799  * one for the pointer returned by this function. */
800 struct obd_export *class_new_export(struct obd_device *obd,
801                                     struct obd_uuid *cluuid)
802 {
803         struct obd_export *export;
804         cfs_hash_t *hash = NULL;
805         int rc = 0;
806         ENTRY;
807
808         OBD_ALLOC_PTR(export);
809         if (!export)
810                 return ERR_PTR(-ENOMEM);
811
812         export->exp_conn_cnt = 0;
813         export->exp_lock_hash = NULL;
814         cfs_atomic_set(&export->exp_refcount, 2);
815         cfs_atomic_set(&export->exp_rpc_count, 0);
816         cfs_atomic_set(&export->exp_cb_count, 0);
817         cfs_atomic_set(&export->exp_locks_count, 0);
818 #if LUSTRE_TRACKS_LOCK_EXP_REFS
819         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
820         cfs_spin_lock_init(&export->exp_locks_list_guard);
821 #endif
822         cfs_atomic_set(&export->exp_replay_count, 0);
823         export->exp_obd = obd;
824         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
825         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
826         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
827         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
828         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
829         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
830         class_handle_hash(&export->exp_handle, export_handle_addref);
831         export->exp_last_request_time = cfs_time_current_sec();
832         cfs_spin_lock_init(&export->exp_lock);
833         cfs_spin_lock_init(&export->exp_rpc_lock);
834         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
835         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
836         cfs_spin_lock_init(&export->exp_bl_list_lock);
837         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
838
839         export->exp_sp_peer = LUSTRE_SP_ANY;
840         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
841         export->exp_client_uuid = *cluuid;
842         obd_init_export(export);
843
844         cfs_spin_lock(&obd->obd_dev_lock);
845          /* shouldn't happen, but might race */
846         if (obd->obd_stopping)
847                 GOTO(exit_unlock, rc = -ENODEV);
848
849         hash = cfs_hash_getref(obd->obd_uuid_hash);
850         if (hash == NULL)
851                 GOTO(exit_unlock, rc = -ENODEV);
852         cfs_spin_unlock(&obd->obd_dev_lock);
853
854         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
855                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
856                 if (rc != 0) {
857                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
858                                       obd->obd_name, cluuid->uuid, rc);
859                         GOTO(exit_err, rc = -EALREADY);
860                 }
861         }
862
863         cfs_spin_lock(&obd->obd_dev_lock);
864         if (obd->obd_stopping) {
865                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
866                 GOTO(exit_unlock, rc = -ENODEV);
867         }
868
869         class_incref(obd, "export", export);
870         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
871         cfs_list_add_tail(&export->exp_obd_chain_timed,
872                           &export->exp_obd->obd_exports_timed);
873         export->exp_obd->obd_num_exports++;
874         cfs_spin_unlock(&obd->obd_dev_lock);
875         cfs_hash_putref(hash);
876         RETURN(export);
877
878 exit_unlock:
879         cfs_spin_unlock(&obd->obd_dev_lock);
880 exit_err:
881         if (hash)
882                 cfs_hash_putref(hash);
883         class_handle_unhash(&export->exp_handle);
884         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
885         obd_destroy_export(export);
886         OBD_FREE_PTR(export);
887         return ERR_PTR(rc);
888 }
889 EXPORT_SYMBOL(class_new_export);
890
891 void class_unlink_export(struct obd_export *exp)
892 {
893         class_handle_unhash(&exp->exp_handle);
894
895         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
896         /* delete an uuid-export hashitem from hashtables */
897         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
898                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
899                              &exp->exp_client_uuid,
900                              &exp->exp_uuid_hash);
901
902         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
903         cfs_list_del_init(&exp->exp_obd_chain_timed);
904         exp->exp_obd->obd_num_exports--;
905         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
906         class_export_put(exp);
907 }
908 EXPORT_SYMBOL(class_unlink_export);
909
910 /* Import management functions */
911 void class_import_destroy(struct obd_import *imp)
912 {
913         ENTRY;
914
915         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
916                 imp->imp_obd->obd_name);
917
918         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
919
920         ptlrpc_put_connection_superhack(imp->imp_connection);
921
922         while (!cfs_list_empty(&imp->imp_conn_list)) {
923                 struct obd_import_conn *imp_conn;
924
925                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
926                                           struct obd_import_conn, oic_item);
927                 cfs_list_del_init(&imp_conn->oic_item);
928                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
929                 OBD_FREE(imp_conn, sizeof(*imp_conn));
930         }
931
932         LASSERT(imp->imp_sec == NULL);
933         class_decref(imp->imp_obd, "import", imp);
934         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
935         EXIT;
936 }
937
938 static void import_handle_addref(void *import)
939 {
940         class_import_get(import);
941 }
942
943 struct obd_import *class_import_get(struct obd_import *import)
944 {
945         cfs_atomic_inc(&import->imp_refcount);
946         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
947                cfs_atomic_read(&import->imp_refcount),
948                import->imp_obd->obd_name);
949         return import;
950 }
951 EXPORT_SYMBOL(class_import_get);
952
953 void class_import_put(struct obd_import *imp)
954 {
955         ENTRY;
956
957         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
958         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
959
960         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
961                cfs_atomic_read(&imp->imp_refcount) - 1,
962                imp->imp_obd->obd_name);
963
964         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
965                 CDEBUG(D_INFO, "final put import %p\n", imp);
966                 obd_zombie_import_add(imp);
967         }
968
969         EXIT;
970 }
971 EXPORT_SYMBOL(class_import_put);
972
973 static void init_imp_at(struct imp_at *at) {
974         int i;
975         at_init(&at->iat_net_latency, 0, 0);
976         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
977                 /* max service estimates are tracked on the server side, so
978                    don't use the AT history here, just use the last reported
979                    val. (But keep hist for proc histogram, worst_ever) */
980                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
981                         AT_FLG_NOHIST);
982         }
983 }
984
985 struct obd_import *class_new_import(struct obd_device *obd)
986 {
987         struct obd_import *imp;
988
989         OBD_ALLOC(imp, sizeof(*imp));
990         if (imp == NULL)
991                 return NULL;
992
993         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
994         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
995         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
996         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
997         cfs_spin_lock_init(&imp->imp_lock);
998         imp->imp_last_success_conn = 0;
999         imp->imp_state = LUSTRE_IMP_NEW;
1000         imp->imp_obd = class_incref(obd, "import", imp);
1001         cfs_sema_init(&imp->imp_sec_mutex, 1);
1002         cfs_waitq_init(&imp->imp_recovery_waitq);
1003
1004         cfs_atomic_set(&imp->imp_refcount, 2);
1005         cfs_atomic_set(&imp->imp_unregistering, 0);
1006         cfs_atomic_set(&imp->imp_inflight, 0);
1007         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1008         cfs_atomic_set(&imp->imp_inval_count, 0);
1009         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1010         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1011         class_handle_hash(&imp->imp_handle, import_handle_addref);
1012         init_imp_at(&imp->imp_at);
1013
1014         /* the default magic is V2, will be used in connect RPC, and
1015          * then adjusted according to the flags in request/reply. */
1016         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1017
1018         return imp;
1019 }
1020 EXPORT_SYMBOL(class_new_import);
1021
1022 void class_destroy_import(struct obd_import *import)
1023 {
1024         LASSERT(import != NULL);
1025         LASSERT(import != LP_POISON);
1026
1027         class_handle_unhash(&import->imp_handle);
1028
1029         cfs_spin_lock(&import->imp_lock);
1030         import->imp_generation++;
1031         cfs_spin_unlock(&import->imp_lock);
1032         class_import_put(import);
1033 }
1034 EXPORT_SYMBOL(class_destroy_import);
1035
1036 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1037
1038 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1039 {
1040         cfs_spin_lock(&exp->exp_locks_list_guard);
1041
1042         LASSERT(lock->l_exp_refs_nr >= 0);
1043
1044         if (lock->l_exp_refs_target != NULL &&
1045             lock->l_exp_refs_target != exp) {
1046                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1047                               exp, lock, lock->l_exp_refs_target);
1048         }
1049         if ((lock->l_exp_refs_nr ++) == 0) {
1050                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1051                 lock->l_exp_refs_target = exp;
1052         }
1053         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1054                lock, exp, lock->l_exp_refs_nr);
1055         cfs_spin_unlock(&exp->exp_locks_list_guard);
1056 }
1057 EXPORT_SYMBOL(__class_export_add_lock_ref);
1058
1059 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1060 {
1061         cfs_spin_lock(&exp->exp_locks_list_guard);
1062         LASSERT(lock->l_exp_refs_nr > 0);
1063         if (lock->l_exp_refs_target != exp) {
1064                 LCONSOLE_WARN("lock %p, "
1065                               "mismatching export pointers: %p, %p\n",
1066                               lock, lock->l_exp_refs_target, exp);
1067         }
1068         if (-- lock->l_exp_refs_nr == 0) {
1069                 cfs_list_del_init(&lock->l_exp_refs_link);
1070                 lock->l_exp_refs_target = NULL;
1071         }
1072         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1073                lock, exp, lock->l_exp_refs_nr);
1074         cfs_spin_unlock(&exp->exp_locks_list_guard);
1075 }
1076 EXPORT_SYMBOL(__class_export_del_lock_ref);
1077 #endif
1078
1079 /* A connection defines an export context in which preallocation can
1080    be managed. This releases the export pointer reference, and returns
1081    the export handle, so the export refcount is 1 when this function
1082    returns. */
1083 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1084                   struct obd_uuid *cluuid)
1085 {
1086         struct obd_export *export;
1087         LASSERT(conn != NULL);
1088         LASSERT(obd != NULL);
1089         LASSERT(cluuid != NULL);
1090         ENTRY;
1091
1092         export = class_new_export(obd, cluuid);
1093         if (IS_ERR(export))
1094                 RETURN(PTR_ERR(export));
1095
1096         conn->cookie = export->exp_handle.h_cookie;
1097         class_export_put(export);
1098
1099         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1100                cluuid->uuid, conn->cookie);
1101         RETURN(0);
1102 }
1103 EXPORT_SYMBOL(class_connect);
1104
1105 /* if export is involved in recovery then clean up related things */
1106 void class_export_recovery_cleanup(struct obd_export *exp)
1107 {
1108         struct obd_device *obd = exp->exp_obd;
1109
1110         cfs_spin_lock(&obd->obd_recovery_task_lock);
1111         if (exp->exp_delayed)
1112                 obd->obd_delayed_clients--;
1113         if (obd->obd_recovering && exp->exp_in_recovery) {
1114                 cfs_spin_lock(&exp->exp_lock);
1115                 exp->exp_in_recovery = 0;
1116                 cfs_spin_unlock(&exp->exp_lock);
1117                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1118                 cfs_atomic_dec(&obd->obd_connected_clients);
1119         }
1120         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1121         /** Cleanup req replay fields */
1122         if (exp->exp_req_replay_needed) {
1123                 cfs_spin_lock(&exp->exp_lock);
1124                 exp->exp_req_replay_needed = 0;
1125                 cfs_spin_unlock(&exp->exp_lock);
1126                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1127                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1128         }
1129         /** Cleanup lock replay data */
1130         if (exp->exp_lock_replay_needed) {
1131                 cfs_spin_lock(&exp->exp_lock);
1132                 exp->exp_lock_replay_needed = 0;
1133                 cfs_spin_unlock(&exp->exp_lock);
1134                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1135                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1136         }
1137 }
1138
1139 /* This function removes 1-3 references from the export:
1140  * 1 - for export pointer passed
1141  * and if disconnect really need
1142  * 2 - removing from hash
1143  * 3 - in client_unlink_export
1144  * The export pointer passed to this function can destroyed */
1145 int class_disconnect(struct obd_export *export)
1146 {
1147         int already_disconnected;
1148         ENTRY;
1149
1150         if (export == NULL) {
1151                 CWARN("attempting to free NULL export %p\n", export);
1152                 RETURN(-EINVAL);
1153         }
1154
1155         cfs_spin_lock(&export->exp_lock);
1156         already_disconnected = export->exp_disconnected;
1157         export->exp_disconnected = 1;
1158         cfs_spin_unlock(&export->exp_lock);
1159
1160         /* class_cleanup(), abort_recovery(), and class_fail_export()
1161          * all end up in here, and if any of them race we shouldn't
1162          * call extra class_export_puts(). */
1163         if (already_disconnected) {
1164                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1165                 GOTO(no_disconn, already_disconnected);
1166         }
1167
1168         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1169                export->exp_handle.h_cookie);
1170
1171         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1172                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1173                              &export->exp_connection->c_peer.nid,
1174                              &export->exp_nid_hash);
1175
1176         class_export_recovery_cleanup(export);
1177         class_unlink_export(export);
1178 no_disconn:
1179         class_export_put(export);
1180         RETURN(0);
1181 }
1182 EXPORT_SYMBOL(class_disconnect);
1183
1184 /* Return non-zero for a fully connected export */
1185 int class_connected_export(struct obd_export *exp)
1186 {
1187         if (exp) {
1188                 int connected;
1189                 cfs_spin_lock(&exp->exp_lock);
1190                 connected = (exp->exp_conn_cnt > 0);
1191                 cfs_spin_unlock(&exp->exp_lock);
1192                 return connected;
1193         }
1194         return 0;
1195 }
1196 EXPORT_SYMBOL(class_connected_export);
1197
1198 static void class_disconnect_export_list(cfs_list_t *list,
1199                                          enum obd_option flags)
1200 {
1201         int rc;
1202         struct obd_export *exp;
1203         ENTRY;
1204
1205         /* It's possible that an export may disconnect itself, but
1206          * nothing else will be added to this list. */
1207         while (!cfs_list_empty(list)) {
1208                 exp = cfs_list_entry(list->next, struct obd_export,
1209                                      exp_obd_chain);
1210                 /* need for safe call CDEBUG after obd_disconnect */
1211                 class_export_get(exp);
1212
1213                 cfs_spin_lock(&exp->exp_lock);
1214                 exp->exp_flags = flags;
1215                 cfs_spin_unlock(&exp->exp_lock);
1216
1217                 if (obd_uuid_equals(&exp->exp_client_uuid,
1218                                     &exp->exp_obd->obd_uuid)) {
1219                         CDEBUG(D_HA,
1220                                "exp %p export uuid == obd uuid, don't discon\n",
1221                                exp);
1222                         /* Need to delete this now so we don't end up pointing
1223                          * to work_list later when this export is cleaned up. */
1224                         cfs_list_del_init(&exp->exp_obd_chain);
1225                         class_export_put(exp);
1226                         continue;
1227                 }
1228
1229                 class_export_get(exp);
1230                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1231                        "last request at "CFS_TIME_T"\n",
1232                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1233                        exp, exp->exp_last_request_time);
1234                 /* release one export reference anyway */
1235                 rc = obd_disconnect(exp);
1236
1237                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1238                        obd_export_nid2str(exp), exp, rc);
1239                 class_export_put(exp);
1240         }
1241         EXIT;
1242 }
1243
1244 void class_disconnect_exports(struct obd_device *obd)
1245 {
1246         cfs_list_t work_list;
1247         ENTRY;
1248
1249         /* Move all of the exports from obd_exports to a work list, en masse. */
1250         CFS_INIT_LIST_HEAD(&work_list);
1251         cfs_spin_lock(&obd->obd_dev_lock);
1252         cfs_list_splice_init(&obd->obd_exports, &work_list);
1253         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1254         cfs_spin_unlock(&obd->obd_dev_lock);
1255
1256         if (!cfs_list_empty(&work_list)) {
1257                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1258                        "disconnecting them\n", obd->obd_minor, obd);
1259                 class_disconnect_export_list(&work_list,
1260                                              exp_flags_from_obd(obd));
1261         } else
1262                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1263                        obd->obd_minor, obd);
1264         EXIT;
1265 }
1266 EXPORT_SYMBOL(class_disconnect_exports);
1267
1268 /* Remove exports that have not completed recovery.
1269  */
1270 void class_disconnect_stale_exports(struct obd_device *obd,
1271                                     int (*test_export)(struct obd_export *))
1272 {
1273         cfs_list_t work_list;
1274         cfs_list_t *pos, *n;
1275         struct obd_export *exp;
1276         int evicted = 0;
1277         ENTRY;
1278
1279         CFS_INIT_LIST_HEAD(&work_list);
1280         cfs_spin_lock(&obd->obd_dev_lock);
1281         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1282                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1283                 if (test_export(exp))
1284                         continue;
1285
1286                 /* don't count self-export as client */
1287                 if (obd_uuid_equals(&exp->exp_client_uuid,
1288                                     &exp->exp_obd->obd_uuid))
1289                         continue;
1290
1291                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1292                 evicted++;
1293                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1294                        obd->obd_name, exp->exp_client_uuid.uuid,
1295                        exp->exp_connection == NULL ? "<unknown>" :
1296                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1297                 print_export_data(exp, "EVICTING", 0);
1298         }
1299         cfs_spin_unlock(&obd->obd_dev_lock);
1300
1301         if (evicted) {
1302                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1303                        obd->obd_name, evicted);
1304                 obd->obd_stale_clients += evicted;
1305         }
1306         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1307                                                  OBD_OPT_ABORT_RECOV);
1308         EXIT;
1309 }
1310 EXPORT_SYMBOL(class_disconnect_stale_exports);
1311
1312 void class_fail_export(struct obd_export *exp)
1313 {
1314         int rc, already_failed;
1315
1316         cfs_spin_lock(&exp->exp_lock);
1317         already_failed = exp->exp_failed;
1318         exp->exp_failed = 1;
1319         cfs_spin_unlock(&exp->exp_lock);
1320
1321         if (already_failed) {
1322                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1323                        exp, exp->exp_client_uuid.uuid);
1324                 return;
1325         }
1326
1327         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1328                exp, exp->exp_client_uuid.uuid);
1329
1330         if (obd_dump_on_timeout)
1331                 libcfs_debug_dumplog();
1332
1333         /* Most callers into obd_disconnect are removing their own reference
1334          * (request, for example) in addition to the one from the hash table.
1335          * We don't have such a reference here, so make one. */
1336         class_export_get(exp);
1337         rc = obd_disconnect(exp);
1338         if (rc)
1339                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1340         else
1341                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1342                        exp, exp->exp_client_uuid.uuid);
1343 }
1344 EXPORT_SYMBOL(class_fail_export);
1345
1346 char *obd_export_nid2str(struct obd_export *exp)
1347 {
1348         if (exp->exp_connection != NULL)
1349                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1350
1351         return "(no nid)";
1352 }
1353 EXPORT_SYMBOL(obd_export_nid2str);
1354
1355 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1356 {
1357         struct obd_export *doomed_exp = NULL;
1358         int exports_evicted = 0;
1359
1360         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1361
1362         do {
1363                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1364                 if (doomed_exp == NULL)
1365                         break;
1366
1367                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1368                          "nid %s found, wanted nid %s, requested nid %s\n",
1369                          obd_export_nid2str(doomed_exp),
1370                          libcfs_nid2str(nid_key), nid);
1371                 LASSERTF(doomed_exp != obd->obd_self_export,
1372                          "self-export is hashed by NID?\n");
1373                 exports_evicted++;
1374                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1375                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1376                        exports_evicted);
1377                 class_fail_export(doomed_exp);
1378                 class_export_put(doomed_exp);
1379         } while (1);
1380
1381         if (!exports_evicted)
1382                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1383                        obd->obd_name, nid);
1384         return exports_evicted;
1385 }
1386 EXPORT_SYMBOL(obd_export_evict_by_nid);
1387
1388 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1389 {
1390         struct obd_export *doomed_exp = NULL;
1391         struct obd_uuid doomed_uuid;
1392         int exports_evicted = 0;
1393
1394         obd_str2uuid(&doomed_uuid, uuid);
1395         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1396                 CERROR("%s: can't evict myself\n", obd->obd_name);
1397                 return exports_evicted;
1398         }
1399
1400         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1401
1402         if (doomed_exp == NULL) {
1403                 CERROR("%s: can't disconnect %s: no exports found\n",
1404                        obd->obd_name, uuid);
1405         } else {
1406                 CWARN("%s: evicting %s at adminstrative request\n",
1407                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1408                 class_fail_export(doomed_exp);
1409                 class_export_put(doomed_exp);
1410                 exports_evicted++;
1411         }
1412
1413         return exports_evicted;
1414 }
1415 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1416
1417 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1418 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1419 EXPORT_SYMBOL(class_export_dump_hook);
1420 #endif
1421
1422 static void print_export_data(struct obd_export *exp, const char *status,
1423                               int locks)
1424 {
1425         struct ptlrpc_reply_state *rs;
1426         struct ptlrpc_reply_state *first_reply = NULL;
1427         int nreplies = 0;
1428
1429         cfs_spin_lock(&exp->exp_lock);
1430         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1431                                 rs_exp_list) {
1432                 if (nreplies == 0)
1433                         first_reply = rs;
1434                 nreplies++;
1435         }
1436         cfs_spin_unlock(&exp->exp_lock);
1437
1438         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1439                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1440                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1441                cfs_atomic_read(&exp->exp_rpc_count),
1442                cfs_atomic_read(&exp->exp_cb_count),
1443                cfs_atomic_read(&exp->exp_locks_count),
1444                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1445                nreplies, first_reply, nreplies > 3 ? "..." : "",
1446                exp->exp_last_committed);
1447 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1448         if (locks && class_export_dump_hook != NULL)
1449                 class_export_dump_hook(exp);
1450 #endif
1451 }
1452
1453 void dump_exports(struct obd_device *obd, int locks)
1454 {
1455         struct obd_export *exp;
1456
1457         cfs_spin_lock(&obd->obd_dev_lock);
1458         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1459                 print_export_data(exp, "ACTIVE", locks);
1460         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1461                 print_export_data(exp, "UNLINKED", locks);
1462         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1463                 print_export_data(exp, "DELAYED", locks);
1464         cfs_spin_unlock(&obd->obd_dev_lock);
1465         cfs_spin_lock(&obd_zombie_impexp_lock);
1466         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1467                 print_export_data(exp, "ZOMBIE", locks);
1468         cfs_spin_unlock(&obd_zombie_impexp_lock);
1469 }
1470 EXPORT_SYMBOL(dump_exports);
1471
1472 void obd_exports_barrier(struct obd_device *obd)
1473 {
1474         int waited = 2;
1475         LASSERT(cfs_list_empty(&obd->obd_exports));
1476         cfs_spin_lock(&obd->obd_dev_lock);
1477         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1478                 cfs_spin_unlock(&obd->obd_dev_lock);
1479                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1480                                                    cfs_time_seconds(waited));
1481                 if (waited > 5 && IS_PO2(waited)) {
1482                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1483                                       "more than %d seconds. "
1484                                       "The obd refcount = %d. Is it stuck?\n",
1485                                       obd->obd_name, waited,
1486                                       cfs_atomic_read(&obd->obd_refcount));
1487                         dump_exports(obd, 1);
1488                 }
1489                 waited *= 2;
1490                 cfs_spin_lock(&obd->obd_dev_lock);
1491         }
1492         cfs_spin_unlock(&obd->obd_dev_lock);
1493 }
1494 EXPORT_SYMBOL(obd_exports_barrier);
1495
1496 /* Total amount of zombies to be destroyed */
1497 static int zombies_count = 0;
1498
1499 /**
1500  * kill zombie imports and exports
1501  */
1502 void obd_zombie_impexp_cull(void)
1503 {
1504         struct obd_import *import;
1505         struct obd_export *export;
1506         ENTRY;
1507
1508         do {
1509                 cfs_spin_lock(&obd_zombie_impexp_lock);
1510
1511                 import = NULL;
1512                 if (!cfs_list_empty(&obd_zombie_imports)) {
1513                         import = cfs_list_entry(obd_zombie_imports.next,
1514                                                 struct obd_import,
1515                                                 imp_zombie_chain);
1516                         cfs_list_del_init(&import->imp_zombie_chain);
1517                 }
1518
1519                 export = NULL;
1520                 if (!cfs_list_empty(&obd_zombie_exports)) {
1521                         export = cfs_list_entry(obd_zombie_exports.next,
1522                                                 struct obd_export,
1523                                                 exp_obd_chain);
1524                         cfs_list_del_init(&export->exp_obd_chain);
1525                 }
1526
1527                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1528
1529                 if (import != NULL) {
1530                         class_import_destroy(import);
1531                         cfs_spin_lock(&obd_zombie_impexp_lock);
1532                         zombies_count--;
1533                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1534                 }
1535
1536                 if (export != NULL) {
1537                         class_export_destroy(export);
1538                         cfs_spin_lock(&obd_zombie_impexp_lock);
1539                         zombies_count--;
1540                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1541                 }
1542
1543                 cfs_cond_resched();
1544         } while (import != NULL || export != NULL);
1545         EXIT;
1546 }
1547
1548 static cfs_completion_t         obd_zombie_start;
1549 static cfs_completion_t         obd_zombie_stop;
1550 static unsigned long            obd_zombie_flags;
1551 static cfs_waitq_t              obd_zombie_waitq;
1552 static pid_t                    obd_zombie_pid;
1553
1554 enum {
1555         OBD_ZOMBIE_STOP   = 1 << 1
1556 };
1557
1558 /**
1559  * check for work for kill zombie import/export thread.
1560  */
1561 static int obd_zombie_impexp_check(void *arg)
1562 {
1563         int rc;
1564
1565         cfs_spin_lock(&obd_zombie_impexp_lock);
1566         rc = (zombies_count == 0) &&
1567              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1568         cfs_spin_unlock(&obd_zombie_impexp_lock);
1569
1570         RETURN(rc);
1571 }
1572
1573 /**
1574  * Add export to the obd_zombe thread and notify it.
1575  */
1576 static void obd_zombie_export_add(struct obd_export *exp) {
1577         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1578         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1579         cfs_list_del_init(&exp->exp_obd_chain);
1580         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1581         cfs_spin_lock(&obd_zombie_impexp_lock);
1582         zombies_count++;
1583         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1584         cfs_spin_unlock(&obd_zombie_impexp_lock);
1585
1586         obd_zombie_impexp_notify();
1587 }
1588
1589 /**
1590  * Add import to the obd_zombe thread and notify it.
1591  */
1592 static void obd_zombie_import_add(struct obd_import *imp) {
1593         LASSERT(imp->imp_sec == NULL);
1594         cfs_spin_lock(&obd_zombie_impexp_lock);
1595         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1596         zombies_count++;
1597         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1598         cfs_spin_unlock(&obd_zombie_impexp_lock);
1599
1600         obd_zombie_impexp_notify();
1601 }
1602
1603 /**
1604  * notify import/export destroy thread about new zombie.
1605  */
1606 static void obd_zombie_impexp_notify(void)
1607 {
1608         /*
1609          * Make sure obd_zomebie_impexp_thread get this notification.
1610          * It is possible this signal only get by obd_zombie_barrier, and
1611          * barrier gulps this notification and sleeps away and hangs ensues
1612          */
1613         cfs_waitq_broadcast(&obd_zombie_waitq);
1614 }
1615
1616 /**
1617  * check whether obd_zombie is idle
1618  */
1619 static int obd_zombie_is_idle(void)
1620 {
1621         int rc;
1622
1623         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1624         cfs_spin_lock(&obd_zombie_impexp_lock);
1625         rc = (zombies_count == 0);
1626         cfs_spin_unlock(&obd_zombie_impexp_lock);
1627         return rc;
1628 }
1629
1630 /**
1631  * wait when obd_zombie import/export queues become empty
1632  */
1633 void obd_zombie_barrier(void)
1634 {
1635         struct l_wait_info lwi = { 0 };
1636
1637         if (obd_zombie_pid == cfs_curproc_pid())
1638                 /* don't wait for myself */
1639                 return;
1640         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1641 }
1642 EXPORT_SYMBOL(obd_zombie_barrier);
1643
1644 #ifdef __KERNEL__
1645
1646 /**
1647  * destroy zombie export/import thread.
1648  */
1649 static int obd_zombie_impexp_thread(void *unused)
1650 {
1651         int rc;
1652
1653         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1654                 cfs_complete(&obd_zombie_start);
1655                 RETURN(rc);
1656         }
1657
1658         cfs_complete(&obd_zombie_start);
1659
1660         obd_zombie_pid = cfs_curproc_pid();
1661
1662         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1663                 struct l_wait_info lwi = { 0 };
1664
1665                 l_wait_event(obd_zombie_waitq,
1666                              !obd_zombie_impexp_check(NULL), &lwi);
1667                 obd_zombie_impexp_cull();
1668
1669                 /*
1670                  * Notify obd_zombie_barrier callers that queues
1671                  * may be empty.
1672                  */
1673                 cfs_waitq_signal(&obd_zombie_waitq);
1674         }
1675
1676         cfs_complete(&obd_zombie_stop);
1677
1678         RETURN(0);
1679 }
1680
1681 #else /* ! KERNEL */
1682
1683 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1684 static void *obd_zombie_impexp_work_cb;
1685 static void *obd_zombie_impexp_idle_cb;
1686
1687 int obd_zombie_impexp_kill(void *arg)
1688 {
1689         int rc = 0;
1690
1691         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1692                 obd_zombie_impexp_cull();
1693                 rc = 1;
1694         }
1695         cfs_atomic_dec(&zombie_recur);
1696         return rc;
1697 }
1698
1699 #endif
1700
1701 /**
1702  * start destroy zombie import/export thread
1703  */
1704 int obd_zombie_impexp_init(void)
1705 {
1706         int rc;
1707
1708         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1709         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1710         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1711         cfs_init_completion(&obd_zombie_start);
1712         cfs_init_completion(&obd_zombie_stop);
1713         cfs_waitq_init(&obd_zombie_waitq);
1714         obd_zombie_pid = 0;
1715
1716 #ifdef __KERNEL__
1717         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1718         if (rc < 0)
1719                 RETURN(rc);
1720
1721         cfs_wait_for_completion(&obd_zombie_start);
1722 #else
1723
1724         obd_zombie_impexp_work_cb =
1725                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1726                                                  &obd_zombie_impexp_kill, NULL);
1727
1728         obd_zombie_impexp_idle_cb =
1729                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1730                                                  &obd_zombie_impexp_check, NULL);
1731         rc = 0;
1732 #endif
1733         RETURN(rc);
1734 }
1735 /**
1736  * stop destroy zombie import/export thread
1737  */
1738 void obd_zombie_impexp_stop(void)
1739 {
1740         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1741         obd_zombie_impexp_notify();
1742 #ifdef __KERNEL__
1743         cfs_wait_for_completion(&obd_zombie_stop);
1744 #else
1745         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1746         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1747 #endif
1748 }
1749
1750 /***** Kernel-userspace comm helpers *******/
1751
1752 /* Get length of entire message, including header */
1753 int kuc_len(int payload_len)
1754 {
1755         return sizeof(struct kuc_hdr) + payload_len;
1756 }
1757 EXPORT_SYMBOL(kuc_len);
1758
1759 /* Get a pointer to kuc header, given a ptr to the payload
1760  * @param p Pointer to payload area
1761  * @returns Pointer to kuc header
1762  */
1763 struct kuc_hdr * kuc_ptr(void *p)
1764 {
1765         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1766         LASSERT(lh->kuc_magic == KUC_MAGIC);
1767         return lh;
1768 }
1769 EXPORT_SYMBOL(kuc_ptr);
1770
1771 /* Test if payload is part of kuc message
1772  * @param p Pointer to payload area
1773  * @returns boolean
1774  */
1775 int kuc_ispayload(void *p)
1776 {
1777         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1778
1779         if (kh->kuc_magic == KUC_MAGIC)
1780                 return 1;
1781         else
1782                 return 0;
1783 }
1784 EXPORT_SYMBOL(kuc_ispayload);
1785
1786 /* Alloc space for a message, and fill in header
1787  * @return Pointer to payload area
1788  */
1789 void *kuc_alloc(int payload_len, int transport, int type)
1790 {
1791         struct kuc_hdr *lh;
1792         int len = kuc_len(payload_len);
1793
1794         OBD_ALLOC(lh, len);
1795         if (lh == NULL)
1796                 return ERR_PTR(-ENOMEM);
1797
1798         lh->kuc_magic = KUC_MAGIC;
1799         lh->kuc_transport = transport;
1800         lh->kuc_msgtype = type;
1801         lh->kuc_msglen = len;
1802
1803         return (void *)(lh + 1);
1804 }
1805 EXPORT_SYMBOL(kuc_alloc);
1806
1807 /* Takes pointer to payload area */
1808 inline void kuc_free(void *p, int payload_len)
1809 {
1810         struct kuc_hdr *lh = kuc_ptr(p);
1811         OBD_FREE(lh, kuc_len(payload_len));
1812 }
1813 EXPORT_SYMBOL(kuc_free);
1814
1815
1816