Whamcloud - gitweb
LU-919 obdclass: remove hard coded 0x5a5a5a
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/obdclass/genops.c
40  *
41  * These are the only exported functions, they provide some generic
42  * infrastructure for managing object devices
43  */
44
45 #define DEBUG_SUBSYSTEM S_CLASS
46 #ifndef __KERNEL__
47 #include <liblustre.h>
48 #endif
49 #include <obd_ost.h>
50 #include <obd_class.h>
51 #include <lprocfs_status.h>
52
53 extern cfs_list_t obd_types;
54 cfs_spinlock_t obd_types_lock;
55
56 cfs_mem_cache_t *obd_device_cachep;
57 cfs_mem_cache_t *obdo_cachep;
58 EXPORT_SYMBOL(obdo_cachep);
59 cfs_mem_cache_t *import_cachep;
60
61 cfs_list_t      obd_zombie_imports;
62 cfs_list_t      obd_zombie_exports;
63 cfs_spinlock_t  obd_zombie_impexp_lock;
64 static void obd_zombie_impexp_notify(void);
65 static void obd_zombie_export_add(struct obd_export *exp);
66 static void obd_zombie_import_add(struct obd_import *imp);
67 static void print_export_data(struct obd_export *exp,
68                               const char *status, int locks);
69
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
71
72 /*
73  * support functions: we could use inter-module communication, but this
74  * is more portable to other OS's
75  */
76 static struct obd_device *obd_device_alloc(void)
77 {
78         struct obd_device *obd;
79
80         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
81         if (obd != NULL) {
82                 obd->obd_magic = OBD_DEVICE_MAGIC;
83         }
84         return obd;
85 }
86
87 static void obd_device_free(struct obd_device *obd)
88 {
89         LASSERT(obd != NULL);
90         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
91                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
92         if (obd->obd_namespace != NULL) {
93                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
94                        obd, obd->obd_namespace, obd->obd_force);
95                 LBUG();
96         }
97         lu_ref_fini(&obd->obd_reference);
98         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 }
100
101 struct obd_type *class_search_type(const char *name)
102 {
103         cfs_list_t *tmp;
104         struct obd_type *type;
105
106         cfs_spin_lock(&obd_types_lock);
107         cfs_list_for_each(tmp, &obd_types) {
108                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
109                 if (strcmp(type->typ_name, name) == 0) {
110                         cfs_spin_unlock(&obd_types_lock);
111                         return type;
112                 }
113         }
114         cfs_spin_unlock(&obd_types_lock);
115         return NULL;
116 }
117
118 struct obd_type *class_get_type(const char *name)
119 {
120         struct obd_type *type = class_search_type(name);
121
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
123         if (!type) {
124                 const char *modname = name;
125                 if (!cfs_request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 cfs_spin_lock(&type->obd_type_lock);
136                 type->typ_refcnt++;
137                 cfs_try_module_get(type->typ_dt_ops->o_owner);
138                 cfs_spin_unlock(&type->obd_type_lock);
139         }
140         return type;
141 }
142 EXPORT_SYMBOL(class_get_type);
143
144 void class_put_type(struct obd_type *type)
145 {
146         LASSERT(type);
147         cfs_spin_lock(&type->obd_type_lock);
148         type->typ_refcnt--;
149         cfs_module_put(type->typ_dt_ops->o_owner);
150         cfs_spin_unlock(&type->obd_type_lock);
151 }
152 EXPORT_SYMBOL(class_put_type);
153
154 #define CLASS_MAX_NAME 1024
155
156 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
157                         struct lprocfs_vars *vars, const char *name,
158                         struct lu_device_type *ldt)
159 {
160         struct obd_type *type;
161         int rc = 0;
162         ENTRY;
163
164         /* sanity check */
165         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
166
167         if (class_search_type(name)) {
168                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
169                 RETURN(-EEXIST);
170         }
171
172         rc = -ENOMEM;
173         OBD_ALLOC(type, sizeof(*type));
174         if (type == NULL)
175                 RETURN(rc);
176
177         OBD_ALLOC_PTR(type->typ_dt_ops);
178         OBD_ALLOC_PTR(type->typ_md_ops);
179         OBD_ALLOC(type->typ_name, strlen(name) + 1);
180
181         if (type->typ_dt_ops == NULL ||
182             type->typ_md_ops == NULL ||
183             type->typ_name == NULL)
184                 GOTO (failed, rc);
185
186         *(type->typ_dt_ops) = *dt_ops;
187         /* md_ops is optional */
188         if (md_ops)
189                 *(type->typ_md_ops) = *md_ops;
190         strcpy(type->typ_name, name);
191         cfs_spin_lock_init(&type->obd_type_lock);
192
193 #ifdef LPROCFS
194         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
195                                               vars, type);
196         if (IS_ERR(type->typ_procroot)) {
197                 rc = PTR_ERR(type->typ_procroot);
198                 type->typ_procroot = NULL;
199                 GOTO (failed, rc);
200         }
201 #endif
202         if (ldt != NULL) {
203                 type->typ_lu = ldt;
204                 rc = lu_device_type_init(ldt);
205                 if (rc != 0)
206                         GOTO (failed, rc);
207         }
208
209         cfs_spin_lock(&obd_types_lock);
210         cfs_list_add(&type->typ_chain, &obd_types);
211         cfs_spin_unlock(&obd_types_lock);
212
213         RETURN (0);
214
215  failed:
216         if (type->typ_name != NULL)
217                 OBD_FREE(type->typ_name, strlen(name) + 1);
218         if (type->typ_md_ops != NULL)
219                 OBD_FREE_PTR(type->typ_md_ops);
220         if (type->typ_dt_ops != NULL)
221                 OBD_FREE_PTR(type->typ_dt_ops);
222         OBD_FREE(type, sizeof(*type));
223         RETURN(rc);
224 }
225 EXPORT_SYMBOL(class_register_type);
226
227 int class_unregister_type(const char *name)
228 {
229         struct obd_type *type = class_search_type(name);
230         ENTRY;
231
232         if (!type) {
233                 CERROR("unknown obd type\n");
234                 RETURN(-EINVAL);
235         }
236
237         if (type->typ_refcnt) {
238                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
239                 /* This is a bad situation, let's make the best of it */
240                 /* Remove ops, but leave the name for debugging */
241                 OBD_FREE_PTR(type->typ_dt_ops);
242                 OBD_FREE_PTR(type->typ_md_ops);
243                 RETURN(-EBUSY);
244         }
245
246         if (type->typ_procroot) {
247                 lprocfs_remove(&type->typ_procroot);
248         }
249
250         if (type->typ_lu)
251                 lu_device_type_fini(type->typ_lu);
252
253         cfs_spin_lock(&obd_types_lock);
254         cfs_list_del(&type->typ_chain);
255         cfs_spin_unlock(&obd_types_lock);
256         OBD_FREE(type->typ_name, strlen(name) + 1);
257         if (type->typ_dt_ops != NULL)
258                 OBD_FREE_PTR(type->typ_dt_ops);
259         if (type->typ_md_ops != NULL)
260                 OBD_FREE_PTR(type->typ_md_ops);
261         OBD_FREE(type, sizeof(*type));
262         RETURN(0);
263 } /* class_unregister_type */
264 EXPORT_SYMBOL(class_unregister_type);
265
266 /**
267  * Create a new obd device.
268  *
269  * Find an empty slot in ::obd_devs[], create a new obd device in it.
270  *
271  * \param[in] type_name obd device type string.
272  * \param[in] name      obd device name.
273  *
274  * \retval NULL if create fails, otherwise return the obd device
275  *         pointer created.
276  */
277 struct obd_device *class_newdev(const char *type_name, const char *name)
278 {
279         struct obd_device *result = NULL;
280         struct obd_device *newdev;
281         struct obd_type *type = NULL;
282         int i;
283         int new_obd_minor = 0;
284         ENTRY;
285
286         if (strlen(name) >= MAX_OBD_NAME) {
287                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
288                 RETURN(ERR_PTR(-EINVAL));
289         }
290
291         type = class_get_type(type_name);
292         if (type == NULL){
293                 CERROR("OBD: unknown type: %s\n", type_name);
294                 RETURN(ERR_PTR(-ENODEV));
295         }
296
297         newdev = obd_device_alloc();
298         if (newdev == NULL) {
299                 class_put_type(type);
300                 RETURN(ERR_PTR(-ENOMEM));
301         }
302         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
303
304         cfs_write_lock(&obd_dev_lock);
305         for (i = 0; i < class_devno_max(); i++) {
306                 struct obd_device *obd = class_num2obd(i);
307
308                 if (obd && obd->obd_name &&
309                     (strcmp(name, obd->obd_name) == 0)) {
310                         CERROR("Device %s already exists at %d, won't add\n",
311                                name, i);
312                         if (result) {
313                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
314                                          "%p obd_magic %08x != %08x\n", result,
315                                          result->obd_magic, OBD_DEVICE_MAGIC);
316                                 LASSERTF(result->obd_minor == new_obd_minor,
317                                          "%p obd_minor %d != %d\n", result,
318                                          result->obd_minor, new_obd_minor);
319
320                                 obd_devs[result->obd_minor] = NULL;
321                                 result->obd_name[0]='\0';
322                          }
323                         result = ERR_PTR(-EEXIST);
324                         break;
325                 }
326                 if (!result && !obd) {
327                         result = newdev;
328                         result->obd_minor = i;
329                         new_obd_minor = i;
330                         result->obd_type = type;
331                         strncpy(result->obd_name, name,
332                                 sizeof(result->obd_name) - 1);
333                         obd_devs[i] = result;
334                 }
335         }
336         cfs_write_unlock(&obd_dev_lock);
337
338         if (result == NULL && i >= class_devno_max()) {
339                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
340                        class_devno_max());
341                 RETURN(ERR_PTR(-EOVERFLOW));
342         }
343
344         if (IS_ERR(result)) {
345                 obd_device_free(newdev);
346                 class_put_type(type);
347         } else {
348                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
349                        result->obd_name, result);
350         }
351         RETURN(result);
352 }
353
354 void class_release_dev(struct obd_device *obd)
355 {
356         struct obd_type *obd_type = obd->obd_type;
357
358         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
359                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
360         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
361                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
362         LASSERT(obd_type != NULL);
363
364         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
365                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
366
367         cfs_write_lock(&obd_dev_lock);
368         obd_devs[obd->obd_minor] = NULL;
369         cfs_write_unlock(&obd_dev_lock);
370         obd_device_free(obd);
371
372         class_put_type(obd_type);
373 }
374
375 int class_name2dev(const char *name)
376 {
377         int i;
378
379         if (!name)
380                 return -1;
381
382         cfs_read_lock(&obd_dev_lock);
383         for (i = 0; i < class_devno_max(); i++) {
384                 struct obd_device *obd = class_num2obd(i);
385
386                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
387                         /* Make sure we finished attaching before we give
388                            out any references */
389                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
390                         if (obd->obd_attached) {
391                                 cfs_read_unlock(&obd_dev_lock);
392                                 return i;
393                         }
394                         break;
395                 }
396         }
397         cfs_read_unlock(&obd_dev_lock);
398
399         return -1;
400 }
401 EXPORT_SYMBOL(class_name2dev);
402
403 struct obd_device *class_name2obd(const char *name)
404 {
405         int dev = class_name2dev(name);
406
407         if (dev < 0 || dev > class_devno_max())
408                 return NULL;
409         return class_num2obd(dev);
410 }
411 EXPORT_SYMBOL(class_name2obd);
412
413 int class_uuid2dev(struct obd_uuid *uuid)
414 {
415         int i;
416
417         cfs_read_lock(&obd_dev_lock);
418         for (i = 0; i < class_devno_max(); i++) {
419                 struct obd_device *obd = class_num2obd(i);
420
421                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
422                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
423                         cfs_read_unlock(&obd_dev_lock);
424                         return i;
425                 }
426         }
427         cfs_read_unlock(&obd_dev_lock);
428
429         return -1;
430 }
431 EXPORT_SYMBOL(class_uuid2dev);
432
433 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
434 {
435         int dev = class_uuid2dev(uuid);
436         if (dev < 0)
437                 return NULL;
438         return class_num2obd(dev);
439 }
440 EXPORT_SYMBOL(class_uuid2obd);
441
442 /**
443  * Get obd device from ::obd_devs[]
444  *
445  * \param num [in] array index
446  *
447  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
448  *         otherwise return the obd device there.
449  */
450 struct obd_device *class_num2obd(int num)
451 {
452         struct obd_device *obd = NULL;
453
454         if (num < class_devno_max()) {
455                 obd = obd_devs[num];
456                 if (obd == NULL)
457                         return NULL;
458
459                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
460                          "%p obd_magic %08x != %08x\n",
461                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
462                 LASSERTF(obd->obd_minor == num,
463                          "%p obd_minor %0d != %0d\n",
464                          obd, obd->obd_minor, num);
465         }
466
467         return obd;
468 }
469 EXPORT_SYMBOL(class_num2obd);
470
471 void class_obd_list(void)
472 {
473         char *status;
474         int i;
475
476         cfs_read_lock(&obd_dev_lock);
477         for (i = 0; i < class_devno_max(); i++) {
478                 struct obd_device *obd = class_num2obd(i);
479
480                 if (obd == NULL)
481                         continue;
482                 if (obd->obd_stopping)
483                         status = "ST";
484                 else if (obd->obd_set_up)
485                         status = "UP";
486                 else if (obd->obd_attached)
487                         status = "AT";
488                 else
489                         status = "--";
490                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
491                          i, status, obd->obd_type->typ_name,
492                          obd->obd_name, obd->obd_uuid.uuid,
493                          cfs_atomic_read(&obd->obd_refcount));
494         }
495         cfs_read_unlock(&obd_dev_lock);
496         return;
497 }
498
499 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
500    specified, then only the client with that uuid is returned,
501    otherwise any client connected to the tgt is returned. */
502 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
503                                           const char * typ_name,
504                                           struct obd_uuid *grp_uuid)
505 {
506         int i;
507
508         cfs_read_lock(&obd_dev_lock);
509         for (i = 0; i < class_devno_max(); i++) {
510                 struct obd_device *obd = class_num2obd(i);
511
512                 if (obd == NULL)
513                         continue;
514                 if ((strncmp(obd->obd_type->typ_name, typ_name,
515                              strlen(typ_name)) == 0)) {
516                         if (obd_uuid_equals(tgt_uuid,
517                                             &obd->u.cli.cl_target_uuid) &&
518                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
519                                                          &obd->obd_uuid) : 1)) {
520                                 cfs_read_unlock(&obd_dev_lock);
521                                 return obd;
522                         }
523                 }
524         }
525         cfs_read_unlock(&obd_dev_lock);
526
527         return NULL;
528 }
529 EXPORT_SYMBOL(class_find_client_obd);
530
531 /* Iterate the obd_device list looking devices have grp_uuid. Start
532    searching at *next, and if a device is found, the next index to look
533    at is saved in *next. If next is NULL, then the first matching device
534    will always be returned. */
535 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
536 {
537         int i;
538
539         if (next == NULL)
540                 i = 0;
541         else if (*next >= 0 && *next < class_devno_max())
542                 i = *next;
543         else
544                 return NULL;
545
546         cfs_read_lock(&obd_dev_lock);
547         for (; i < class_devno_max(); i++) {
548                 struct obd_device *obd = class_num2obd(i);
549
550                 if (obd == NULL)
551                         continue;
552                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
553                         if (next != NULL)
554                                 *next = i+1;
555                         cfs_read_unlock(&obd_dev_lock);
556                         return obd;
557                 }
558         }
559         cfs_read_unlock(&obd_dev_lock);
560
561         return NULL;
562 }
563 EXPORT_SYMBOL(class_devices_in_group);
564
565 /**
566  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
567  * adjust sptlrpc settings accordingly.
568  */
569 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
570 {
571         struct obd_device  *obd;
572         const char         *type;
573         int                 i, rc = 0, rc2;
574
575         LASSERT(namelen > 0);
576
577         cfs_read_lock(&obd_dev_lock);
578         for (i = 0; i < class_devno_max(); i++) {
579                 obd = class_num2obd(i);
580
581                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
582                         continue;
583
584                 /* only notify mdc, osc, mdt, ost */
585                 type = obd->obd_type->typ_name;
586                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
587                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
588                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
589                     strcmp(type, LUSTRE_OST_NAME) != 0)
590                         continue;
591
592                 if (strncmp(obd->obd_name, fsname, namelen))
593                         continue;
594
595                 class_incref(obd, __FUNCTION__, obd);
596                 cfs_read_unlock(&obd_dev_lock);
597                 rc2 = obd_set_info_async(obd->obd_self_export,
598                                          sizeof(KEY_SPTLRPC_CONF),
599                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
600                 rc = rc ? rc : rc2;
601                 class_decref(obd, __FUNCTION__, obd);
602                 cfs_read_lock(&obd_dev_lock);
603         }
604         cfs_read_unlock(&obd_dev_lock);
605         return rc;
606 }
607 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
608
609 void obd_cleanup_caches(void)
610 {
611         int rc;
612
613         ENTRY;
614         if (obd_device_cachep) {
615                 rc = cfs_mem_cache_destroy(obd_device_cachep);
616                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
617                 obd_device_cachep = NULL;
618         }
619         if (obdo_cachep) {
620                 rc = cfs_mem_cache_destroy(obdo_cachep);
621                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
622                 obdo_cachep = NULL;
623         }
624         if (import_cachep) {
625                 rc = cfs_mem_cache_destroy(import_cachep);
626                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
627                 import_cachep = NULL;
628         }
629         if (capa_cachep) {
630                 rc = cfs_mem_cache_destroy(capa_cachep);
631                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
632                 capa_cachep = NULL;
633         }
634         EXIT;
635 }
636
637 int obd_init_caches(void)
638 {
639         ENTRY;
640
641         LASSERT(obd_device_cachep == NULL);
642         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
643                                                  sizeof(struct obd_device),
644                                                  0, 0);
645         if (!obd_device_cachep)
646                 GOTO(out, -ENOMEM);
647
648         LASSERT(obdo_cachep == NULL);
649         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
650                                            0, 0);
651         if (!obdo_cachep)
652                 GOTO(out, -ENOMEM);
653
654         LASSERT(import_cachep == NULL);
655         import_cachep = cfs_mem_cache_create("ll_import_cache",
656                                              sizeof(struct obd_import),
657                                              0, 0);
658         if (!import_cachep)
659                 GOTO(out, -ENOMEM);
660
661         LASSERT(capa_cachep == NULL);
662         capa_cachep = cfs_mem_cache_create("capa_cache",
663                                            sizeof(struct obd_capa), 0, 0);
664         if (!capa_cachep)
665                 GOTO(out, -ENOMEM);
666
667         RETURN(0);
668  out:
669         obd_cleanup_caches();
670         RETURN(-ENOMEM);
671
672 }
673
674 /* map connection to client */
675 struct obd_export *class_conn2export(struct lustre_handle *conn)
676 {
677         struct obd_export *export;
678         ENTRY;
679
680         if (!conn) {
681                 CDEBUG(D_CACHE, "looking for null handle\n");
682                 RETURN(NULL);
683         }
684
685         if (conn->cookie == -1) {  /* this means assign a new connection */
686                 CDEBUG(D_CACHE, "want a new connection\n");
687                 RETURN(NULL);
688         }
689
690         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
691         export = class_handle2object(conn->cookie);
692         RETURN(export);
693 }
694 EXPORT_SYMBOL(class_conn2export);
695
696 struct obd_device *class_exp2obd(struct obd_export *exp)
697 {
698         if (exp)
699                 return exp->exp_obd;
700         return NULL;
701 }
702 EXPORT_SYMBOL(class_exp2obd);
703
704 struct obd_device *class_conn2obd(struct lustre_handle *conn)
705 {
706         struct obd_export *export;
707         export = class_conn2export(conn);
708         if (export) {
709                 struct obd_device *obd = export->exp_obd;
710                 class_export_put(export);
711                 return obd;
712         }
713         return NULL;
714 }
715 EXPORT_SYMBOL(class_conn2obd);
716
717 struct obd_import *class_exp2cliimp(struct obd_export *exp)
718 {
719         struct obd_device *obd = exp->exp_obd;
720         if (obd == NULL)
721                 return NULL;
722         return obd->u.cli.cl_import;
723 }
724 EXPORT_SYMBOL(class_exp2cliimp);
725
726 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
727 {
728         struct obd_device *obd = class_conn2obd(conn);
729         if (obd == NULL)
730                 return NULL;
731         return obd->u.cli.cl_import;
732 }
733 EXPORT_SYMBOL(class_conn2cliimp);
734
735 /* Export management functions */
736 static void class_export_destroy(struct obd_export *exp)
737 {
738         struct obd_device *obd = exp->exp_obd;
739         ENTRY;
740
741         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
742
743         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
744                exp->exp_client_uuid.uuid, obd->obd_name);
745
746         LASSERT(obd != NULL);
747
748         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
749         if (exp->exp_connection)
750                 ptlrpc_put_connection_superhack(exp->exp_connection);
751
752         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
753         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
754         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
755         LASSERT(cfs_list_empty(&exp->exp_queued_rpc));
756         obd_destroy_export(exp);
757         class_decref(obd, "export", exp);
758
759         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
760         EXIT;
761 }
762
/*
 * Addref callback registered with class_handle_hash() for exports:
 * takes one export reference on the object behind the untyped pointer.
 */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
