Whamcloud - gitweb
LU-1247 obdfilter: fix invalid check of precrate objects
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011 Whamcloud, Inc.
33  *
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lustre/obdclass/genops.c
40  *
41  * These are the only exported functions, they provide some generic
42  * infrastructure for managing object devices
43  */
44
45 #define DEBUG_SUBSYSTEM S_CLASS
46 #ifndef __KERNEL__
47 #include <liblustre.h>
48 #endif
49 #include <obd_ost.h>
50 #include <obd_class.h>
51 #include <lprocfs_status.h>
52
/* List of registered obd types.  Defined in another file; this file walks
 * it in class_search_type() and links/unlinks entries when a type is
 * registered or unregistered. */
53 extern cfs_list_t obd_types;
/* Held around every traversal or modification of obd_types in this file. */
54 cfs_spinlock_t obd_types_lock;
55
/* Slab caches set up by obd_init_caches() and torn down by
 * obd_cleanup_caches(): one each for struct obd_device, struct obdo and
 * struct obd_import. */
56 cfs_mem_cache_t *obd_device_cachep;
57 cfs_mem_cache_t *obdo_cachep;
58 EXPORT_SYMBOL(obdo_cachep);
59 cfs_mem_cache_t *import_cachep;
60
/* Zombie import/export bookkeeping.  The users of these lists and of the
 * lock are outside the part of the file reviewed here; presumably the lock
 * guards both lists -- TODO confirm. */
61 cfs_list_t      obd_zombie_imports;
62 cfs_list_t      obd_zombie_exports;
63 cfs_spinlock_t  obd_zombie_impexp_lock;
/* Forward declarations of helpers defined later in this file.
 * class_export_put() hands an export to obd_zombie_export_add() once its
 * last reference is dropped. */
64 static void obd_zombie_impexp_notify(void);
65 static void obd_zombie_export_add(struct obd_export *exp);
66 static void obd_zombie_import_add(struct obd_import *imp);
67 static void print_export_data(struct obd_export *exp,
68                               const char *status, int locks);
69
/* Hook called by class_export_destroy() to release an export's
 * exp_connection.  It is not assigned anywhere in the visible part of
 * this file. */