767
768 struct obd_export *class_export_get(struct obd_export *exp)
769 {
770         cfs_atomic_inc(&exp->exp_refcount);
771         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
772                cfs_atomic_read(&exp->exp_refcount));
773         return exp;
774 }
775 EXPORT_SYMBOL(class_export_get);
776
777 void class_export_put(struct obd_export *exp)
778 {
779         LASSERT(exp != NULL);
780         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
781         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
782                cfs_atomic_read(&exp->exp_refcount) - 1);
783
784         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
785                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
786                 CDEBUG(D_IOCTL, "final put %p/%s\n",
787                        exp, exp->exp_client_uuid.uuid);
788
789                 /* release nid stat refererence */
790                 lprocfs_exp_cleanup(exp);
791
792                 obd_zombie_export_add(exp);
793         }
794 }
795 EXPORT_SYMBOL(class_export_put);
796
797 /* Creates a new export, adds it to the hash table, and returns a
798  * pointer to it. The refcount is 2: one for the hash reference, and
799  * one for the pointer returned by this function. */
800 struct obd_export *class_new_export(struct obd_device *obd,
801                                     struct obd_uuid *cluuid)
802 {
803         struct obd_export *export;
804         cfs_hash_t *hash = NULL;
805         int rc = 0;
806         ENTRY;
807
808         OBD_ALLOC_PTR(export);
809         if (!export)
810                 return ERR_PTR(-ENOMEM);
811
812         export->exp_conn_cnt = 0;
813         export->exp_lock_hash = NULL;
814         cfs_atomic_set(&export->exp_refcount, 2);
815         cfs_atomic_set(&export->exp_rpc_count, 0);
816         cfs_atomic_set(&export->exp_cb_count, 0);
817         cfs_atomic_set(&export->exp_locks_count, 0);
818 #if LUSTRE_TRACKS_LOCK_EXP_REFS
819         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
820         cfs_spin_lock_init(&export->exp_locks_list_guard);
821 #endif
822         cfs_atomic_set(&export->exp_replay_count, 0);
823         export->exp_obd = obd;
824         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
825         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
826         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
827         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
828         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
829         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
830         class_handle_hash(&export->exp_handle, export_handle_addref);
831         export->exp_last_request_time = cfs_time_current_sec();
832         cfs_spin_lock_init(&export->exp_lock);
833         cfs_spin_lock_init(&export->exp_rpc_lock);
834         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
835         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
836
837         export->exp_sp_peer = LUSTRE_SP_ANY;
838         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
839         export->exp_client_uuid = *cluuid;
840         obd_init_export(export);
841
842         cfs_spin_lock(&obd->obd_dev_lock);
843          /* shouldn't happen, but might race */
844         if (obd->obd_stopping)
845                 GOTO(exit_unlock, rc = -ENODEV);
846
847         hash = cfs_hash_getref(obd->obd_uuid_hash);
848         if (hash == NULL)
849                 GOTO(exit_unlock, rc = -ENODEV);
850         cfs_spin_unlock(&obd->obd_dev_lock);
851
852         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
853                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
854                 if (rc != 0) {
855                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
856                                       obd->obd_name, cluuid->uuid, rc);
857                         GOTO(exit_err, rc = -EALREADY);
858                 }
859         }
860
861         cfs_spin_lock(&obd->obd_dev_lock);
862         if (obd->obd_stopping) {
863                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
864                 GOTO(exit_unlock, rc = -ENODEV);
865         }
866
867         class_incref(obd, "export", export);
868         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
869         cfs_list_add_tail(&export->exp_obd_chain_timed,
870                           &export->exp_obd->obd_exports_timed);
871         export->exp_obd->obd_num_exports++;
872         cfs_spin_unlock(&obd->obd_dev_lock);
873         cfs_hash_putref(hash);
874         RETURN(export);
875
876 exit_unlock:
877         cfs_spin_unlock(&obd->obd_dev_lock);
878 exit_err:
879         if (hash)
880                 cfs_hash_putref(hash);
881         class_handle_unhash(&export->exp_handle);
882         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
883         obd_destroy_export(export);
884         OBD_FREE_PTR(export);
885         return ERR_PTR(rc);
886 }
887 EXPORT_SYMBOL(class_new_export);
888
889 void class_unlink_export(struct obd_export *exp)
890 {
891         class_handle_unhash(&exp->exp_handle);
892
893         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
894         /* delete an uuid-export hashitem from hashtables */
895         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
896                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
897                              &exp->exp_client_uuid,
898                              &exp->exp_uuid_hash);
899
900         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
901         cfs_list_del_init(&exp->exp_obd_chain_timed);
902         exp->exp_obd->obd_num_exports--;
903         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
904         class_export_put(exp);
905 }
906 EXPORT_SYMBOL(class_unlink_export);
907
908 /* Import management functions */
909 void class_import_destroy(struct obd_import *imp)
910 {
911         ENTRY;
912
913         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
914                 imp->imp_obd->obd_name);
915
916         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
917
918         ptlrpc_put_connection_superhack(imp->imp_connection);
919
920         while (!cfs_list_empty(&imp->imp_conn_list)) {
921                 struct obd_import_conn *imp_conn;
922
923                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
924                                           struct obd_import_conn, oic_item);
925                 cfs_list_del_init(&imp_conn->oic_item);
926                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
927                 OBD_FREE(imp_conn, sizeof(*imp_conn));
928         }
929
930         LASSERT(imp->imp_sec == NULL);
931         class_decref(imp->imp_obd, "import", imp);
932         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
933         EXIT;
934 }
935
/**
 * Add-reference callback for import handles.
 *
 * Registered with class_handle_hash() in class_new_import(); the opaque
 * pointer it receives is the import itself.
 */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
940
941 struct obd_import *class_import_get(struct obd_import *import)
942 {
943         cfs_atomic_inc(&import->imp_refcount);
944         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
945                cfs_atomic_read(&import->imp_refcount),
946                import->imp_obd->obd_name);
947         return import;
948 }
949 EXPORT_SYMBOL(class_import_get);
950
951 void class_import_put(struct obd_import *imp)
952 {
953         ENTRY;
954
955         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
956         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
957
958         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
959                cfs_atomic_read(&imp->imp_refcount) - 1,
960                imp->imp_obd->obd_name);
961
962         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
963                 CDEBUG(D_INFO, "final put import %p\n", imp);
964                 obd_zombie_import_add(imp);
965         }
966
967         EXIT;
968 }
969 EXPORT_SYMBOL(class_import_put);
970
971 static void init_imp_at(struct imp_at *at) {
972         int i;
973         at_init(&at->iat_net_latency, 0, 0);
974         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
975                 /* max service estimates are tracked on the server side, so
976                    don't use the AT history here, just use the last reported
977                    val. (But keep hist for proc histogram, worst_ever) */
978                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
979                         AT_FLG_NOHIST);
980         }
981 }
982
983 struct obd_import *class_new_import(struct obd_device *obd)
984 {
985         struct obd_import *imp;
986
987         OBD_ALLOC(imp, sizeof(*imp));
988         if (imp == NULL)
989                 return NULL;
990
991         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
992         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
993         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
994         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
995         cfs_spin_lock_init(&imp->imp_lock);
996         imp->imp_last_success_conn = 0;
997         imp->imp_state = LUSTRE_IMP_NEW;
998         imp->imp_obd = class_incref(obd, "import", imp);
999         cfs_sema_init(&imp->imp_sec_mutex, 1);
1000         cfs_waitq_init(&imp->imp_recovery_waitq);
1001
1002         cfs_atomic_set(&imp->imp_refcount, 2);
1003         cfs_atomic_set(&imp->imp_unregistering, 0);
1004         cfs_atomic_set(&imp->imp_inflight, 0);
1005         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1006         cfs_atomic_set(&imp->imp_inval_count, 0);
1007         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1008         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1009         class_handle_hash(&imp->imp_handle, import_handle_addref);
1010         init_imp_at(&imp->imp_at);
1011
1012         /* the default magic is V2, will be used in connect RPC, and
1013          * then adjusted according to the flags in request/reply. */
1014         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1015
1016         return imp;
1017 }
1018 EXPORT_SYMBOL(class_new_import);
1019
1020 void class_destroy_import(struct obd_import *import)
1021 {
1022         LASSERT(import != NULL);
1023         LASSERT(import != LP_POISON);
1024
1025         class_handle_unhash(&import->imp_handle);
1026
1027         cfs_spin_lock(&import->imp_lock);
1028         import->imp_generation++;
1029         cfs_spin_unlock(&import->imp_lock);
1030         class_import_put(import);
1031 }
1032 EXPORT_SYMBOL(class_destroy_import);
1033
1034 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1035
1036 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1037 {
1038         cfs_spin_lock(&exp->exp_locks_list_guard);
1039
1040         LASSERT(lock->l_exp_refs_nr >= 0);
1041
1042         if (lock->l_exp_refs_target != NULL &&
1043             lock->l_exp_refs_target != exp) {
1044                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1045                               exp, lock, lock->l_exp_refs_target);
1046         }
1047         if ((lock->l_exp_refs_nr ++) == 0) {
1048                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1049                 lock->l_exp_refs_target = exp;
1050         }
1051         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1052                lock, exp, lock->l_exp_refs_nr);
1053         cfs_spin_unlock(&exp->exp_locks_list_guard);
1054 }
1055 EXPORT_SYMBOL(__class_export_add_lock_ref);
1056
1057 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1058 {
1059         cfs_spin_lock(&exp->exp_locks_list_guard);
1060         LASSERT(lock->l_exp_refs_nr > 0);
1061         if (lock->l_exp_refs_target != exp) {
1062                 LCONSOLE_WARN("lock %p, "
1063                               "mismatching export pointers: %p, %p\n",
1064                               lock, lock->l_exp_refs_target, exp);
1065         }
1066         if (-- lock->l_exp_refs_nr == 0) {
1067                 cfs_list_del_init(&lock->l_exp_refs_link);
1068                 lock->l_exp_refs_target = NULL;
1069         }
1070         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1071                lock, exp, lock->l_exp_refs_nr);
1072         cfs_spin_unlock(&exp->exp_locks_list_guard);
1073 }
1074 EXPORT_SYMBOL(__class_export_del_lock_ref);
1075 #endif
1076
1077 /* A connection defines an export context in which preallocation can
1078    be managed. This releases the export pointer reference, and returns
1079    the export handle, so the export refcount is 1 when this function
1080    returns. */
1081 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1082                   struct obd_uuid *cluuid)
1083 {
1084         struct obd_export *export;
1085         LASSERT(conn != NULL);
1086         LASSERT(obd != NULL);
1087         LASSERT(cluuid != NULL);
1088         ENTRY;
1089
1090         export = class_new_export(obd, cluuid);
1091         if (IS_ERR(export))
1092                 RETURN(PTR_ERR(export));
1093
1094         conn->cookie = export->exp_handle.h_cookie;
1095         class_export_put(export);
1096
1097         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1098                cluuid->uuid, conn->cookie);
1099         RETURN(0);
1100 }
1101 EXPORT_SYMBOL(class_connect);
1102
1103 /* if export is involved in recovery then clean up related things */
1104 void class_export_recovery_cleanup(struct obd_export *exp)
1105 {
1106         struct obd_device *obd = exp->exp_obd;
1107
1108         cfs_spin_lock(&obd->obd_recovery_task_lock);
1109         if (exp->exp_delayed)
1110                 obd->obd_delayed_clients--;
1111         if (obd->obd_recovering && exp->exp_in_recovery) {
1112                 cfs_spin_lock(&exp->exp_lock);
1113                 exp->exp_in_recovery = 0;
1114                 cfs_spin_unlock(&exp->exp_lock);
1115                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1116                 cfs_atomic_dec(&obd->obd_connected_clients);
1117         }
1118         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1119         /** Cleanup req replay fields */
1120         if (exp->exp_req_replay_needed) {
1121                 cfs_spin_lock(&exp->exp_lock);
1122                 exp->exp_req_replay_needed = 0;
1123                 cfs_spin_unlock(&exp->exp_lock);
1124                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1125                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1126         }
1127         /** Cleanup lock replay data */
1128         if (exp->exp_lock_replay_needed) {
1129                 cfs_spin_lock(&exp->exp_lock);
1130                 exp->exp_lock_replay_needed = 0;
1131                 cfs_spin_unlock(&exp->exp_lock);
1132                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1133                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1134         }
1135 }
1136
1137 /* This function removes 1-3 references from the export:
1138  * 1 - for export pointer passed
1139  * and if disconnect really need
1140  * 2 - removing from hash
1141  * 3 - in client_unlink_export
1142  * The export pointer passed to this function can destroyed */
1143 int class_disconnect(struct obd_export *export)
1144 {
1145         int already_disconnected;
1146         ENTRY;
1147
1148         if (export == NULL) {
1149                 CWARN("attempting to free NULL export %p\n", export);
1150                 RETURN(-EINVAL);
1151         }
1152
1153         cfs_spin_lock(&export->exp_lock);
1154         already_disconnected = export->exp_disconnected;
1155         export->exp_disconnected = 1;
1156         cfs_spin_unlock(&export->exp_lock);
1157
1158         /* class_cleanup(), abort_recovery(), and class_fail_export()
1159          * all end up in here, and if any of them race we shouldn't
1160          * call extra class_export_puts(). */
1161         if (already_disconnected) {
1162                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1163                 GOTO(no_disconn, already_disconnected);
1164         }
1165
1166         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1167                export->exp_handle.h_cookie);
1168
1169         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1170                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1171                              &export->exp_connection->c_peer.nid,
1172                              &export->exp_nid_hash);
1173
1174         class_export_recovery_cleanup(export);
1175         class_unlink_export(export);
1176 no_disconn:
1177         class_export_put(export);
1178         RETURN(0);
1179 }
1180 EXPORT_SYMBOL(class_disconnect);
1181
1182 /* Return non-zero for a fully connected export */
1183 int class_connected_export(struct obd_export *exp)
1184 {
1185         if (exp) {
1186                 int connected;
1187                 cfs_spin_lock(&exp->exp_lock);
1188                 connected = (exp->exp_conn_cnt > 0);
1189                 cfs_spin_unlock(&exp->exp_lock);
1190                 return connected;
1191         }
1192         return 0;
1193 }
1194 EXPORT_SYMBOL(class_connected_export);
1195
1196 static void class_disconnect_export_list(cfs_list_t *list,
1197                                          enum obd_option flags)
1198 {
1199         int rc;
1200         struct obd_export *exp;
1201         ENTRY;
1202
1203         /* It's possible that an export may disconnect itself, but
1204          * nothing else will be added to this list. */
1205         while (!cfs_list_empty(list)) {
1206                 exp = cfs_list_entry(list->next, struct obd_export,
1207                                      exp_obd_chain);
1208                 /* need for safe call CDEBUG after obd_disconnect */
1209                 class_export_get(exp);
1210
1211                 cfs_spin_lock(&exp->exp_lock);
1212                 exp->exp_flags = flags;
1213                 cfs_spin_unlock(&exp->exp_lock);
1214
1215                 if (obd_uuid_equals(&exp->exp_client_uuid,
1216                                     &exp->exp_obd->obd_uuid)) {
1217                         CDEBUG(D_HA,
1218                                "exp %p export uuid == obd uuid, don't discon\n",
1219                                exp);
1220                         /* Need to delete this now so we don't end up pointing
1221                          * to work_list later when this export is cleaned up. */
1222                         cfs_list_del_init(&exp->exp_obd_chain);
1223                         class_export_put(exp);
1224                         continue;
1225                 }
1226
1227                 class_export_get(exp);
1228                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1229                        "last request at "CFS_TIME_T"\n",
1230                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1231                        exp, exp->exp_last_request_time);
1232                 /* release one export reference anyway */
1233                 rc = obd_disconnect(exp);
1234
1235                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1236                        obd_export_nid2str(exp), exp, rc);
1237                 class_export_put(exp);
1238         }
1239         EXIT;
1240 }
1241
1242 void class_disconnect_exports(struct obd_device *obd)
1243 {
1244         cfs_list_t work_list;
1245         ENTRY;
1246
1247         /* Move all of the exports from obd_exports to a work list, en masse. */
1248         CFS_INIT_LIST_HEAD(&work_list);
1249         cfs_spin_lock(&obd->obd_dev_lock);
1250         cfs_list_splice_init(&obd->obd_exports, &work_list);
1251         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1252         cfs_spin_unlock(&obd->obd_dev_lock);
1253
1254         if (!cfs_list_empty(&work_list)) {
1255                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1256                        "disconnecting them\n", obd->obd_minor, obd);
1257                 class_disconnect_export_list(&work_list,
1258                                              exp_flags_from_obd(obd));
1259         } else
1260                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1261                        obd->obd_minor, obd);
1262         EXIT;
1263 }
1264 EXPORT_SYMBOL(class_disconnect_exports);
1265
1266 /* Remove exports that have not completed recovery.
1267  */
1268 void class_disconnect_stale_exports(struct obd_device *obd,
1269                                     int (*test_export)(struct obd_export *))
1270 {
1271         cfs_list_t work_list;
1272         cfs_list_t *pos, *n;
1273         struct obd_export *exp;
1274         int evicted = 0;
1275         ENTRY;
1276
1277         CFS_INIT_LIST_HEAD(&work_list);
1278         cfs_spin_lock(&obd->obd_dev_lock);
1279         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1280                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1281                 if (test_export(exp))
1282                         continue;
1283
1284                 /* don't count self-export as client */
1285                 if (obd_uuid_equals(&exp->exp_client_uuid,
1286                                     &exp->exp_obd->obd_uuid))
1287                         continue;
1288
1289                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1290                 evicted++;
1291                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1292                        obd->obd_name, exp->exp_client_uuid.uuid,
1293                        exp->exp_connection == NULL ? "<unknown>" :
1294                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1295                 print_export_data(exp, "EVICTING", 0);
1296         }
1297         cfs_spin_unlock(&obd->obd_dev_lock);
1298
1299         if (evicted) {
1300                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1301                        obd->obd_name, evicted);
1302                 obd->obd_stale_clients += evicted;
1303         }
1304         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1305                                                  OBD_OPT_ABORT_RECOV);
1306         EXIT;
1307 }
1308 EXPORT_SYMBOL(class_disconnect_stale_exports);
1309
1310 void class_fail_export(struct obd_export *exp)
1311 {
1312         int rc, already_failed;
1313
1314         cfs_spin_lock(&exp->exp_lock);
1315         already_failed = exp->exp_failed;
1316         exp->exp_failed = 1;
1317         cfs_spin_unlock(&exp->exp_lock);
1318
1319         if (already_failed) {
1320                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1321                        exp, exp->exp_client_uuid.uuid);
1322                 return;
1323         }
1324
1325         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1326                exp, exp->exp_client_uuid.uuid);
1327
1328         if (obd_dump_on_timeout)
1329                 libcfs_debug_dumplog();
1330
1331         /* Most callers into obd_disconnect are removing their own reference
1332          * (request, for example) in addition to the one from the hash table.
1333          * We don't have such a reference here, so make one. */
1334         class_export_get(exp);
1335         rc = obd_disconnect(exp);
1336         if (rc)
1337                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1338         else
1339                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1340                        exp, exp->exp_client_uuid.uuid);
1341 }
1342 EXPORT_SYMBOL(class_fail_export);
1343
1344 char *obd_export_nid2str(struct obd_export *exp)
1345 {
1346         if (exp->exp_connection != NULL)
1347                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1348
1349         return "(no nid)";
1350 }
1351 EXPORT_SYMBOL(obd_export_nid2str);
1352
1353 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1354 {
1355         struct obd_export *doomed_exp = NULL;
1356         int exports_evicted = 0;
1357
1358         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1359
1360         do {
1361                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1362                 if (doomed_exp == NULL)
1363                         break;
1364
1365                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1366                          "nid %s found, wanted nid %s, requested nid %s\n",
1367                          obd_export_nid2str(doomed_exp),
1368                          libcfs_nid2str(nid_key), nid);
1369                 LASSERTF(doomed_exp != obd->obd_self_export,
1370                          "self-export is hashed by NID?\n");
1371                 exports_evicted++;
1372                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1373                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1374                        exports_evicted);
1375                 class_fail_export(doomed_exp);
1376                 class_export_put(doomed_exp);
1377         } while (1);
1378
1379         if (!exports_evicted)
1380                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1381                        obd->obd_name, nid);
1382         return exports_evicted;
1383 }
1384 EXPORT_SYMBOL(obd_export_evict_by_nid);
1385
1386 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1387 {
1388         struct obd_export *doomed_exp = NULL;
1389         struct obd_uuid doomed_uuid;
1390         int exports_evicted = 0;
1391
1392         obd_str2uuid(&doomed_uuid, uuid);
1393         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1394                 CERROR("%s: can't evict myself\n", obd->obd_name);
1395                 return exports_evicted;
1396         }
1397
1398         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1399
1400         if (doomed_exp == NULL) {
1401                 CERROR("%s: can't disconnect %s: no exports found\n",
1402                        obd->obd_name, uuid);
1403         } else {
1404                 CWARN("%s: evicting %s at adminstrative request\n",
1405                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1406                 class_fail_export(doomed_exp);
1407                 class_export_put(doomed_exp);
1408                 exports_evicted++;
1409         }
1410
1411         return exports_evicted;
1412 }
1413 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1414
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/*
 * Optional callback used by print_export_data() to dump per-export lock
 * information when lock dumping is requested.  It is NULL until some
 * other code installs a function here.
 */