70 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
71
72 /*
73  * support functions: we could use inter-module communication, but this
74  * is more portable to other OS's
75  */
76 static struct obd_device *obd_device_alloc(void)
77 {
78         struct obd_device *obd;
79
80         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
81         if (obd != NULL) {
82                 obd->obd_magic = OBD_DEVICE_MAGIC;
83         }
84         return obd;
85 }
86
87 static void obd_device_free(struct obd_device *obd)
88 {
89         LASSERT(obd != NULL);
90         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
91                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
92         if (obd->obd_namespace != NULL) {
93                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
94                        obd, obd->obd_namespace, obd->obd_force);
95                 LBUG();
96         }
97         lu_ref_fini(&obd->obd_reference);
98         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
99 }
100
101 struct obd_type *class_search_type(const char *name)
102 {
103         cfs_list_t *tmp;
104         struct obd_type *type;
105
106         cfs_spin_lock(&obd_types_lock);
107         cfs_list_for_each(tmp, &obd_types) {
108                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
109                 if (strcmp(type->typ_name, name) == 0) {
110                         cfs_spin_unlock(&obd_types_lock);
111                         return type;
112                 }
113         }
114         cfs_spin_unlock(&obd_types_lock);
115         return NULL;
116 }
117
118 struct obd_type *class_get_type(const char *name)
119 {
120         struct obd_type *type = class_search_type(name);
121
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
123         if (!type) {
124                 const char *modname = name;
125                 if (!cfs_request_module("%s", modname)) {
126                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
127                         type = class_search_type(name);
128                 } else {
129                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
130                                            modname);
131                 }
132         }
133 #endif
134         if (type) {
135                 cfs_spin_lock(&type->obd_type_lock);
136                 type->typ_refcnt++;
137                 cfs_try_module_get(type->typ_dt_ops->o_owner);
138                 cfs_spin_unlock(&type->obd_type_lock);
139         }
140         return type;
141 }
142
143 void class_put_type(struct obd_type *type)
144 {
145         LASSERT(type);
146         cfs_spin_lock(&type->obd_type_lock);
147         type->typ_refcnt--;
148         cfs_module_put(type->typ_dt_ops->o_owner);
149         cfs_spin_unlock(&type->obd_type_lock);
150 }
151
152 #define CLASS_MAX_NAME 1024
153
154 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
155                         struct lprocfs_vars *vars, const char *name,
156                         struct lu_device_type *ldt)
157 {
158         struct obd_type *type;
159         int rc = 0;
160         ENTRY;
161
162         /* sanity check */
163         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
164
165         if (class_search_type(name)) {
166                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
167                 RETURN(-EEXIST);
168         }
169
170         rc = -ENOMEM;
171         OBD_ALLOC(type, sizeof(*type));
172         if (type == NULL)
173                 RETURN(rc);
174
175         OBD_ALLOC_PTR(type->typ_dt_ops);
176         OBD_ALLOC_PTR(type->typ_md_ops);
177         OBD_ALLOC(type->typ_name, strlen(name) + 1);
178
179         if (type->typ_dt_ops == NULL ||
180             type->typ_md_ops == NULL ||
181             type->typ_name == NULL)
182                 GOTO (failed, rc);
183
184         *(type->typ_dt_ops) = *dt_ops;
185         /* md_ops is optional */
186         if (md_ops)
187                 *(type->typ_md_ops) = *md_ops;
188         strcpy(type->typ_name, name);
189         cfs_spin_lock_init(&type->obd_type_lock);
190
191 #ifdef LPROCFS
192         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
193                                               vars, type);
194         if (IS_ERR(type->typ_procroot)) {
195                 rc = PTR_ERR(type->typ_procroot);
196                 type->typ_procroot = NULL;
197                 GOTO (failed, rc);
198         }
199 #endif
200         if (ldt != NULL) {
201                 type->typ_lu = ldt;
202                 rc = lu_device_type_init(ldt);
203                 if (rc != 0)
204                         GOTO (failed, rc);
205         }
206
207         cfs_spin_lock(&obd_types_lock);
208         cfs_list_add(&type->typ_chain, &obd_types);
209         cfs_spin_unlock(&obd_types_lock);
210
211         RETURN (0);
212
213  failed:
214         if (type->typ_name != NULL)
215                 OBD_FREE(type->typ_name, strlen(name) + 1);
216         if (type->typ_md_ops != NULL)
217                 OBD_FREE_PTR(type->typ_md_ops);
218         if (type->typ_dt_ops != NULL)
219                 OBD_FREE_PTR(type->typ_dt_ops);
220         OBD_FREE(type, sizeof(*type));
221         RETURN(rc);
222 }
223
224 int class_unregister_type(const char *name)
225 {
226         struct obd_type *type = class_search_type(name);
227         ENTRY;
228
229         if (!type) {
230                 CERROR("unknown obd type\n");
231                 RETURN(-EINVAL);
232         }
233
234         if (type->typ_refcnt) {
235                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
236                 /* This is a bad situation, let's make the best of it */
237                 /* Remove ops, but leave the name for debugging */
238                 OBD_FREE_PTR(type->typ_dt_ops);
239                 OBD_FREE_PTR(type->typ_md_ops);
240                 RETURN(-EBUSY);
241         }
242
243         if (type->typ_procroot) {
244                 lprocfs_remove(&type->typ_procroot);
245         }
246
247         if (type->typ_lu)
248                 lu_device_type_fini(type->typ_lu);
249
250         cfs_spin_lock(&obd_types_lock);
251         cfs_list_del(&type->typ_chain);
252         cfs_spin_unlock(&obd_types_lock);
253         OBD_FREE(type->typ_name, strlen(name) + 1);
254         if (type->typ_dt_ops != NULL)
255                 OBD_FREE_PTR(type->typ_dt_ops);
256         if (type->typ_md_ops != NULL)
257                 OBD_FREE_PTR(type->typ_md_ops);
258         OBD_FREE(type, sizeof(*type));
259         RETURN(0);
260 } /* class_unregister_type */
261
262 /**
263  * Create a new obd device.
264  *
265  * Find an empty slot in ::obd_devs[], create a new obd device in it.
266  *
267  * \param[in] type_name obd device type string.
268  * \param[in] name      obd device name.
269  *
270  * \retval NULL if create fails, otherwise return the obd device
271  *         pointer created.
272  */
273 struct obd_device *class_newdev(const char *type_name, const char *name)
274 {
275         struct obd_device *result = NULL;
276         struct obd_device *newdev;
277         struct obd_type *type = NULL;
278         int i;
279         int new_obd_minor = 0;
280
281         if (strlen(name) >= MAX_OBD_NAME) {
282                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
283                 RETURN(ERR_PTR(-EINVAL));
284         }
285
286         type = class_get_type(type_name);
287         if (type == NULL){
288                 CERROR("OBD: unknown type: %s\n", type_name);
289                 RETURN(ERR_PTR(-ENODEV));
290         }
291
292         newdev = obd_device_alloc();
293         if (newdev == NULL) {
294                 class_put_type(type);
295                 RETURN(ERR_PTR(-ENOMEM));
296         }
297         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
298
299         cfs_spin_lock(&obd_dev_lock);
300         for (i = 0; i < class_devno_max(); i++) {
301                 struct obd_device *obd = class_num2obd(i);
302                 if (obd && obd->obd_name &&
303                     (strcmp(name, obd->obd_name) == 0)) {
304                         CERROR("Device %s already exists, won't add\n", name);
305                         if (result) {
306                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
307                                          "%p obd_magic %08x != %08x\n", result,
308                                          result->obd_magic, OBD_DEVICE_MAGIC);
309                                 LASSERTF(result->obd_minor == new_obd_minor,
310                                          "%p obd_minor %d != %d\n", result,
311                                          result->obd_minor, new_obd_minor);
312
313                                 obd_devs[result->obd_minor] = NULL;
314                                 result->obd_name[0]='\0';
315                          }
316                         result = ERR_PTR(-EEXIST);
317                         break;
318                 }
319                 if (!result && !obd) {
320                         result = newdev;
321                         result->obd_minor = i;
322                         new_obd_minor = i;
323                         result->obd_type = type;
324                         strncpy(result->obd_name, name,
325                                 sizeof(result->obd_name) - 1);
326                         obd_devs[i] = result;
327                 }
328         }
329         cfs_spin_unlock(&obd_dev_lock);
330
331         if (result == NULL && i >= class_devno_max()) {
332                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
333                        class_devno_max());
334                 result = ERR_PTR(-EOVERFLOW);
335         }
336
337         if (IS_ERR(result)) {
338                 obd_device_free(newdev);
339                 class_put_type(type);
340         } else {
341                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
342                        result->obd_name, result);
343         }
344         return result;
345 }
346
347 void class_release_dev(struct obd_device *obd)
348 {
349         struct obd_type *obd_type = obd->obd_type;
350
351         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
352                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
353         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
354                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
355         LASSERT(obd_type != NULL);
356
357         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
358                obd->obd_name,obd->obd_type->typ_name);
359
360         cfs_spin_lock(&obd_dev_lock);
361         obd_devs[obd->obd_minor] = NULL;
362         cfs_spin_unlock(&obd_dev_lock);
363         obd_device_free(obd);
364
365         class_put_type(obd_type);
366 }
367
368 int class_name2dev(const char *name)
369 {
370         int i;
371
372         if (!name)
373                 return -1;
374
375         cfs_spin_lock(&obd_dev_lock);
376         for (i = 0; i < class_devno_max(); i++) {
377                 struct obd_device *obd = class_num2obd(i);
378                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
379                         /* Make sure we finished attaching before we give
380                            out any references */
381                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
382                         if (obd->obd_attached) {
383                                 cfs_spin_unlock(&obd_dev_lock);
384                                 return i;
385                         }
386                         break;
387                 }
388         }
389         cfs_spin_unlock(&obd_dev_lock);
390
391         return -1;
392 }
393
394 struct obd_device *class_name2obd(const char *name)
395 {
396         int dev = class_name2dev(name);
397
398         if (dev < 0 || dev > class_devno_max())
399                 return NULL;
400         return class_num2obd(dev);
401 }
402
403 int class_uuid2dev(struct obd_uuid *uuid)
404 {
405         int i;
406
407         cfs_spin_lock(&obd_dev_lock);
408         for (i = 0; i < class_devno_max(); i++) {
409                 struct obd_device *obd = class_num2obd(i);
410                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
411                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
412                         cfs_spin_unlock(&obd_dev_lock);
413                         return i;
414                 }
415         }
416         cfs_spin_unlock(&obd_dev_lock);
417
418         return -1;
419 }
420
421 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
422 {
423         int dev = class_uuid2dev(uuid);
424         if (dev < 0)
425                 return NULL;
426         return class_num2obd(dev);
427 }
428
429 /**
430  * Get obd device from ::obd_devs[]
431  *
432  * \param num [in] array index
433  *
434  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
435  *         otherwise return the obd device there.
436  */
437 struct obd_device *class_num2obd(int num)
438 {
439         struct obd_device *obd = NULL;
440
441         if (num < class_devno_max()) {
442                 obd = obd_devs[num];
443                 if (obd == NULL)
444                         return NULL;
445
446                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
447                          "%p obd_magic %08x != %08x\n",
448                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
449                 LASSERTF(obd->obd_minor == num,
450                          "%p obd_minor %0d != %0d\n",
451                          obd, obd->obd_minor, num);
452         }
453
454         return obd;
455 }
456
457 void class_obd_list(void)
458 {
459         char *status;
460         int i;
461
462         cfs_spin_lock(&obd_dev_lock);
463         for (i = 0; i < class_devno_max(); i++) {
464                 struct obd_device *obd = class_num2obd(i);
465                 if (obd == NULL)
466                         continue;
467                 if (obd->obd_stopping)
468                         status = "ST";
469                 else if (obd->obd_set_up)
470                         status = "UP";
471                 else if (obd->obd_attached)
472                         status = "AT";
473                 else
474                         status = "--";
475                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
476                          i, status, obd->obd_type->typ_name,
477                          obd->obd_name, obd->obd_uuid.uuid,
478                          cfs_atomic_read(&obd->obd_refcount));
479         }
480         cfs_spin_unlock(&obd_dev_lock);
481         return;
482 }
483
484 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
485    specified, then only the client with that uuid is returned,
486    otherwise any client connected to the tgt is returned. */
487 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
488                                           const char * typ_name,
489                                           struct obd_uuid *grp_uuid)
490 {
491         int i;
492
493         cfs_spin_lock(&obd_dev_lock);
494         for (i = 0; i < class_devno_max(); i++) {
495                 struct obd_device *obd = class_num2obd(i);
496                 if (obd == NULL)
497                         continue;
498                 if ((strncmp(obd->obd_type->typ_name, typ_name,
499                              strlen(typ_name)) == 0)) {
500                         if (obd_uuid_equals(tgt_uuid,
501                                             &obd->u.cli.cl_target_uuid) &&
502                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
503                                                          &obd->obd_uuid) : 1)) {
504                                 cfs_spin_unlock(&obd_dev_lock);
505                                 return obd;
506                         }
507                 }
508         }
509         cfs_spin_unlock(&obd_dev_lock);
510
511         return NULL;
512 }
513
514 /* Iterate the obd_device list looking devices have grp_uuid. Start
515    searching at *next, and if a device is found, the next index to look
516    at is saved in *next. If next is NULL, then the first matching device
517    will always be returned. */
518 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
519 {
520         int i;
521
522         if (next == NULL)
523                 i = 0;
524         else if (*next >= 0 && *next < class_devno_max())
525                 i = *next;
526         else
527                 return NULL;
528
529         cfs_spin_lock(&obd_dev_lock);
530         for (; i < class_devno_max(); i++) {
531                 struct obd_device *obd = class_num2obd(i);
532                 if (obd == NULL)
533                         continue;
534                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
535                         if (next != NULL)
536                                 *next = i+1;
537                         cfs_spin_unlock(&obd_dev_lock);
538                         return obd;
539                 }
540         }
541         cfs_spin_unlock(&obd_dev_lock);
542
543         return NULL;
544 }
545
546 /**
547  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
548  * adjust sptlrpc settings accordingly.
549  */
550 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
551 {
552         struct obd_device  *obd;
553         const char         *type;
554         int                 i, rc = 0, rc2;
555
556         LASSERT(namelen > 0);
557
558         cfs_spin_lock(&obd_dev_lock);
559         for (i = 0; i < class_devno_max(); i++) {
560                 obd = class_num2obd(i);
561
562                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
563                         continue;
564
565                 /* only notify mdc, osc, mdt, ost */
566                 type = obd->obd_type->typ_name;
567                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
568                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
569                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
570                     strcmp(type, LUSTRE_OST_NAME) != 0)
571                         continue;
572
573                 if (strncmp(obd->obd_name, fsname, namelen))
574                         continue;
575
576                 class_incref(obd, __FUNCTION__, obd);
577                 cfs_spin_unlock(&obd_dev_lock);
578                 rc2 = obd_set_info_async(obd->obd_self_export,
579                                          sizeof(KEY_SPTLRPC_CONF),
580                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
581                 rc = rc ? rc : rc2;
582                 class_decref(obd, __FUNCTION__, obd);
583                 cfs_spin_lock(&obd_dev_lock);
584         }
585         cfs_spin_unlock(&obd_dev_lock);
586         return rc;
587 }
588 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
589
590 void obd_cleanup_caches(void)
591 {
592         int rc;
593
594         ENTRY;
595         if (obd_device_cachep) {
596                 rc = cfs_mem_cache_destroy(obd_device_cachep);
597                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
598                 obd_device_cachep = NULL;
599         }
600         if (obdo_cachep) {
601                 rc = cfs_mem_cache_destroy(obdo_cachep);
602                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
603                 obdo_cachep = NULL;
604         }
605         if (import_cachep) {
606                 rc = cfs_mem_cache_destroy(import_cachep);
607                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
608                 import_cachep = NULL;
609         }
610         if (capa_cachep) {
611                 rc = cfs_mem_cache_destroy(capa_cachep);
612                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
613                 capa_cachep = NULL;
614         }
615         EXIT;
616 }
617
618 int obd_init_caches(void)
619 {
620         ENTRY;
621
622         LASSERT(obd_device_cachep == NULL);
623         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
624                                                  sizeof(struct obd_device),
625                                                  0, 0);
626         if (!obd_device_cachep)
627                 GOTO(out, -ENOMEM);
628
629         LASSERT(obdo_cachep == NULL);
630         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
631                                            0, 0);
632         if (!obdo_cachep)
633                 GOTO(out, -ENOMEM);
634
635         LASSERT(import_cachep == NULL);
636         import_cachep = cfs_mem_cache_create("ll_import_cache",
637                                              sizeof(struct obd_import),
638                                              0, 0);
639         if (!import_cachep)
640                 GOTO(out, -ENOMEM);
641
642         LASSERT(capa_cachep == NULL);
643         capa_cachep = cfs_mem_cache_create("capa_cache",
644                                            sizeof(struct obd_capa), 0, 0);
645         if (!capa_cachep)
646                 GOTO(out, -ENOMEM);
647
648         RETURN(0);
649  out:
650         obd_cleanup_caches();
651         RETURN(-ENOMEM);
652
653 }
654
655 /* map connection to client */
656 struct obd_export *class_conn2export(struct lustre_handle *conn)
657 {
658         struct obd_export *export;
659         ENTRY;
660
661         if (!conn) {
662                 CDEBUG(D_CACHE, "looking for null handle\n");
663                 RETURN(NULL);
664         }
665
666         if (conn->cookie == -1) {  /* this means assign a new connection */
667                 CDEBUG(D_CACHE, "want a new connection\n");
668                 RETURN(NULL);
669         }
670
671         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
672         export = class_handle2object(conn->cookie);
673         RETURN(export);
674 }
675
676 struct obd_device *class_exp2obd(struct obd_export *exp)
677 {
678         if (exp)
679                 return exp->exp_obd;
680         return NULL;
681 }
682
683 struct obd_device *class_conn2obd(struct lustre_handle *conn)
684 {
685         struct obd_export *export;
686         export = class_conn2export(conn);
687         if (export) {
688                 struct obd_device *obd = export->exp_obd;
689                 class_export_put(export);
690                 return obd;
691         }
692         return NULL;
693 }
694
695 struct obd_import *class_exp2cliimp(struct obd_export *exp)
696 {
697         struct obd_device *obd = exp->exp_obd;
698         if (obd == NULL)
699                 return NULL;
700         return obd->u.cli.cl_import;
701 }
702
703 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
704 {
705         struct obd_device *obd = class_conn2obd(conn);
706         if (obd == NULL)
707                 return NULL;
708         return obd->u.cli.cl_import;
709 }
710
711 /* Export management functions */
712
713 /* if export is involved in recovery then clean up related things */
714 void class_export_recovery_cleanup(struct obd_export *exp)
715 {
716         struct obd_device *obd = exp->exp_obd;
717
718         cfs_spin_lock(&obd->obd_recovery_task_lock);
719         if (exp->exp_delayed)
720                 obd->obd_delayed_clients--;
721         if (obd->obd_recovering && exp->exp_in_recovery) {
722                 cfs_spin_lock(&exp->exp_lock);
723                 exp->exp_in_recovery = 0;
724                 cfs_spin_unlock(&exp->exp_lock);
725                 LASSERT(obd->obd_connected_clients);
726                 obd->obd_connected_clients--;
727         }
728         cfs_spin_unlock(&obd->obd_recovery_task_lock);
729         /** Cleanup req replay fields */
730         if (exp->exp_req_replay_needed) {
731                 cfs_spin_lock(&exp->exp_lock);
732                 exp->exp_req_replay_needed = 0;
733                 cfs_spin_unlock(&exp->exp_lock);
734                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
735                 cfs_atomic_dec(&obd->obd_req_replay_clients);
736         }
737         /** Cleanup lock replay data */
738         if (exp->exp_lock_replay_needed) {
739                 cfs_spin_lock(&exp->exp_lock);
740                 exp->exp_lock_replay_needed = 0;
741                 cfs_spin_unlock(&exp->exp_lock);
742                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
743                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
744         }
745 }
746
747 static void class_export_destroy(struct obd_export *exp)
748 {
749         struct obd_device *obd = exp->exp_obd;
750         ENTRY;
751
752         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
753         LASSERT(obd != NULL);
754
755         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
756                exp->exp_client_uuid.uuid, obd->obd_name);
757
758
759         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
760         if (exp->exp_connection)
761                 ptlrpc_put_connection_superhack(exp->exp_connection);
762
763         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
764         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
765         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
766         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
767         obd_destroy_export(exp);
768         class_decref(obd, "export", exp);
769
770         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
771         EXIT;
772 }
773
/*
 * Untyped wrapper around class_export_get(); class_new_export() passes it
 * to class_handle_hash() as the export's add-reference callback.
 */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