void (*class_export_dump_hook)(struct obd_export *exp) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1419
1420 static void print_export_data(struct obd_export *exp, const char *status,
1421                               int locks)
1422 {
1423         struct ptlrpc_reply_state *rs;
1424         struct ptlrpc_reply_state *first_reply = NULL;
1425         int nreplies = 0;
1426
1427         cfs_spin_lock(&exp->exp_lock);
1428         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1429                                 rs_exp_list) {
1430                 if (nreplies == 0)
1431                         first_reply = rs;
1432                 nreplies++;
1433         }
1434         cfs_spin_unlock(&exp->exp_lock);
1435
1436         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1437                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1438                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1439                cfs_atomic_read(&exp->exp_rpc_count),
1440                cfs_atomic_read(&exp->exp_cb_count),
1441                cfs_atomic_read(&exp->exp_locks_count),
1442                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1443                nreplies, first_reply, nreplies > 3 ? "..." : "",
1444                exp->exp_last_committed);
1445 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1446         if (locks && class_export_dump_hook != NULL)
1447                 class_export_dump_hook(exp);
1448 #endif
1449 }
1450
1451 void dump_exports(struct obd_device *obd, int locks)
1452 {
1453         struct obd_export *exp;
1454
1455         cfs_spin_lock(&obd->obd_dev_lock);
1456         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1457                 print_export_data(exp, "ACTIVE", locks);
1458         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1459                 print_export_data(exp, "UNLINKED", locks);
1460         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1461                 print_export_data(exp, "DELAYED", locks);
1462         cfs_spin_unlock(&obd->obd_dev_lock);
1463         cfs_spin_lock(&obd_zombie_impexp_lock);
1464         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1465                 print_export_data(exp, "ZOMBIE", locks);
1466         cfs_spin_unlock(&obd_zombie_impexp_lock);
1467 }
1468 EXPORT_SYMBOL(dump_exports);
1469
1470 void obd_exports_barrier(struct obd_device *obd)
1471 {
1472         int waited = 2;
1473         LASSERT(cfs_list_empty(&obd->obd_exports));
1474         cfs_spin_lock(&obd->obd_dev_lock);
1475         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1476                 cfs_spin_unlock(&obd->obd_dev_lock);
1477                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1478                                                    cfs_time_seconds(waited));
1479                 if (waited > 5 && IS_PO2(waited)) {
1480                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1481                                       "more than %d seconds. "
1482                                       "The obd refcount = %d. Is it stuck?\n",
1483                                       obd->obd_name, waited,
1484                                       cfs_atomic_read(&obd->obd_refcount));
1485                         dump_exports(obd, 1);
1486                 }
1487                 waited *= 2;
1488                 cfs_spin_lock(&obd->obd_dev_lock);
1489         }
1490         cfs_spin_unlock(&obd->obd_dev_lock);
1491 }
1492 EXPORT_SYMBOL(obd_exports_barrier);
1493
/*
 * Number of zombie imports and exports still waiting to be destroyed.
 * Updated under obd_zombie_impexp_lock.
 */
static int zombies_count;
1496
1497 /**
1498  * kill zombie imports and exports
1499  */
1500 void obd_zombie_impexp_cull(void)
1501 {
1502         struct obd_import *import;
1503         struct obd_export *export;
1504         ENTRY;
1505
1506         do {
1507                 cfs_spin_lock(&obd_zombie_impexp_lock);
1508
1509                 import = NULL;
1510                 if (!cfs_list_empty(&obd_zombie_imports)) {
1511                         import = cfs_list_entry(obd_zombie_imports.next,
1512                                                 struct obd_import,
1513                                                 imp_zombie_chain);
1514                         cfs_list_del_init(&import->imp_zombie_chain);
1515                 }
1516
1517                 export = NULL;
1518                 if (!cfs_list_empty(&obd_zombie_exports)) {
1519                         export = cfs_list_entry(obd_zombie_exports.next,
1520                                                 struct obd_export,
1521                                                 exp_obd_chain);
1522                         cfs_list_del_init(&export->exp_obd_chain);
1523                 }
1524
1525                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1526
1527                 if (import != NULL) {
1528                         class_import_destroy(import);
1529                         cfs_spin_lock(&obd_zombie_impexp_lock);
1530                         zombies_count--;
1531                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1532                 }
1533
1534                 if (export != NULL) {
1535                         class_export_destroy(export);
1536                         cfs_spin_lock(&obd_zombie_impexp_lock);
1537                         zombies_count--;
1538                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1539                 }
1540
1541                 cfs_cond_resched();
1542         } while (import != NULL || export != NULL);
1543         EXIT;
1544 }
1545
1546 static cfs_completion_t         obd_zombie_start;
1547 static cfs_completion_t         obd_zombie_stop;
1548 static unsigned long            obd_zombie_flags;
1549 static cfs_waitq_t              obd_zombie_waitq;
1550 static pid_t                    obd_zombie_pid;
1551
/*
 * Flag numbers for obd_zombie_flags.  NOTE: in this file the value is
 * passed to cfs_test_bit() as a bit *number*, not as a mask.
 */