778
779 struct obd_export *class_export_get(struct obd_export *exp)
780 {
781         cfs_atomic_inc(&exp->exp_refcount);
782         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
783                cfs_atomic_read(&exp->exp_refcount));
784         return exp;
785 }
786 EXPORT_SYMBOL(class_export_get);
787
788 void class_export_put(struct obd_export *exp)
789 {
790         LASSERT(exp != NULL);
791         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
792         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
793                cfs_atomic_read(&exp->exp_refcount) - 1);
794
795         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
796                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
797                 CDEBUG(D_IOCTL, "final put %p/%s\n",
798                        exp, exp->exp_client_uuid.uuid);
799
800                 /* release nid stat refererence */
801                 lprocfs_exp_cleanup(exp);
802                 class_export_recovery_cleanup(exp);
803
804                 obd_zombie_export_add(exp);
805         }
806 }
807 EXPORT_SYMBOL(class_export_put);
808
809 /* Creates a new export, adds it to the hash table, and returns a
810  * pointer to it. The refcount is 2: one for the hash reference, and
811  * one for the pointer returned by this function. */
812 struct obd_export *class_new_export(struct obd_device *obd,
813                                     struct obd_uuid *cluuid)
814 {
815         struct obd_export *export;
816         cfs_hash_t *hash = NULL;
817         int rc = 0;
818         ENTRY;
819
820         OBD_ALLOC_PTR(export);
821         if (!export)
822                 return ERR_PTR(-ENOMEM);
823
824         export->exp_conn_cnt = 0;
825         export->exp_lock_hash = NULL;
826         cfs_atomic_set(&export->exp_refcount, 2);
827         cfs_atomic_set(&export->exp_rpc_count, 0);
828         cfs_atomic_set(&export->exp_cb_count, 0);
829         cfs_atomic_set(&export->exp_locks_count, 0);
830 #if LUSTRE_TRACKS_LOCK_EXP_REFS
831         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
832         cfs_spin_lock_init(&export->exp_locks_list_guard);
833 #endif
834         cfs_atomic_set(&export->exp_replay_count, 0);
835         export->exp_obd = obd;
836         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
837         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
838         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
839         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
840         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
841         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
842         class_handle_hash(&export->exp_handle, export_handle_addref);
843         export->exp_last_request_time = cfs_time_current_sec();
844         cfs_spin_lock_init(&export->exp_lock);
845         cfs_spin_lock_init(&export->exp_rpc_lock);
846         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
847         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
848         cfs_spin_lock_init(&export->exp_bl_list_lock);
849         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
850
851         export->exp_sp_peer = LUSTRE_SP_ANY;
852         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
853         export->exp_client_uuid = *cluuid;
854         obd_init_export(export);
855
856         cfs_spin_lock(&obd->obd_dev_lock);
857          /* shouldn't happen, but might race */
858         if (obd->obd_stopping)
859                 GOTO(exit_unlock, rc = -ENODEV);
860
861         hash = cfs_hash_getref(obd->obd_uuid_hash);
862         if (hash == NULL)
863                 GOTO(exit_unlock, rc = -ENODEV);
864         cfs_spin_unlock(&obd->obd_dev_lock);
865
866         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
867                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
868                 if (rc != 0) {
869                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
870                                       obd->obd_name, cluuid->uuid, rc);
871                         GOTO(exit_err, rc = -EALREADY);
872                 }
873         }
874
875         cfs_spin_lock(&obd->obd_dev_lock);
876         if (obd->obd_stopping) {
877                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
878                 GOTO(exit_unlock, rc = -ENODEV);
879         }
880
881         class_incref(obd, "export", export);
882         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
883         cfs_list_add_tail(&export->exp_obd_chain_timed,
884                           &export->exp_obd->obd_exports_timed);
885         export->exp_obd->obd_num_exports++;
886         cfs_spin_unlock(&obd->obd_dev_lock);
887         cfs_hash_putref(hash);
888         RETURN(export);
889
890 exit_unlock:
891         cfs_spin_unlock(&obd->obd_dev_lock);
892 exit_err:
893         if (hash)
894                 cfs_hash_putref(hash);
895         class_handle_unhash(&export->exp_handle);
896         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
897         obd_destroy_export(export);
898         OBD_FREE_PTR(export);
899         return ERR_PTR(rc);
900 }
901 EXPORT_SYMBOL(class_new_export);
902
903 void class_unlink_export(struct obd_export *exp)
904 {
905         class_handle_unhash(&exp->exp_handle);
906
907         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
908         /* delete an uuid-export hashitem from hashtables */
909         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
910                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
911                              &exp->exp_client_uuid,
912                              &exp->exp_uuid_hash);
913
914         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
915         cfs_list_del_init(&exp->exp_obd_chain_timed);
916         exp->exp_obd->obd_num_exports--;
917         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
918         class_export_put(exp);
919 }
920 EXPORT_SYMBOL(class_unlink_export);
921
922 /* Import management functions */
923 void class_import_destroy(struct obd_import *imp)
924 {
925         ENTRY;
926
927         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
928                 imp->imp_obd->obd_name);
929
930         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
931
932         ptlrpc_put_connection_superhack(imp->imp_connection);
933
934         while (!cfs_list_empty(&imp->imp_conn_list)) {
935                 struct obd_import_conn *imp_conn;
936
937                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
938                                           struct obd_import_conn, oic_item);
939                 cfs_list_del_init(&imp_conn->oic_item);
940                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
941                 OBD_FREE(imp_conn, sizeof(*imp_conn));
942         }
943
944         LASSERT(imp->imp_sec == NULL);
945         class_decref(imp->imp_obd, "import", imp);
946         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
947         EXIT;
948 }
949
/* Handle add-ref callback registered with class_handle_hash() for imports:
 * takes one reference on the import passed as an untyped pointer. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
954
955 struct obd_import *class_import_get(struct obd_import *import)
956 {
957         cfs_atomic_inc(&import->imp_refcount);
958         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
959                cfs_atomic_read(&import->imp_refcount),
960                import->imp_obd->obd_name);
961         return import;
962 }
963 EXPORT_SYMBOL(class_import_get);
964
965 void class_import_put(struct obd_import *imp)
966 {
967         ENTRY;
968
969         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
970         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
971
972         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
973                cfs_atomic_read(&imp->imp_refcount) - 1,
974                imp->imp_obd->obd_name);
975
976         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
977                 CDEBUG(D_INFO, "final put import %p\n", imp);
978                 obd_zombie_import_add(imp);
979         }
980
981         EXIT;
982 }
983 EXPORT_SYMBOL(class_import_put);
984
985 static void init_imp_at(struct imp_at *at) {
986         int i;
987         at_init(&at->iat_net_latency, 0, 0);
988         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
989                 /* max service estimates are tracked on the server side, so
990                    don't use the AT history here, just use the last reported
991                    val. (But keep hist for proc histogram, worst_ever) */
992                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
993                         AT_FLG_NOHIST);
994         }
995 }
996
997 struct obd_import *class_new_import(struct obd_device *obd)
998 {
999         struct obd_import *imp;
1000
1001         OBD_ALLOC(imp, sizeof(*imp));
1002         if (imp == NULL)
1003                 return NULL;
1004
1005         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1006         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1007         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1008         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1009         cfs_spin_lock_init(&imp->imp_lock);
1010         imp->imp_last_success_conn = 0;
1011         imp->imp_state = LUSTRE_IMP_NEW;
1012         imp->imp_obd = class_incref(obd, "import", imp);
1013         cfs_sema_init(&imp->imp_sec_mutex, 1);
1014         cfs_waitq_init(&imp->imp_recovery_waitq);
1015
1016         cfs_atomic_set(&imp->imp_refcount, 2);
1017         cfs_atomic_set(&imp->imp_unregistering, 0);
1018         cfs_atomic_set(&imp->imp_inflight, 0);
1019         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1020         cfs_atomic_set(&imp->imp_inval_count, 0);
1021         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1022         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1023         class_handle_hash(&imp->imp_handle, import_handle_addref);
1024         init_imp_at(&imp->imp_at);
1025
1026         /* the default magic is V2, will be used in connect RPC, and
1027          * then adjusted according to the flags in request/reply. */
1028         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1029
1030         return imp;
1031 }
1032 EXPORT_SYMBOL(class_new_import);
1033
1034 void class_destroy_import(struct obd_import *import)
1035 {
1036         LASSERT(import != NULL);
1037         LASSERT(import != LP_POISON);
1038
1039         class_handle_unhash(&import->imp_handle);
1040
1041         cfs_spin_lock(&import->imp_lock);
1042         import->imp_generation++;
1043         cfs_spin_unlock(&import->imp_lock);
1044         class_import_put(import);
1045 }
1046 EXPORT_SYMBOL(class_destroy_import);
1047
1048 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1049
1050 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1051 {
1052         cfs_spin_lock(&exp->exp_locks_list_guard);
1053
1054         LASSERT(lock->l_exp_refs_nr >= 0);
1055
1056         if (lock->l_exp_refs_target != NULL &&
1057             lock->l_exp_refs_target != exp) {
1058                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1059                               exp, lock, lock->l_exp_refs_target);
1060         }
1061         if ((lock->l_exp_refs_nr ++) == 0) {
1062                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1063                 lock->l_exp_refs_target = exp;
1064         }
1065         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1066                lock, exp, lock->l_exp_refs_nr);
1067         cfs_spin_unlock(&exp->exp_locks_list_guard);
1068 }
1069 EXPORT_SYMBOL(__class_export_add_lock_ref);
1070
1071 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1072 {
1073         cfs_spin_lock(&exp->exp_locks_list_guard);
1074         LASSERT(lock->l_exp_refs_nr > 0);
1075         if (lock->l_exp_refs_target != exp) {
1076                 LCONSOLE_WARN("lock %p, "
1077                               "mismatching export pointers: %p, %p\n",
1078                               lock, lock->l_exp_refs_target, exp);
1079         }
1080         if (-- lock->l_exp_refs_nr == 0) {
1081                 cfs_list_del_init(&lock->l_exp_refs_link);
1082                 lock->l_exp_refs_target = NULL;
1083         }
1084         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1085                lock, exp, lock->l_exp_refs_nr);
1086         cfs_spin_unlock(&exp->exp_locks_list_guard);
1087 }
1088 EXPORT_SYMBOL(__class_export_del_lock_ref);
1089 #endif
1090
1091 /* A connection defines an export context in which preallocation can
1092    be managed. This releases the export pointer reference, and returns
1093    the export handle, so the export refcount is 1 when this function
1094    returns. */
1095 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1096                   struct obd_uuid *cluuid)
1097 {
1098         struct obd_export *export;
1099         LASSERT(conn != NULL);
1100         LASSERT(obd != NULL);
1101         LASSERT(cluuid != NULL);
1102         ENTRY;
1103
1104         export = class_new_export(obd, cluuid);
1105         if (IS_ERR(export))
1106                 RETURN(PTR_ERR(export));
1107
1108         conn->cookie = export->exp_handle.h_cookie;
1109         class_export_put(export);
1110
1111         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1112                cluuid->uuid, conn->cookie);
1113         RETURN(0);
1114 }
1115 EXPORT_SYMBOL(class_connect);
1116
1117 /* This function removes 1-3 references from the export:
1118  * 1 - for export pointer passed
1119  * and if disconnect really need
1120  * 2 - removing from hash
1121  * 3 - in client_unlink_export
1122  * The export pointer passed to this function can destroyed */
1123 int class_disconnect(struct obd_export *export)
1124 {
1125         int already_disconnected;
1126         ENTRY;
1127
1128         if (export == NULL) {
1129                 fixme();
1130                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1131                 RETURN(-EINVAL);
1132         }
1133
1134         cfs_spin_lock(&export->exp_lock);
1135         already_disconnected = export->exp_disconnected;
1136         export->exp_disconnected = 1;
1137         cfs_spin_unlock(&export->exp_lock);
1138
1139         /* class_cleanup(), abort_recovery(), and class_fail_export()
1140          * all end up in here, and if any of them race we shouldn't
1141          * call extra class_export_puts(). */
1142         if (already_disconnected) {
1143                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1144                 GOTO(no_disconn, already_disconnected);
1145         }
1146
1147         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1148                export->exp_handle.h_cookie);
1149
1150         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1151                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1152                              &export->exp_connection->c_peer.nid,
1153                              &export->exp_nid_hash);
1154
1155         class_unlink_export(export);
1156 no_disconn:
1157         class_export_put(export);
1158         RETURN(0);
1159 }
1160
1161 /* Return non-zero for a fully connected export */
1162 int class_connected_export(struct obd_export *exp)
1163 {
1164         if (exp) {
1165                 int connected;
1166                 cfs_spin_lock(&exp->exp_lock);
1167                 connected = (exp->exp_conn_cnt > 0);
1168                 cfs_spin_unlock(&exp->exp_lock);
1169                 return connected;
1170         }
1171         return 0;
1172 }
1173 EXPORT_SYMBOL(class_connected_export);
1174
1175 static void class_disconnect_export_list(cfs_list_t *list,
1176                                          enum obd_option flags)
1177 {
1178         int rc;
1179         struct obd_export *exp;
1180         ENTRY;
1181
1182         /* It's possible that an export may disconnect itself, but
1183          * nothing else will be added to this list. */
1184         while (!cfs_list_empty(list)) {
1185                 exp = cfs_list_entry(list->next, struct obd_export,
1186                                      exp_obd_chain);
1187                 /* need for safe call CDEBUG after obd_disconnect */
1188                 class_export_get(exp);
1189
1190                 cfs_spin_lock(&exp->exp_lock);
1191                 exp->exp_flags = flags;
1192                 cfs_spin_unlock(&exp->exp_lock);
1193
1194                 if (obd_uuid_equals(&exp->exp_client_uuid,
1195                                     &exp->exp_obd->obd_uuid)) {
1196                         CDEBUG(D_HA,
1197                                "exp %p export uuid == obd uuid, don't discon\n",
1198                                exp);
1199                         /* Need to delete this now so we don't end up pointing
1200                          * to work_list later when this export is cleaned up. */
1201                         cfs_list_del_init(&exp->exp_obd_chain);
1202                         class_export_put(exp);
1203                         continue;
1204                 }
1205
1206                 class_export_get(exp);
1207                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1208                        "last request at "CFS_TIME_T"\n",
1209                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1210                        exp, exp->exp_last_request_time);
1211                 /* release one export reference anyway */
1212                 rc = obd_disconnect(exp);
1213
1214                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1215                        obd_export_nid2str(exp), exp, rc);
1216                 class_export_put(exp);
1217         }
1218         EXIT;
1219 }
1220
1221 void class_disconnect_exports(struct obd_device *obd)
1222 {
1223         cfs_list_t work_list;
1224         ENTRY;
1225
1226         /* Move all of the exports from obd_exports to a work list, en masse. */
1227         CFS_INIT_LIST_HEAD(&work_list);
1228         cfs_spin_lock(&obd->obd_dev_lock);
1229         cfs_list_splice_init(&obd->obd_exports, &work_list);
1230         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1231         cfs_spin_unlock(&obd->obd_dev_lock);
1232
1233         if (!cfs_list_empty(&work_list)) {
1234                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1235                        "disconnecting them\n", obd->obd_minor, obd);
1236                 class_disconnect_export_list(&work_list,
1237                                              exp_flags_from_obd(obd));
1238         } else
1239                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1240                        obd->obd_minor, obd);
1241         EXIT;
1242 }
1243 EXPORT_SYMBOL(class_disconnect_exports);
1244
1245 /* Remove exports that have not completed recovery.
1246  */
1247 void class_disconnect_stale_exports(struct obd_device *obd,
1248                                     int (*test_export)(struct obd_export *))
1249 {
1250         cfs_list_t work_list;
1251         cfs_list_t *pos, *n;
1252         struct obd_export *exp;
1253         int evicted = 0;
1254         ENTRY;
1255
1256         CFS_INIT_LIST_HEAD(&work_list);
1257         cfs_spin_lock(&obd->obd_dev_lock);
1258         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1259                 int failed;
1260
1261                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1262
1263                 /* don't count self-export as client */
1264                 if (obd_uuid_equals(&exp->exp_client_uuid,
1265                                     &exp->exp_obd->obd_uuid))
1266                         continue;
1267
1268                 if (test_export(exp))
1269                         continue;
1270
1271                 cfs_spin_lock(&exp->exp_lock);
1272                 failed = exp->exp_failed;
1273                 exp->exp_failed = 1;
1274                 cfs_spin_unlock(&exp->exp_lock);
1275                 if (failed)
1276                         continue;
1277
1278                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1279                 evicted++;
1280                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1281                        obd->obd_name, exp->exp_client_uuid.uuid,
1282                        exp->exp_connection == NULL ? "<unknown>" :
1283                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1284                 print_export_data(exp, "EVICTING", 0);
1285         }
1286         cfs_spin_unlock(&obd->obd_dev_lock);
1287
1288         if (evicted) {
1289                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1290                        obd->obd_name, evicted);
1291                 obd->obd_stale_clients += evicted;
1292         }
1293         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1294                                                  OBD_OPT_ABORT_RECOV);
1295         EXIT;
1296 }
1297 EXPORT_SYMBOL(class_disconnect_stale_exports);
1298
1299 void class_fail_export(struct obd_export *exp)
1300 {
1301         int rc, already_failed;
1302
1303         cfs_spin_lock(&exp->exp_lock);
1304         already_failed = exp->exp_failed;
1305         exp->exp_failed = 1;
1306         cfs_spin_unlock(&exp->exp_lock);
1307
1308         if (already_failed) {
1309                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1310                        exp, exp->exp_client_uuid.uuid);
1311                 return;
1312         }
1313
1314         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1315                exp, exp->exp_client_uuid.uuid);
1316
1317         if (obd_dump_on_timeout)
1318                 libcfs_debug_dumplog();
1319
1320         /* need for safe call CDEBUG after obd_disconnect */
1321         class_export_get(exp);
1322
1323         /* Most callers into obd_disconnect are removing their own reference
1324          * (request, for example) in addition to the one from the hash table.
1325          * We don't have such a reference here, so make one. */
1326         class_export_get(exp);
1327         rc = obd_disconnect(exp);
1328         if (rc)
1329                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1330         else
1331                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1332                        exp, exp->exp_client_uuid.uuid);
1333         class_export_put(exp);
1334 }
1335 EXPORT_SYMBOL(class_fail_export);
1336
1337 char *obd_export_nid2str(struct obd_export *exp)
1338 {
1339         if (exp->exp_connection != NULL)
1340                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1341
1342         return "(no nid)";
1343 }
1344 EXPORT_SYMBOL(obd_export_nid2str);
1345
1346 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1347 {
1348         struct obd_export *doomed_exp = NULL;
1349         int exports_evicted = 0;
1350
1351         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1352
1353         do {
1354                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1355                 if (doomed_exp == NULL)
1356                         break;
1357
1358                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1359                          "nid %s found, wanted nid %s, requested nid %s\n",
1360                          obd_export_nid2str(doomed_exp),
1361                          libcfs_nid2str(nid_key), nid);
1362                 LASSERTF(doomed_exp != obd->obd_self_export,
1363                          "self-export is hashed by NID?\n");
1364                 exports_evicted++;
1365                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1366                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1367                        exports_evicted);
1368                 class_fail_export(doomed_exp);
1369                 class_export_put(doomed_exp);
1370         } while (1);
1371
1372         if (!exports_evicted)
1373                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1374                        obd->obd_name, nid);
1375         return exports_evicted;
1376 }
1377 EXPORT_SYMBOL(obd_export_evict_by_nid);
1378
1379 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1380 {
1381         struct obd_export *doomed_exp = NULL;
1382         struct obd_uuid doomed_uuid;
1383         int exports_evicted = 0;
1384
1385         obd_str2uuid(&doomed_uuid, uuid);
1386         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1387                 CERROR("%s: can't evict myself\n", obd->obd_name);
1388                 return exports_evicted;
1389         }
1390
1391         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1392
1393         if (doomed_exp == NULL) {
1394                 CERROR("%s: can't disconnect %s: no exports found\n",
1395                        obd->obd_name, uuid);
1396         } else {
1397                 CWARN("%s: evicting %s at adminstrative request\n",
1398                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1399                 class_fail_export(doomed_exp);
1400                 class_export_put(doomed_exp);
1401                 exports_evicted++;
1402         }
1403
1404         return exports_evicted;
1405 }
1406 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1407
1408 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1409 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1410 EXPORT_SYMBOL(class_export_dump_hook);
1411 #endif
1412
1413 static void print_export_data(struct obd_export *exp, const char *status,
1414                               int locks)
1415 {
1416         struct ptlrpc_reply_state *rs;
1417         struct ptlrpc_reply_state *first_reply = NULL;
1418         int nreplies = 0;
1419
1420         cfs_spin_lock(&exp->exp_lock);
1421         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1422                                 rs_exp_list) {
1423                 if (nreplies == 0)
1424                         first_reply = rs;
1425                 nreplies++;
1426         }
1427         cfs_spin_unlock(&exp->exp_lock);
1428
1429         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1430                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1431                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1432                cfs_atomic_read(&exp->exp_rpc_count),
1433                cfs_atomic_read(&exp->exp_cb_count),
1434                cfs_atomic_read(&exp->exp_locks_count),
1435                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1436                nreplies, first_reply, nreplies > 3 ? "..." : "",
1437                exp->exp_last_committed);
1438 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1439         if (locks && class_export_dump_hook != NULL)
1440                 class_export_dump_hook(exp);
1441 #endif
1442 }
1443
1444 void dump_exports(struct obd_device *obd, int locks)
1445 {
1446         struct obd_export *exp;
1447
1448         cfs_spin_lock(&obd->obd_dev_lock);
1449         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1450                 print_export_data(exp, "ACTIVE", locks);
1451         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1452                 print_export_data(exp, "UNLINKED", locks);
1453         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1454                 print_export_data(exp, "DELAYED", locks);
1455         cfs_spin_unlock(&obd->obd_dev_lock);
1456         cfs_spin_lock(&obd_zombie_impexp_lock);
1457         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1458                 print_export_data(exp, "ZOMBIE", locks);
1459         cfs_spin_unlock(&obd_zombie_impexp_lock);
1460 }
1461 EXPORT_SYMBOL(dump_exports);
1462
1463 void obd_exports_barrier(struct obd_device *obd)
1464 {
1465         int waited = 2;
1466         LASSERT(cfs_list_empty(&obd->obd_exports));
1467         cfs_spin_lock(&obd->obd_dev_lock);
1468         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1469                 cfs_spin_unlock(&obd->obd_dev_lock);
1470                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1471                                                    cfs_time_seconds(waited));
1472                 if (waited > 5 && IS_PO2(waited)) {
1473                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1474                                       "more than %d seconds. "
1475                                       "The obd refcount = %d. Is it stuck?\n",
1476                                       obd->obd_name, waited,
1477                                       cfs_atomic_read(&obd->obd_refcount));
1478                         dump_exports(obd, 1);
1479                 }
1480                 waited *= 2;
1481                 cfs_spin_lock(&obd->obd_dev_lock);
1482         }
1483         cfs_spin_unlock(&obd->obd_dev_lock);
1484 }
1485 EXPORT_SYMBOL(obd_exports_barrier);
1486
1487 /* Total amount of zombies to be destroyed */
1488 static int zombies_count = 0;
1489
1490 /**
1491  * kill zombie imports and exports
1492  */
1493 void obd_zombie_impexp_cull(void)
1494 {
1495         struct obd_import *import;
1496         struct obd_export *export;
1497         ENTRY;
1498
1499         do {
1500                 cfs_spin_lock(&obd_zombie_impexp_lock);
1501
1502                 import = NULL;
1503                 if (!cfs_list_empty(&obd_zombie_imports)) {
1504                         import = cfs_list_entry(obd_zombie_imports.next,
1505                                                 struct obd_import,
1506                                                 imp_zombie_chain);
1507                         cfs_list_del_init(&import->imp_zombie_chain);
1508                 }
1509
1510                 export = NULL;
1511                 if (!cfs_list_empty(&obd_zombie_exports)) {
1512                         export = cfs_list_entry(obd_zombie_exports.next,
1513                                                 struct obd_export,
1514                                                 exp_obd_chain);
1515                         cfs_list_del_init(&export->exp_obd_chain);
1516                 }
1517
1518                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1519
1520                 if (import != NULL) {
1521                         class_import_destroy(import);
1522                         cfs_spin_lock(&obd_zombie_impexp_lock);
1523                         zombies_count--;
1524                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1525                 }
1526
1527                 if (export != NULL) {
1528                         class_export_destroy(export);
1529                         cfs_spin_lock(&obd_zombie_impexp_lock);
1530                         zombies_count--;
1531                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1532                 }
1533
1534                 cfs_cond_resched();
1535         } while (import != NULL || export != NULL);
1536         EXIT;
1537 }
1538
1539 static cfs_completion_t         obd_zombie_start;
1540 static cfs_completion_t         obd_zombie_stop;
1541 static unsigned long            obd_zombie_flags;
1542 static cfs_waitq_t              obd_zombie_waitq;
1543 static pid_t                    obd_zombie_pid;
1544
1545 enum {
1546         OBD_ZOMBIE_STOP   = 1 << 1
1547 };
1548
1549 /**
1550  * check for work for kill zombie import/export thread.
1551  */
1552 static int obd_zombie_impexp_check(void *arg)
1553 {
1554         int rc;
1555
1556         cfs_spin_lock(&obd_zombie_impexp_lock);
1557         rc = (zombies_count == 0) &&
1558              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1559         cfs_spin_unlock(&obd_zombie_impexp_lock);
1560
1561         RETURN(rc);
1562 }
1563
1564 /**
1565  * Add export to the obd_zombe thread and notify it.
1566  */
1567 static void obd_zombie_export_add(struct obd_export *exp) {
1568         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1569         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1570         cfs_list_del_init(&exp->exp_obd_chain);
1571         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1572         cfs_spin_lock(&obd_zombie_impexp_lock);
1573         zombies_count++;
1574         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1575         cfs_spin_unlock(&obd_zombie_impexp_lock);
1576
1577         if (obd_zombie_impexp_notify != NULL)
1578                 obd_zombie_impexp_notify();
1579 }
1580
1581 /**
1582  * Add import to the obd_zombe thread and notify it.
1583  */
1584 static void obd_zombie_import_add(struct obd_import *imp) {
1585         LASSERT(imp->imp_sec == NULL);
1586         LASSERT(imp->imp_rq_pool == NULL);
1587         cfs_spin_lock(&obd_zombie_impexp_lock);
1588         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1589         zombies_count++;
1590         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1591         cfs_spin_unlock(&obd_zombie_impexp_lock);
1592
1593         if (obd_zombie_impexp_notify != NULL)
1594                 obd_zombie_impexp_notify();
1595 }
1596
1597 /**
1598  * notify import/export destroy thread about new zombie.
1599  */
1600 static void obd_zombie_impexp_notify(void)
1601 {
1602         /*
1603          * Make sure obd_zomebie_impexp_thread get this notification.
1604          * It is possible this signal only get by obd_zombie_barrier, and
1605          * barrier gulps this notification and sleeps away and hangs ensues
1606          */
1607         cfs_waitq_broadcast(&obd_zombie_waitq);
1608 }
1609
1610 /**
1611  * check whether obd_zombie is idle
1612  */
1613 static int obd_zombie_is_idle(void)
1614 {
1615         int rc;
1616
1617         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1618         cfs_spin_lock(&obd_zombie_impexp_lock);
1619         rc = (zombies_count == 0);
1620         cfs_spin_unlock(&obd_zombie_impexp_lock);
1621         return rc;
1622 }
1623
1624 /**
1625  * wait when obd_zombie import/export queues become empty
1626  */
1627 void obd_zombie_barrier(void)
1628 {
1629         struct l_wait_info lwi = { 0 };
1630
1631         if (obd_zombie_pid == cfs_curproc_pid())
1632                 /* don't wait for myself */
1633                 return;
1634         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1635 }
1636 EXPORT_SYMBOL(obd_zombie_barrier);
1637
1638 #ifdef __KERNEL__
1639
1640 /**
1641  * destroy zombie export/import thread.
1642  */
1643 static int obd_zombie_impexp_thread(void *unused)
1644 {
1645         int rc;
1646
1647         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1648                 cfs_complete(&obd_zombie_start);
1649                 RETURN(rc);
1650         }
1651
1652         cfs_complete(&obd_zombie_start);
1653
1654         obd_zombie_pid = cfs_curproc_pid();
1655
1656         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1657                 struct l_wait_info lwi = { 0 };
1658
1659                 l_wait_event(obd_zombie_waitq,
1660                              !obd_zombie_impexp_check(NULL), &lwi);
1661                 obd_zombie_impexp_cull();
1662
1663                 /*
1664                  * Notify obd_zombie_barrier callers that queues
1665                  * may be empty.
1666                  */
1667                 cfs_waitq_signal(&obd_zombie_waitq);
1668         }
1669
1670         cfs_complete(&obd_zombie_stop);
1671
1672         RETURN(0);
1673 }
1674
1675 #else /* ! KERNEL */
1676
1677 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1678 static void *obd_zombie_impexp_work_cb;
1679 static void *obd_zombie_impexp_idle_cb;
1680
1681 int obd_zombie_impexp_kill(void *arg)
1682 {
1683         int rc = 0;
1684
1685         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1686                 obd_zombie_impexp_cull();
1687                 rc = 1;
1688         }
1689         cfs_atomic_dec(&zombie_recur);
1690         return rc;
1691 }
1692
1693 #endif
1694
1695 /**
1696  * start destroy zombie import/export thread
1697  */
1698 int obd_zombie_impexp_init(void)
1699 {
1700         int rc;
1701
1702         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1703         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1704         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1705         cfs_init_completion(&obd_zombie_start);
1706         cfs_init_completion(&obd_zombie_stop);
1707         cfs_waitq_init(&obd_zombie_waitq);
1708         obd_zombie_pid = 0;
1709
1710 #ifdef __KERNEL__
1711         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1712         if (rc < 0)
1713                 RETURN(rc);
1714
1715         cfs_wait_for_completion(&obd_zombie_start);
1716 #else
1717
1718         obd_zombie_impexp_work_cb =
1719                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1720                                                  &obd_zombie_impexp_kill, NULL);
1721
1722         obd_zombie_impexp_idle_cb =
1723                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1724                                                  &obd_zombie_impexp_check, NULL);
1725         rc = 0;
1726 #endif
1727         RETURN(rc);
1728 }
1729 /**
1730  * stop destroy zombie import/export thread
1731  */
1732 void obd_zombie_impexp_stop(void)
1733 {
1734         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1735         obd_zombie_impexp_notify();
1736 #ifdef __KERNEL__
1737         cfs_wait_for_completion(&obd_zombie_stop);
1738 #else
1739         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1740         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1741 #endif
1742 }
1743
1744 /***** Kernel-userspace comm helpers *******/
1745
1746 /* Get length of entire message, including header */
1747 int kuc_len(int payload_len)
1748 {
1749         return sizeof(struct kuc_hdr) + payload_len;
1750 }
1751 EXPORT_SYMBOL(kuc_len);
1752
1753 /* Get a pointer to kuc header, given a ptr to the payload
1754  * @param p Pointer to payload area
1755  * @returns Pointer to kuc header
1756  */
1757 struct kuc_hdr * kuc_ptr(void *p)
1758 {
1759         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1760         LASSERT(lh->kuc_magic == KUC_MAGIC);
1761         return lh;
1762 }
1763 EXPORT_SYMBOL(kuc_ptr);
1764
1765 /* Test if payload is part of kuc message
1766  * @param p Pointer to payload area
1767  * @returns boolean
1768  */
1769 int kuc_ispayload(void *p)
1770 {
1771         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1772
1773         if (kh->kuc_magic == KUC_MAGIC)
1774                 return 1;
1775         else
1776                 return 0;
1777 }
1778 EXPORT_SYMBOL(kuc_ispayload);
1779
1780 /* Alloc space for a message, and fill in header
1781  * @return Pointer to payload area
1782  */
1783 void *kuc_alloc(int payload_len, int transport, int type)
1784 {
1785         struct kuc_hdr *lh;
1786         int len = kuc_len(payload_len);
1787
1788         OBD_ALLOC(lh, len);
1789         if (lh == NULL)
1790                 return ERR_PTR(-ENOMEM);
1791
1792         lh->kuc_magic = KUC_MAGIC;
1793         lh->kuc_transport = transport;
1794         lh->kuc_msgtype = type;
1795         lh->kuc_msglen = len;
1796
1797         return (void *)(lh + 1);
1798 }
1799 EXPORT_SYMBOL(kuc_alloc);
1800
/* Free a message given a pointer to its payload area
 * @param p Pointer to payload area (not to the header); kuc_ptr()
 *          asserts that a valid kuc header precedes it
 * @param payload_len Length of the payload area; kuc_len(payload_len)
 *          bytes are released, so this presumably has to match the
 *          length the message was allocated with -- confirm with callers
 *
 * Deliberately not declared "inline": this is an exported, non-static
 * function, and a plain "inline" definition only yields an external
 * symbol under gnu89 inline rules.  Under C99/C11 semantics it would
 * provide no external definition for other translation units or for
 * EXPORT_SYMBOL to reference.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1808
1809
1810