enum {
        /* set to make the zombie thread leave its main loop */
        OBD_ZOMBIE_STOP = (1 << 1)
};
1555
1556 /**
1557  * check for work for kill zombie import/export thread.
1558  */
1559 static int obd_zombie_impexp_check(void *arg)
1560 {
1561         int rc;
1562
1563         cfs_spin_lock(&obd_zombie_impexp_lock);
1564         rc = (zombies_count == 0) &&
1565              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1566         cfs_spin_unlock(&obd_zombie_impexp_lock);
1567
1568         RETURN(rc);
1569 }
1570
1571 /**
1572  * Add export to the obd_zombe thread and notify it.
1573  */
1574 static void obd_zombie_export_add(struct obd_export *exp) {
1575         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1576         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1577         cfs_list_del_init(&exp->exp_obd_chain);
1578         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1579         cfs_spin_lock(&obd_zombie_impexp_lock);
1580         zombies_count++;
1581         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1582         cfs_spin_unlock(&obd_zombie_impexp_lock);
1583
1584         obd_zombie_impexp_notify();
1585 }
1586
1587 /**
1588  * Add import to the obd_zombe thread and notify it.
1589  */
1590 static void obd_zombie_import_add(struct obd_import *imp) {
1591         LASSERT(imp->imp_sec == NULL);
1592         cfs_spin_lock(&obd_zombie_impexp_lock);
1593         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1594         zombies_count++;
1595         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1596         cfs_spin_unlock(&obd_zombie_impexp_lock);
1597
1598         obd_zombie_impexp_notify();
1599 }
1600
1601 /**
1602  * notify import/export destroy thread about new zombie.
1603  */
1604 static void obd_zombie_impexp_notify(void)
1605 {
1606         /*
1607          * Make sure obd_zomebie_impexp_thread get this notification.
1608          * It is possible this signal only get by obd_zombie_barrier, and
1609          * barrier gulps this notification and sleeps away and hangs ensues
1610          */
1611         cfs_waitq_broadcast(&obd_zombie_waitq);
1612 }
1613
1614 /**
1615  * check whether obd_zombie is idle
1616  */
1617 static int obd_zombie_is_idle(void)
1618 {
1619         int rc;
1620
1621         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1622         cfs_spin_lock(&obd_zombie_impexp_lock);
1623         rc = (zombies_count == 0);
1624         cfs_spin_unlock(&obd_zombie_impexp_lock);
1625         return rc;
1626 }
1627
1628 /**
1629  * wait when obd_zombie import/export queues become empty
1630  */
1631 void obd_zombie_barrier(void)
1632 {
1633         struct l_wait_info lwi = { 0 };
1634
1635         if (obd_zombie_pid == cfs_curproc_pid())
1636                 /* don't wait for myself */
1637                 return;
1638         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1639 }
1640 EXPORT_SYMBOL(obd_zombie_barrier);
1641
1642 #ifdef __KERNEL__
1643
1644 /**
1645  * destroy zombie export/import thread.
1646  */
1647 static int obd_zombie_impexp_thread(void *unused)
1648 {
1649         int rc;
1650
1651         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1652                 cfs_complete(&obd_zombie_start);
1653                 RETURN(rc);
1654         }
1655
1656         cfs_complete(&obd_zombie_start);
1657
1658         obd_zombie_pid = cfs_curproc_pid();
1659
1660         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1661                 struct l_wait_info lwi = { 0 };
1662
1663                 l_wait_event(obd_zombie_waitq,
1664                              !obd_zombie_impexp_check(NULL), &lwi);
1665                 obd_zombie_impexp_cull();
1666
1667                 /*
1668                  * Notify obd_zombie_barrier callers that queues
1669                  * may be empty.
1670                  */
1671                 cfs_waitq_signal(&obd_zombie_waitq);
1672         }
1673
1674         cfs_complete(&obd_zombie_stop);
1675
1676         RETURN(0);
1677 }
1678
1679 #else /* ! KERNEL */
1680
1681 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1682 static void *obd_zombie_impexp_work_cb;
1683 static void *obd_zombie_impexp_idle_cb;
1684
1685 int obd_zombie_impexp_kill(void *arg)
1686 {
1687         int rc = 0;
1688
1689         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1690                 obd_zombie_impexp_cull();
1691                 rc = 1;
1692         }
1693         cfs_atomic_dec(&zombie_recur);
1694         return rc;
1695 }
1696
1697 #endif
1698
1699 /**
1700  * start destroy zombie import/export thread
1701  */
1702 int obd_zombie_impexp_init(void)
1703 {
1704         int rc;
1705
1706         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1707         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1708         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1709         cfs_init_completion(&obd_zombie_start);
1710         cfs_init_completion(&obd_zombie_stop);
1711         cfs_waitq_init(&obd_zombie_waitq);
1712         obd_zombie_pid = 0;
1713
1714 #ifdef __KERNEL__
1715         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1716         if (rc < 0)
1717                 RETURN(rc);
1718
1719         cfs_wait_for_completion(&obd_zombie_start);
1720 #else
1721
1722         obd_zombie_impexp_work_cb =
1723                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1724                                                  &obd_zombie_impexp_kill, NULL);
1725
1726         obd_zombie_impexp_idle_cb =
1727                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1728                                                  &obd_zombie_impexp_check, NULL);
1729         rc = 0;
1730 #endif
1731         RETURN(rc);
1732 }
1733 /**
1734  * stop destroy zombie import/export thread
1735  */
1736 void obd_zombie_impexp_stop(void)
1737 {
1738         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1739         obd_zombie_impexp_notify();
1740 #ifdef __KERNEL__
1741         cfs_wait_for_completion(&obd_zombie_stop);
1742 #else
1743         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1744         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1745 #endif
1746 }
1747
1748 /***** Kernel-userspace comm helpers *******/
1749
1750 /* Get length of entire message, including header */
1751 int kuc_len(int payload_len)
1752 {
1753         return sizeof(struct kuc_hdr) + payload_len;
1754 }
1755 EXPORT_SYMBOL(kuc_len);
1756
1757 /* Get a pointer to kuc header, given a ptr to the payload
1758  * @param p Pointer to payload area
1759  * @returns Pointer to kuc header
1760  */
1761 struct kuc_hdr * kuc_ptr(void *p)
1762 {
1763         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1764         LASSERT(lh->kuc_magic == KUC_MAGIC);
1765         return lh;
1766 }
1767 EXPORT_SYMBOL(kuc_ptr);
1768
1769 /* Test if payload is part of kuc message
1770  * @param p Pointer to payload area
1771  * @returns boolean
1772  */
1773 int kuc_ispayload(void *p)
1774 {
1775         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1776
1777         if (kh->kuc_magic == KUC_MAGIC)
1778                 return 1;
1779         else
1780                 return 0;
1781 }
1782 EXPORT_SYMBOL(kuc_ispayload);
1783
1784 /* Alloc space for a message, and fill in header
1785  * @return Pointer to payload area
1786  */
1787 void *kuc_alloc(int payload_len, int transport, int type)
1788 {
1789         struct kuc_hdr *lh;
1790         int len = kuc_len(payload_len);
1791
1792         OBD_ALLOC(lh, len);
1793         if (lh == NULL)
1794                 return ERR_PTR(-ENOMEM);
1795
1796         lh->kuc_magic = KUC_MAGIC;
1797         lh->kuc_transport = transport;
1798         lh->kuc_msgtype = type;
1799         lh->kuc_msglen = len;
1800
1801         return (void *)(lh + 1);
1802 }
1803 EXPORT_SYMBOL(kuc_alloc);
1804
/* Free a kuc message, given a pointer to its payload
 * @param p Pointer to payload area (not to the header); kuc_ptr() asserts
 *          that a header with KUC_MAGIC precedes it
 * @param payload_len Payload size in bytes; kuc_len(payload_len) bytes are
 *                    freed starting at the header
 *
 * Deliberately not "inline": the symbol is exported, so an external
 * definition must exist, and a plain inline definition does not guarantee
 * one under C99 and later inline semantics.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1812
1813
1814