Whamcloud - gitweb
*** empty log message ***
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49 #include <class_hash.h>
50
51 extern struct list_head obd_types;
52 spinlock_t obd_types_lock;
53
54 cfs_mem_cache_t *obd_device_cachep;
55 cfs_mem_cache_t *obdo_cachep;
56 EXPORT_SYMBOL(obdo_cachep);
57 cfs_mem_cache_t *import_cachep;
58
59 struct list_head  obd_zombie_imports;
60 struct list_head  obd_zombie_exports;
61 spinlock_t        obd_zombie_impexp_lock;
62 static void obd_zombie_impexp_notify(void);
63 static void obd_zombie_export_add(struct obd_export *exp);
64 static void obd_zombie_import_add(struct obd_import *imp);
65 static void print_export_data(struct obd_export *exp,
66                               const char *status, int locks);
67
68 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84 EXPORT_SYMBOL(obd_device_alloc);
85
86 static void obd_device_free(struct obd_device *obd)
87 {
88         LASSERT(obd != NULL);
89         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
90                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
91         if (obd->obd_namespace != NULL) {
92                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
93                        obd, obd->obd_namespace, obd->obd_force);
94                 LBUG();
95         }
96         lu_ref_fini(&obd->obd_reference);
97         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 }
99
100 struct obd_type *class_search_type(const char *name)
101 {
102         struct list_head *tmp;
103         struct obd_type *type;
104
105         spin_lock(&obd_types_lock);
106         list_for_each(tmp, &obd_types) {
107                 type = list_entry(tmp, struct obd_type, typ_chain);
108                 if (strcmp(type->typ_name, name) == 0) {
109                         spin_unlock(&obd_types_lock);
110                         return type;
111                 }
112         }
113         spin_unlock(&obd_types_lock);
114         return NULL;
115 }
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef CONFIG_KMOD
122         if (!type) {
123                 const char *modname = name;
124                 if (!request_module("%s", modname)) {
125                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126                         type = class_search_type(name);
127                 } else {
128                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129                                            modname);
130                 }
131         }
132 #endif
133         if (type) {
134                 spin_lock(&type->obd_type_lock);
135                 type->typ_refcnt++;
136                 try_module_get(type->typ_dt_ops->o_owner);
137                 spin_unlock(&type->obd_type_lock);
138         }
139         return type;
140 }
141
142 void class_put_type(struct obd_type *type)
143 {
144         LASSERT(type);
145         spin_lock(&type->obd_type_lock);
146         type->typ_refcnt--;
147         module_put(type->typ_dt_ops->o_owner);
148         spin_unlock(&type->obd_type_lock);
149 }
150
151 #define CLASS_MAX_NAME 1024
152
153 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
154                         struct lprocfs_vars *vars, const char *name,
155                         struct lu_device_type *ldt)
156 {
157         struct obd_type *type;
158         int rc = 0;
159         ENTRY;
160
161         /* sanity check */
162         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
163
164         if (class_search_type(name)) {
165                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166                 RETURN(-EEXIST);
167         }
168
169         rc = -ENOMEM;
170         OBD_ALLOC(type, sizeof(*type));
171         if (type == NULL)
172                 RETURN(rc);
173
174         OBD_ALLOC_PTR(type->typ_dt_ops);
175         OBD_ALLOC_PTR(type->typ_md_ops);
176         OBD_ALLOC(type->typ_name, strlen(name) + 1);
177
178         if (type->typ_dt_ops == NULL ||
179             type->typ_md_ops == NULL ||
180             type->typ_name == NULL)
181                 GOTO (failed, rc);
182
183         *(type->typ_dt_ops) = *dt_ops;
184         /* md_ops is optional */
185         if (md_ops)
186                 *(type->typ_md_ops) = *md_ops;
187         strcpy(type->typ_name, name);
188         spin_lock_init(&type->obd_type_lock);
189
190 #ifdef LPROCFS
191         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
192                                               vars, type);
193         if (IS_ERR(type->typ_procroot)) {
194                 rc = PTR_ERR(type->typ_procroot);
195                 type->typ_procroot = NULL;
196                 GOTO (failed, rc);
197         }
198 #endif
199         if (ldt != NULL) {
200                 type->typ_lu = ldt;
201                 rc = lu_device_type_init(ldt);
202                 if (rc != 0)
203                         GOTO (failed, rc);
204         }
205
206         spin_lock(&obd_types_lock);
207         list_add(&type->typ_chain, &obd_types);
208         spin_unlock(&obd_types_lock);
209
210         RETURN (0);
211
212  failed:
213         if (type->typ_name != NULL)
214                 OBD_FREE(type->typ_name, strlen(name) + 1);
215         if (type->typ_md_ops != NULL)
216                 OBD_FREE_PTR(type->typ_md_ops);
217         if (type->typ_dt_ops != NULL)
218                 OBD_FREE_PTR(type->typ_dt_ops);
219         OBD_FREE(type, sizeof(*type));
220         RETURN(rc);
221 }
222
223 int class_unregister_type(const char *name)
224 {
225         struct obd_type *type = class_search_type(name);
226         ENTRY;
227
228         if (!type) {
229                 CERROR("unknown obd type\n");
230                 RETURN(-EINVAL);
231         }
232
233         if (type->typ_refcnt) {
234                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
235                 /* This is a bad situation, let's make the best of it */
236                 /* Remove ops, but leave the name for debugging */
237                 OBD_FREE_PTR(type->typ_dt_ops);
238                 OBD_FREE_PTR(type->typ_md_ops);
239                 RETURN(-EBUSY);
240         }
241
242         if (type->typ_procroot) {
243                 lprocfs_remove(&type->typ_procroot);
244         }
245
246         if (type->typ_lu)
247                 lu_device_type_fini(type->typ_lu);
248
249         spin_lock(&obd_types_lock);
250         list_del(&type->typ_chain);
251         spin_unlock(&obd_types_lock);
252         OBD_FREE(type->typ_name, strlen(name) + 1);
253         if (type->typ_dt_ops != NULL)
254                 OBD_FREE_PTR(type->typ_dt_ops);
255         if (type->typ_md_ops != NULL)
256                 OBD_FREE_PTR(type->typ_md_ops);
257         OBD_FREE(type, sizeof(*type));
258         RETURN(0);
259 } /* class_unregister_type */
260
261 /**
262  * Create a new obd device.
263  *
264  * Find an empty slot in ::obd_devs[], create a new obd device in it.
265  *
266  * \param[in] type_name obd device type string.
267  * \param[in] name      obd device name.
268  *
269  * \retval NULL if create fails, otherwise return the obd device
270  *         pointer created.
271  */
272 struct obd_device *class_newdev(const char *type_name, const char *name)
273 {
274         struct obd_device *result = NULL;
275         struct obd_device *newdev;
276         struct obd_type *type = NULL;
277         int i;
278         int new_obd_minor = 0;
279
280         if (strlen(name) >= MAX_OBD_NAME) {
281                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
282                 RETURN(ERR_PTR(-EINVAL));
283         }
284
285         type = class_get_type(type_name);
286         if (type == NULL){
287                 CERROR("OBD: unknown type: %s\n", type_name);
288                 RETURN(ERR_PTR(-ENODEV));
289         }
290
291         newdev = obd_device_alloc();
292         if (newdev == NULL) {
293                 class_put_type(type);
294                 RETURN(ERR_PTR(-ENOMEM));
295         }
296         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
297
298         spin_lock(&obd_dev_lock);
299         for (i = 0; i < class_devno_max(); i++) {
300                 struct obd_device *obd = class_num2obd(i);
301                 if (obd && obd->obd_name &&
302                     (strcmp(name, obd->obd_name) == 0)) {
303                         CERROR("Device %s already exists, won't add\n", name);
304                         if (result) {
305                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
306                                          "%p obd_magic %08x != %08x\n", result,
307                                          result->obd_magic, OBD_DEVICE_MAGIC);
308                                 LASSERTF(result->obd_minor == new_obd_minor,
309                                          "%p obd_minor %d != %d\n", result,
310                                          result->obd_minor, new_obd_minor);
311
312                                 obd_devs[result->obd_minor] = NULL;
313                                 result->obd_name[0]='\0';
314                          }
315                         result = ERR_PTR(-EEXIST);
316                         break;
317                 }
318                 if (!result && !obd) {
319                         result = newdev;
320                         result->obd_minor = i;
321                         new_obd_minor = i;
322                         result->obd_type = type;
323                         strncpy(result->obd_name, name,
324                                 sizeof(result->obd_name) - 1);
325                         obd_devs[i] = result;
326                 }
327         }
328         spin_unlock(&obd_dev_lock);
329
330         if (result == NULL && i >= class_devno_max()) {
331                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
332                        class_devno_max());
333                 result = ERR_PTR(-EOVERFLOW);
334         }
335
336         if (IS_ERR(result)) {
337                 obd_device_free(newdev);
338                 class_put_type(type);
339         } else {
340                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
341                        result->obd_name, result);
342         }
343         return result;
344 }
345
346 void class_release_dev(struct obd_device *obd)
347 {
348         struct obd_type *obd_type = obd->obd_type;
349
350         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
351                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
352         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
353                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
354         LASSERT(obd_type != NULL);
355
356         CDEBUG(D_INFO, "Release obd device %s obd_type name =%s\n",
357                obd->obd_name,obd->obd_type->typ_name);
358
359         spin_lock(&obd_dev_lock);
360         obd_devs[obd->obd_minor] = NULL;
361         spin_unlock(&obd_dev_lock);
362         obd_device_free(obd);
363
364         class_put_type(obd_type);
365 }
366
367 int class_name2dev(const char *name)
368 {
369         int i;
370
371         if (!name)
372                 return -1;
373
374         spin_lock(&obd_dev_lock);
375         for (i = 0; i < class_devno_max(); i++) {
376                 struct obd_device *obd = class_num2obd(i);
377                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
378                         /* Make sure we finished attaching before we give
379                            out any references */
380                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
381                         if (obd->obd_attached) {
382                                 spin_unlock(&obd_dev_lock);
383                                 return i;
384                         }
385                         break;
386                 }
387         }
388         spin_unlock(&obd_dev_lock);
389
390         return -1;
391 }
392
393 struct obd_device *class_name2obd(const char *name)
394 {
395         int dev = class_name2dev(name);
396
397         if (dev < 0 || dev > class_devno_max())
398                 return NULL;
399         return class_num2obd(dev);
400 }
401
402 int class_uuid2dev(struct obd_uuid *uuid)
403 {
404         int i;
405
406         spin_lock(&obd_dev_lock);
407         for (i = 0; i < class_devno_max(); i++) {
408                 struct obd_device *obd = class_num2obd(i);
409                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
410                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
411                         spin_unlock(&obd_dev_lock);
412                         return i;
413                 }
414         }
415         spin_unlock(&obd_dev_lock);
416
417         return -1;
418 }
419
420 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
421 {
422         int dev = class_uuid2dev(uuid);
423         if (dev < 0)
424                 return NULL;
425         return class_num2obd(dev);
426 }
427
428 /**
429  * Get obd device from ::obd_devs[]
430  *
431  * \param num [in] array index
432  *
433  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
434  *         otherwise return the obd device there.
435  */
436 struct obd_device *class_num2obd(int num)
437 {
438         struct obd_device *obd = NULL;
439
440         if (num < class_devno_max()) {
441                 obd = obd_devs[num];
442                 if (obd == NULL)
443                         return NULL;
444
445                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
446                          "%p obd_magic %08x != %08x\n",
447                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
448                 LASSERTF(obd->obd_minor == num,
449                          "%p obd_minor %0d != %0d\n",
450                          obd, obd->obd_minor, num);
451         }
452
453         return obd;
454 }
455
456 void class_obd_list(void)
457 {
458         char *status;
459         int i;
460
461         spin_lock(&obd_dev_lock);
462         for (i = 0; i < class_devno_max(); i++) {
463                 struct obd_device *obd = class_num2obd(i);
464                 if (obd == NULL)
465                         continue;
466                 if (obd->obd_stopping)
467                         status = "ST";
468                 else if (obd->obd_set_up)
469                         status = "UP";
470                 else if (obd->obd_attached)
471                         status = "AT";
472                 else
473                         status = "--";
474                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
475                          i, status, obd->obd_type->typ_name,
476                          obd->obd_name, obd->obd_uuid.uuid,
477                          atomic_read(&obd->obd_refcount));
478         }
479         spin_unlock(&obd_dev_lock);
480         return;
481 }
482
483 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
484    specified, then only the client with that uuid is returned,
485    otherwise any client connected to the tgt is returned. */
486 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
487                                           const char * typ_name,
488                                           struct obd_uuid *grp_uuid)
489 {
490         int i;
491
492         spin_lock(&obd_dev_lock);
493         for (i = 0; i < class_devno_max(); i++) {
494                 struct obd_device *obd = class_num2obd(i);
495                 if (obd == NULL)
496                         continue;
497                 if ((strncmp(obd->obd_type->typ_name, typ_name,
498                              strlen(typ_name)) == 0)) {
499                         if (obd_uuid_equals(tgt_uuid,
500                                             &obd->u.cli.cl_target_uuid) &&
501                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
502                                                          &obd->obd_uuid) : 1)) {
503                                 spin_unlock(&obd_dev_lock);
504                                 return obd;
505                         }
506                 }
507         }
508         spin_unlock(&obd_dev_lock);
509
510         return NULL;
511 }
512
513 /* Iterate the obd_device list looking devices have grp_uuid. Start
514    searching at *next, and if a device is found, the next index to look
515    at is saved in *next. If next is NULL, then the first matching device
516    will always be returned. */
517 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
518 {
519         int i;
520
521         if (next == NULL)
522                 i = 0;
523         else if (*next >= 0 && *next < class_devno_max())
524                 i = *next;
525         else
526                 return NULL;
527
528         spin_lock(&obd_dev_lock);
529         for (; i < class_devno_max(); i++) {
530                 struct obd_device *obd = class_num2obd(i);
531                 if (obd == NULL)
532                         continue;
533                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
534                         if (next != NULL)
535                                 *next = i+1;
536                         spin_unlock(&obd_dev_lock);
537                         return obd;
538                 }
539         }
540         spin_unlock(&obd_dev_lock);
541
542         return NULL;
543 }
544
545 /**
546  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
547  * adjust sptlrpc settings accordingly.
548  */
549 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
550 {
551         struct obd_device  *obd;
552         const char         *type;
553         int                 i, rc = 0, rc2;
554
555         LASSERT(namelen > 0);
556
557         spin_lock(&obd_dev_lock);
558         for (i = 0; i < class_devno_max(); i++) {
559                 obd = class_num2obd(i);
560
561                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
562                         continue;
563
564                 /* only notify mdc, osc, mdt, ost */
565                 type = obd->obd_type->typ_name;
566                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
567                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
568                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
569                     strcmp(type, LUSTRE_OST_NAME) != 0)
570                         continue;
571
572                 if (strncmp(obd->obd_name, fsname, namelen))
573                         continue;
574
575                 class_incref(obd, __FUNCTION__, obd);
576                 spin_unlock(&obd_dev_lock);
577                 rc2 = obd_set_info_async(obd->obd_self_export,
578                                          sizeof(KEY_SPTLRPC_CONF),
579                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
580                 rc = rc ? rc : rc2;
581                 class_decref(obd, __FUNCTION__, obd);
582                 spin_lock(&obd_dev_lock);
583         }
584         spin_unlock(&obd_dev_lock);
585         return rc;
586 }
587 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
588
589 void obd_cleanup_caches(void)
590 {
591         int rc;
592
593         ENTRY;
594         if (obd_device_cachep) {
595                 rc = cfs_mem_cache_destroy(obd_device_cachep);
596                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
597                 obd_device_cachep = NULL;
598         }
599         if (obdo_cachep) {
600                 rc = cfs_mem_cache_destroy(obdo_cachep);
601                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
602                 obdo_cachep = NULL;
603         }
604         if (import_cachep) {
605                 rc = cfs_mem_cache_destroy(import_cachep);
606                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
607                 import_cachep = NULL;
608         }
609         if (capa_cachep) {
610                 rc = cfs_mem_cache_destroy(capa_cachep);
611                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
612                 capa_cachep = NULL;
613         }
614         EXIT;
615 }
616
617 int obd_init_caches(void)
618 {
619         ENTRY;
620
621         LASSERT(obd_device_cachep == NULL);
622         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
623                                                  sizeof(struct obd_device),
624                                                  0, 0);
625         if (!obd_device_cachep)
626                 GOTO(out, -ENOMEM);
627
628         LASSERT(obdo_cachep == NULL);
629         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
630                                            0, 0);
631         if (!obdo_cachep)
632                 GOTO(out, -ENOMEM);
633
634         LASSERT(import_cachep == NULL);
635         import_cachep = cfs_mem_cache_create("ll_import_cache",
636                                              sizeof(struct obd_import),
637                                              0, 0);
638         if (!import_cachep)
639                 GOTO(out, -ENOMEM);
640
641         LASSERT(capa_cachep == NULL);
642         capa_cachep = cfs_mem_cache_create("capa_cache",
643                                            sizeof(struct obd_capa), 0, 0);
644         if (!capa_cachep)
645                 GOTO(out, -ENOMEM);
646
647         RETURN(0);
648  out:
649         obd_cleanup_caches();
650         RETURN(-ENOMEM);
651
652 }
653
654 /* map connection to client */
655 struct obd_export *class_conn2export(struct lustre_handle *conn)
656 {
657         struct obd_export *export;
658         ENTRY;
659
660         if (!conn) {
661                 CDEBUG(D_CACHE, "looking for null handle\n");
662                 RETURN(NULL);
663         }
664
665         if (conn->cookie == -1) {  /* this means assign a new connection */
666                 CDEBUG(D_CACHE, "want a new connection\n");
667                 RETURN(NULL);
668         }
669
670         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
671         export = class_handle2object(conn->cookie);
672         RETURN(export);
673 }
674
675 struct obd_device *class_exp2obd(struct obd_export *exp)
676 {
677         if (exp)
678                 return exp->exp_obd;
679         return NULL;
680 }
681
682 struct obd_device *class_conn2obd(struct lustre_handle *conn)
683 {
684         struct obd_export *export;
685         export = class_conn2export(conn);
686         if (export) {
687                 struct obd_device *obd = export->exp_obd;
688                 class_export_put(export);
689                 return obd;
690         }
691         return NULL;
692 }
693
694 struct obd_import *class_exp2cliimp(struct obd_export *exp)
695 {
696         struct obd_device *obd = exp->exp_obd;
697         if (obd == NULL)
698                 return NULL;
699         return obd->u.cli.cl_import;
700 }
701
702 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
703 {
704         struct obd_device *obd = class_conn2obd(conn);
705         if (obd == NULL)
706                 return NULL;
707         return obd->u.cli.cl_import;
708 }
709
710 /* Export management functions */
711 static void class_export_destroy(struct obd_export *exp)
712 {
713         struct obd_device *obd = exp->exp_obd;
714         ENTRY;
715
716         LASSERT (atomic_read(&exp->exp_refcount) == 0);
717
718         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
719                exp->exp_client_uuid.uuid, obd->obd_name);
720
721         LASSERT(obd != NULL);
722
723         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
724         if (exp->exp_connection)
725                 ptlrpc_put_connection_superhack(exp->exp_connection);
726
727         LASSERT(list_empty(&exp->exp_outstanding_replies));
728         LASSERT(list_empty(&exp->exp_uncommitted_replies));
729         LASSERT(list_empty(&exp->exp_req_replay_queue));
730         LASSERT(list_empty(&exp->exp_queued_rpc));
731         obd_destroy_export(exp);
732         class_decref(obd, "export", exp);
733
734         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
735         EXIT;
736 }
737
738 static void export_handle_addref(void *export)
739 {
740         class_export_get(export);
741 }
742
743 struct obd_export *class_export_get(struct obd_export *exp)
744 {
745         atomic_inc(&exp->exp_refcount);
746         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
747                atomic_read(&exp->exp_refcount));
748         return exp;
749 }
750 EXPORT_SYMBOL(class_export_get);
751
752 void class_export_put(struct obd_export *exp)
753 {
754         LASSERT(exp != NULL);
755         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
756                atomic_read(&exp->exp_refcount) - 1);
757         LASSERT(atomic_read(&exp->exp_refcount) > 0);
758         LASSERT(atomic_read(&exp->exp_refcount) < 0x5a5a5a);
759
760         if (atomic_dec_and_test(&exp->exp_refcount)) {
761                 LASSERT(!list_empty(&exp->exp_obd_chain));
762                 CDEBUG(D_IOCTL, "final put %p/%s\n",
763                        exp, exp->exp_client_uuid.uuid);
764                 obd_zombie_export_add(exp);
765         }
766 }
767 EXPORT_SYMBOL(class_export_put);
768
769 /* Creates a new export, adds it to the hash table, and returns a
770  * pointer to it. The refcount is 2: one for the hash reference, and
771  * one for the pointer returned by this function. */
772 struct obd_export *class_new_export(struct obd_device *obd,
773                                     struct obd_uuid *cluuid)
774 {
775         struct obd_export *export;
776         int rc = 0;
777         ENTRY;
778
779         OBD_ALLOC_PTR(export);
780         if (!export)
781                 return ERR_PTR(-ENOMEM);
782
783         export->exp_conn_cnt = 0;
784         export->exp_lock_hash = NULL;
785         atomic_set(&export->exp_refcount, 2);
786         atomic_set(&export->exp_rpc_count, 0);
787         atomic_set(&export->exp_cb_count, 0);
788         atomic_set(&export->exp_locks_count, 0);
789 #if LUSTRE_TRACKS_LOCK_EXP_REFS
790         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
791         spin_lock_init(&export->exp_locks_list_guard);
792 #endif
793         atomic_set(&export->exp_replay_count, 0);
794         export->exp_obd = obd;
795         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
796         spin_lock_init(&export->exp_uncommitted_replies_lock);
797         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
798         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
799         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
800         CFS_INIT_LIST_HEAD(&export->exp_queued_rpc);
801         class_handle_hash(&export->exp_handle, export_handle_addref);
802         export->exp_last_request_time = cfs_time_current_sec();
803         spin_lock_init(&export->exp_lock);
804         INIT_HLIST_NODE(&export->exp_uuid_hash);
805         INIT_HLIST_NODE(&export->exp_nid_hash);
806
807         export->exp_sp_peer = LUSTRE_SP_ANY;
808         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
809         export->exp_client_uuid = *cluuid;
810         obd_init_export(export);
811
812         spin_lock(&obd->obd_dev_lock);
813          /* shouldn't happen, but might race */
814         if (obd->obd_stopping)
815                 GOTO(exit_err, rc = -ENODEV);
816
817         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
818                 rc = lustre_hash_add_unique(obd->obd_uuid_hash, cluuid,
819                                             &export->exp_uuid_hash);
820                 if (rc != 0) {
821                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
822                                       obd->obd_name, cluuid->uuid, rc);
823                         GOTO(exit_err, rc = -EALREADY);
824                 }
825         }
826
827         class_incref(obd, "export", export);
828         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
829         list_add_tail(&export->exp_obd_chain_timed,
830                       &export->exp_obd->obd_exports_timed);
831         export->exp_obd->obd_num_exports++;
832         spin_unlock(&obd->obd_dev_lock);
833         RETURN(export);
834
835 exit_err:
836         spin_unlock(&obd->obd_dev_lock);
837         class_handle_unhash(&export->exp_handle);
838         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
839         obd_destroy_export(export);
840         OBD_FREE_PTR(export);
841         return ERR_PTR(rc);
842 }
843 EXPORT_SYMBOL(class_new_export);
844
845 void class_unlink_export(struct obd_export *exp)
846 {
847         class_handle_unhash(&exp->exp_handle);
848
849         spin_lock(&exp->exp_obd->obd_dev_lock);
850         /* delete an uuid-export hashitem from hashtables */
851         if (!hlist_unhashed(&exp->exp_uuid_hash))
852                 lustre_hash_del(exp->exp_obd->obd_uuid_hash,
853                                 &exp->exp_client_uuid,
854                                 &exp->exp_uuid_hash);
855
856         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
857         list_del_init(&exp->exp_obd_chain_timed);
858         exp->exp_obd->obd_num_exports--;
859         spin_unlock(&exp->exp_obd->obd_dev_lock);
860         class_export_put(exp);
861 }
862 EXPORT_SYMBOL(class_unlink_export);
863
864 /* Import management functions */
865 void class_import_destroy(struct obd_import *imp)
866 {
867         ENTRY;
868
869         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
870                 imp->imp_obd->obd_name);
871
872         LASSERT(atomic_read(&imp->imp_refcount) == 0);
873
874         ptlrpc_put_connection_superhack(imp->imp_connection);
875
876         while (!list_empty(&imp->imp_conn_list)) {
877                 struct obd_import_conn *imp_conn;
878
879                 imp_conn = list_entry(imp->imp_conn_list.next,
880                                       struct obd_import_conn, oic_item);
881                 list_del_init(&imp_conn->oic_item);
882                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
883                 OBD_FREE(imp_conn, sizeof(*imp_conn));
884         }
885
886         LASSERT(imp->imp_sec == NULL);
887         class_decref(imp->imp_obd, "import", imp);
888         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
889         EXIT;
890 }
891
892 static void import_handle_addref(void *import)
893 {
894         class_import_get(import);
895 }
896
897 struct obd_import *class_import_get(struct obd_import *import)
898 {
899         LASSERT(atomic_read(&import->imp_refcount) >= 0);
900         LASSERT(atomic_read(&import->imp_refcount) < 0x5a5a5a);
901         atomic_inc(&import->imp_refcount);
902         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
903                atomic_read(&import->imp_refcount),
904                import->imp_obd->obd_name);
905         return import;
906 }
907 EXPORT_SYMBOL(class_import_get);
908
909 void class_import_put(struct obd_import *imp)
910 {
911         ENTRY;
912
913         LASSERT(atomic_read(&imp->imp_refcount) > 0);
914         LASSERT(atomic_read(&imp->imp_refcount) < 0x5a5a5a);
915         LASSERT(list_empty(&imp->imp_zombie_chain));
916
917         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
918                atomic_read(&imp->imp_refcount) - 1,
919                imp->imp_obd->obd_name);
920
921         if (atomic_dec_and_test(&imp->imp_refcount)) {
922                 CDEBUG(D_INFO, "final put import %p\n", imp);
923                 obd_zombie_import_add(imp);
924         }
925
926         EXIT;
927 }
928 EXPORT_SYMBOL(class_import_put);
929
930 static void init_imp_at(struct imp_at *at) {
931         int i;
932         at_init(&at->iat_net_latency, 0, 0);
933         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
934                 /* max service estimates are tracked on the server side, so
935                    don't use the AT history here, just use the last reported
936                    val. (But keep hist for proc histogram, worst_ever) */
937                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
938                         AT_FLG_NOHIST);
939         }
940 }
941
942 struct obd_import *class_new_import(struct obd_device *obd)
943 {
944         struct obd_import *imp;
945
946         OBD_ALLOC(imp, sizeof(*imp));
947         if (imp == NULL)
948                 return NULL;
949
950         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
951         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
952         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
953         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
954         spin_lock_init(&imp->imp_lock);
955         imp->imp_last_success_conn = 0;
956         imp->imp_state = LUSTRE_IMP_NEW;
957         imp->imp_obd = class_incref(obd, "import", imp);
958         sema_init(&imp->imp_sec_mutex, 1);
959         cfs_waitq_init(&imp->imp_recovery_waitq);
960
961         atomic_set(&imp->imp_refcount, 2);
962         atomic_set(&imp->imp_unregistering, 0);
963         atomic_set(&imp->imp_inflight, 0);
964         atomic_set(&imp->imp_replay_inflight, 0);
965         atomic_set(&imp->imp_inval_count, 0);
966         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
967         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
968         class_handle_hash(&imp->imp_handle, import_handle_addref);
969         init_imp_at(&imp->imp_at);
970
971         /* the default magic is V2, will be used in connect RPC, and
972          * then adjusted according to the flags in request/reply. */
973         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
974
975         return imp;
976 }
977 EXPORT_SYMBOL(class_new_import);
978
979 void class_destroy_import(struct obd_import *import)
980 {
981         LASSERT(import != NULL);
982         LASSERT(import != LP_POISON);
983
984         class_handle_unhash(&import->imp_handle);
985
986         spin_lock(&import->imp_lock);
987         import->imp_generation++;
988         spin_unlock(&import->imp_lock);
989         class_import_put(import);
990 }
991 EXPORT_SYMBOL(class_destroy_import);
992
993 #if LUSTRE_TRACKS_LOCK_EXP_REFS
994
995 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
996 {
997         spin_lock(&exp->exp_locks_list_guard);
998
999         LASSERT(lock->l_exp_refs_nr >= 0);
1000
1001         if (lock->l_exp_refs_target != NULL &&
1002             lock->l_exp_refs_target != exp) {
1003                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1004                               exp, lock, lock->l_exp_refs_target);
1005         }
1006         if ((lock->l_exp_refs_nr ++) == 0) {
1007                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1008                 lock->l_exp_refs_target = exp;
1009         }
1010         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1011                lock, exp, lock->l_exp_refs_nr);
1012         spin_unlock(&exp->exp_locks_list_guard);
1013 }
1014 EXPORT_SYMBOL(__class_export_add_lock_ref);
1015
1016 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1017 {
1018         spin_lock(&exp->exp_locks_list_guard);
1019         LASSERT(lock->l_exp_refs_nr > 0);
1020         if (lock->l_exp_refs_target != exp) {
1021                 LCONSOLE_WARN("lock %p, "
1022                               "mismatching export pointers: %p, %p\n",
1023                               lock, lock->l_exp_refs_target, exp);
1024         }
1025         if (-- lock->l_exp_refs_nr == 0) {
1026                 list_del_init(&lock->l_exp_refs_link);
1027                 lock->l_exp_refs_target = NULL;
1028         }
1029         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1030                lock, exp, lock->l_exp_refs_nr);
1031         spin_unlock(&exp->exp_locks_list_guard);
1032 }
1033 EXPORT_SYMBOL(__class_export_del_lock_ref);
1034 #endif
1035
1036 /* A connection defines an export context in which preallocation can
1037    be managed. This releases the export pointer reference, and returns
1038    the export handle, so the export refcount is 1 when this function
1039    returns. */
1040 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1041                   struct obd_uuid *cluuid)
1042 {
1043         struct obd_export *export;
1044         LASSERT(conn != NULL);
1045         LASSERT(obd != NULL);
1046         LASSERT(cluuid != NULL);
1047         ENTRY;
1048
1049         export = class_new_export(obd, cluuid);
1050         if (IS_ERR(export))
1051                 RETURN(PTR_ERR(export));
1052
1053         conn->cookie = export->exp_handle.h_cookie;
1054         class_export_put(export);
1055
1056         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1057                cluuid->uuid, conn->cookie);
1058         RETURN(0);
1059 }
1060 EXPORT_SYMBOL(class_connect);
1061
1062 /* if export is involved in recovery then clean up related things */
1063 void class_export_recovery_cleanup(struct obd_export *exp)
1064 {
1065         struct obd_device *obd = exp->exp_obd;
1066
1067         spin_lock_bh(&obd->obd_processing_task_lock);
1068         if (exp->exp_delayed)
1069                 obd->obd_delayed_clients--;
1070         if (obd->obd_recovering && exp->exp_in_recovery) {
1071                 spin_lock(&exp->exp_lock);
1072                 exp->exp_in_recovery = 0;
1073                 spin_unlock(&exp->exp_lock);
1074                 LASSERT(obd->obd_connected_clients);
1075                 obd->obd_connected_clients--;
1076         }
1077         /** Cleanup req replay fields */
1078         if (exp->exp_req_replay_needed) {
1079                 spin_lock(&exp->exp_lock);
1080                 exp->exp_req_replay_needed = 0;
1081                 spin_unlock(&exp->exp_lock);
1082                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1083                 atomic_dec(&obd->obd_req_replay_clients);
1084         }
1085         /** Cleanup lock replay data */
1086         if (exp->exp_lock_replay_needed) {
1087                 spin_lock(&exp->exp_lock);
1088                 exp->exp_lock_replay_needed = 0;
1089                 spin_unlock(&exp->exp_lock);
1090                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1091                 atomic_dec(&obd->obd_lock_replay_clients);
1092         }
1093         spin_unlock_bh(&obd->obd_processing_task_lock);
1094 }
1095
1096 /* This function removes 1-3 references from the export:
1097  * 1 - for export pointer passed
1098  * and if disconnect really need
1099  * 2 - removing from hash
1100  * 3 - in client_unlink_export
1101  * The export pointer passed to this function can destroyed */
1102 int class_disconnect(struct obd_export *export)
1103 {
1104         int already_disconnected;
1105         ENTRY;
1106
1107         if (export == NULL) {
1108                 fixme();
1109                 CDEBUG(D_IOCTL, "attempting to free NULL export %p\n", export);
1110                 RETURN(-EINVAL);
1111         }
1112
1113         spin_lock(&export->exp_lock);
1114         already_disconnected = export->exp_disconnected;
1115         export->exp_disconnected = 1;
1116         spin_unlock(&export->exp_lock);
1117
1118         /* class_cleanup(), abort_recovery(), and class_fail_export()
1119          * all end up in here, and if any of them race we shouldn't
1120          * call extra class_export_puts(). */
1121         if (already_disconnected) {
1122                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1123                 GOTO(no_disconn, already_disconnected);
1124         }
1125
1126         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1127                export->exp_handle.h_cookie);
1128
1129         if (!hlist_unhashed(&export->exp_nid_hash))
1130                 lustre_hash_del(export->exp_obd->obd_nid_hash,
1131                                 &export->exp_connection->c_peer.nid,
1132                                 &export->exp_nid_hash);
1133
1134         class_export_recovery_cleanup(export);
1135         class_unlink_export(export);
1136 no_disconn:
1137         class_export_put(export);
1138         RETURN(0);
1139 }
1140
1141 static void class_disconnect_export_list(struct list_head *list,
1142                                          enum obd_option flags)
1143 {
1144         int rc;
1145         struct obd_export *exp;
1146         ENTRY;
1147
1148         /* It's possible that an export may disconnect itself, but
1149          * nothing else will be added to this list. */
1150         while (!list_empty(list)) {
1151                 exp = list_entry(list->next, struct obd_export, exp_obd_chain);
1152                 /* need for safe call CDEBUG after obd_disconnect */
1153                 class_export_get(exp);
1154
1155                 spin_lock(&exp->exp_lock);
1156                 exp->exp_flags = flags;
1157                 spin_unlock(&exp->exp_lock);
1158
1159                 if (obd_uuid_equals(&exp->exp_client_uuid,
1160                                     &exp->exp_obd->obd_uuid)) {
1161                         CDEBUG(D_HA,
1162                                "exp %p export uuid == obd uuid, don't discon\n",
1163                                exp);
1164                         /* Need to delete this now so we don't end up pointing
1165                          * to work_list later when this export is cleaned up. */
1166                         list_del_init(&exp->exp_obd_chain);
1167                         class_export_put(exp);
1168                         continue;
1169                 }
1170
1171                 class_export_get(exp);
1172                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1173                        "last request at "CFS_TIME_T"\n",
1174                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1175                        exp, exp->exp_last_request_time);
1176                 /* release one export reference anyway */
1177                 rc = obd_disconnect(exp);
1178
1179                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1180                        obd_export_nid2str(exp), exp, rc);
1181                 class_export_put(exp);
1182         }
1183         EXIT;
1184 }
1185
1186 void class_disconnect_exports(struct obd_device *obd)
1187 {
1188         struct list_head work_list;
1189         ENTRY;
1190
1191         /* Move all of the exports from obd_exports to a work list, en masse. */
1192         CFS_INIT_LIST_HEAD(&work_list);
1193         spin_lock(&obd->obd_dev_lock);
1194         list_splice_init(&obd->obd_exports, &work_list);
1195         list_splice_init(&obd->obd_delayed_exports, &work_list);
1196         spin_unlock(&obd->obd_dev_lock);
1197
1198         if (!list_empty(&work_list)) {
1199                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1200                        "disconnecting them\n", obd->obd_minor, obd);
1201                 class_disconnect_export_list(&work_list,
1202                                              exp_flags_from_obd(obd));
1203         } else
1204                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1205                        obd->obd_minor, obd);
1206         EXIT;
1207 }
1208 EXPORT_SYMBOL(class_disconnect_exports);
1209
1210 /* Remove exports that have not completed recovery.
1211  */
1212 void class_disconnect_stale_exports(struct obd_device *obd,
1213                                     int (*test_export)(struct obd_export *))
1214 {
1215         struct list_head work_list;
1216         struct list_head *pos, *n;
1217         struct obd_export *exp;
1218         int evicted = 0;
1219         ENTRY;
1220
1221         CFS_INIT_LIST_HEAD(&work_list);
1222         spin_lock(&obd->obd_dev_lock);
1223         list_for_each_safe(pos, n, &obd->obd_exports) {
1224                 exp = list_entry(pos, struct obd_export, exp_obd_chain);
1225                 if (test_export(exp))
1226                         continue;
1227
1228                 /* don't count self-export as client */
1229                 if (obd_uuid_equals(&exp->exp_client_uuid,
1230                                     &exp->exp_obd->obd_uuid))
1231                         continue;
1232
1233                 list_move(&exp->exp_obd_chain, &work_list);
1234                 evicted++;
1235                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1236                        obd->obd_name, exp->exp_client_uuid.uuid,
1237                        exp->exp_connection == NULL ? "<unknown>" :
1238                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1239                 print_export_data(exp, "EVICTING", 0);
1240         }
1241         spin_unlock(&obd->obd_dev_lock);
1242
1243         if (evicted) {
1244                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1245                        obd->obd_name, evicted);
1246                 obd->obd_stale_clients += evicted;
1247         }
1248         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1249                                                  OBD_OPT_ABORT_RECOV);
1250         EXIT;
1251 }
1252 EXPORT_SYMBOL(class_disconnect_stale_exports);
1253
1254 void class_fail_export(struct obd_export *exp)
1255 {
1256         int rc, already_failed;
1257
1258         spin_lock(&exp->exp_lock);
1259         already_failed = exp->exp_failed;
1260         exp->exp_failed = 1;
1261         spin_unlock(&exp->exp_lock);
1262
1263         if (already_failed) {
1264                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1265                        exp, exp->exp_client_uuid.uuid);
1266                 return;
1267         }
1268
1269         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1270                exp, exp->exp_client_uuid.uuid);
1271
1272         if (obd_dump_on_timeout)
1273                 libcfs_debug_dumplog();
1274
1275         /* Most callers into obd_disconnect are removing their own reference
1276          * (request, for example) in addition to the one from the hash table.
1277          * We don't have such a reference here, so make one. */
1278         class_export_get(exp);
1279         rc = obd_disconnect(exp);
1280         if (rc)
1281                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1282         else
1283                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1284                        exp, exp->exp_client_uuid.uuid);
1285 }
1286 EXPORT_SYMBOL(class_fail_export);
1287
1288 char *obd_export_nid2str(struct obd_export *exp)
1289 {
1290         if (exp->exp_connection != NULL)
1291                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1292
1293         return "(no nid)";
1294 }
1295 EXPORT_SYMBOL(obd_export_nid2str);
1296
1297 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1298 {
1299         struct obd_export *doomed_exp = NULL;
1300         int exports_evicted = 0;
1301
1302         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1303
1304         do {
1305                 doomed_exp = lustre_hash_lookup(obd->obd_nid_hash, &nid_key);
1306                 if (doomed_exp == NULL)
1307                         break;
1308
1309                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1310                          "nid %s found, wanted nid %s, requested nid %s\n",
1311                          obd_export_nid2str(doomed_exp),
1312                          libcfs_nid2str(nid_key), nid);
1313                 LASSERTF(doomed_exp != obd->obd_self_export,
1314                          "self-export is hashed by NID?\n");
1315                 exports_evicted++;
1316                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1317                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1318                        exports_evicted);
1319                 class_fail_export(doomed_exp);
1320                 class_export_put(doomed_exp);
1321         } while (1);
1322
1323         if (!exports_evicted)
1324                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1325                        obd->obd_name, nid);
1326         return exports_evicted;
1327 }
1328 EXPORT_SYMBOL(obd_export_evict_by_nid);
1329
1330 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1331 {
1332         struct obd_export *doomed_exp = NULL;
1333         struct obd_uuid doomed_uuid;
1334         int exports_evicted = 0;
1335
1336         obd_str2uuid(&doomed_uuid, uuid);
1337         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1338                 CERROR("%s: can't evict myself\n", obd->obd_name);
1339                 return exports_evicted;
1340         }
1341
1342         doomed_exp = lustre_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1343
1344         if (doomed_exp == NULL) {
1345                 CERROR("%s: can't disconnect %s: no exports found\n",
1346                        obd->obd_name, uuid);
1347         } else {
1348                 CWARN("%s: evicting %s at adminstrative request\n",
1349                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1350                 class_fail_export(doomed_exp);
1351                 class_export_put(doomed_exp);
1352                 exports_evicted++;
1353         }
1354
1355         return exports_evicted;
1356 }
1357 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1358
1359 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1360 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1361 EXPORT_SYMBOL(class_export_dump_hook);
1362 #endif
1363
1364 static void print_export_data(struct obd_export *exp, const char *status,
1365                               int locks)
1366 {
1367         struct ptlrpc_reply_state *rs;
1368         struct ptlrpc_reply_state *first_reply = NULL;
1369         int nreplies = 0;
1370
1371         spin_lock(&exp->exp_lock);
1372         list_for_each_entry (rs, &exp->exp_outstanding_replies, rs_exp_list) {
1373                 if (nreplies == 0)
1374                         first_reply = rs;
1375                 nreplies++;
1376         }
1377         spin_unlock(&exp->exp_lock);
1378
1379         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1380                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1381                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1382                atomic_read(&exp->exp_rpc_count),
1383                atomic_read(&exp->exp_cb_count),
1384                atomic_read(&exp->exp_locks_count),
1385                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1386                nreplies, first_reply, nreplies > 3 ? "..." : "",
1387                exp->exp_last_committed);
1388 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1389         if (locks && class_export_dump_hook != NULL)
1390                 class_export_dump_hook(exp);
1391 #endif
1392 }
1393
1394 void dump_exports(struct obd_device *obd, int locks)
1395 {
1396         struct obd_export *exp;
1397
1398         spin_lock(&obd->obd_dev_lock);
1399         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1400                 print_export_data(exp, "ACTIVE", locks);
1401         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1402                 print_export_data(exp, "UNLINKED", locks);
1403         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1404                 print_export_data(exp, "DELAYED", locks);
1405         spin_unlock(&obd->obd_dev_lock);
1406         spin_lock(&obd_zombie_impexp_lock);
1407         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1408                 print_export_data(exp, "ZOMBIE", locks);
1409         spin_unlock(&obd_zombie_impexp_lock);
1410 }
1411 EXPORT_SYMBOL(dump_exports);
1412
1413 void obd_exports_barrier(struct obd_device *obd)
1414 {
1415         int waited = 2;
1416         LASSERT(list_empty(&obd->obd_exports));
1417         spin_lock(&obd->obd_dev_lock);
1418         while (!list_empty(&obd->obd_unlinked_exports)) {
1419                 spin_unlock(&obd->obd_dev_lock);
1420                 cfs_schedule_timeout(CFS_TASK_UNINT, cfs_time_seconds(waited));
1421                 if (waited > 5 && IS_PO2(waited)) {
1422                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1423                                       "more than %d seconds. "
1424                                       "The obd refcount = %d. Is it stuck?\n",
1425                                       obd->obd_name, waited,
1426                                       atomic_read(&obd->obd_refcount));
1427                         dump_exports(obd, 0);
1428                 }
1429                 waited *= 2;
1430                 spin_lock(&obd->obd_dev_lock);
1431         }
1432         spin_unlock(&obd->obd_dev_lock);
1433 }
1434 EXPORT_SYMBOL(obd_exports_barrier);
1435
1436 /**
1437  * kill zombie imports and exports
1438  */
1439 void obd_zombie_impexp_cull(void)
1440 {
1441         struct obd_import *import;
1442         struct obd_export *export;
1443         ENTRY;
1444
1445         do {
1446                 spin_lock(&obd_zombie_impexp_lock);
1447
1448                 import = NULL;
1449                 if (!list_empty(&obd_zombie_imports)) {
1450                         import = list_entry(obd_zombie_imports.next,
1451                                             struct obd_import,
1452                                             imp_zombie_chain);
1453                         list_del_init(&import->imp_zombie_chain);
1454                 }
1455
1456                 export = NULL;
1457                 if (!list_empty(&obd_zombie_exports)) {
1458                         export = list_entry(obd_zombie_exports.next,
1459                                             struct obd_export,
1460                                             exp_obd_chain);
1461                         list_del_init(&export->exp_obd_chain);
1462                 }
1463
1464                 spin_unlock(&obd_zombie_impexp_lock);
1465
1466                 if (import != NULL)
1467                         class_import_destroy(import);
1468
1469                 if (export != NULL)
1470                         class_export_destroy(export);
1471
1472         } while (import != NULL || export != NULL);
1473         EXIT;
1474 }
1475
1476 static struct completion        obd_zombie_start;
1477 static struct completion        obd_zombie_stop;
1478 static unsigned long            obd_zombie_flags;
1479 static cfs_waitq_t              obd_zombie_waitq;
1480 static pid_t                    obd_zombie_pid;
1481
1482 enum {
1483         OBD_ZOMBIE_STOP   = 1 << 1
1484 };
1485
1486 /**
1487  * check for work for kill zombie import/export thread.
1488  */
1489 static int obd_zombie_impexp_check(void *arg)
1490 {
1491         int rc;
1492
1493         spin_lock(&obd_zombie_impexp_lock);
1494         rc = list_empty(&obd_zombie_imports) &&
1495              list_empty(&obd_zombie_exports) &&
1496              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1497
1498         spin_unlock(&obd_zombie_impexp_lock);
1499
1500         RETURN(rc);
1501 }
1502
1503 /**
1504  * Add export to the obd_zombe thread and notify it.
1505  */
1506 static void obd_zombie_export_add(struct obd_export *exp) {
1507         spin_lock(&exp->exp_obd->obd_dev_lock);
1508         LASSERT(!list_empty(&exp->exp_obd_chain));
1509         list_del_init(&exp->exp_obd_chain);
1510         spin_unlock(&exp->exp_obd->obd_dev_lock);
1511         spin_lock(&obd_zombie_impexp_lock);
1512         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1513         spin_unlock(&obd_zombie_impexp_lock);
1514
1515         if (obd_zombie_impexp_notify != NULL)
1516                 obd_zombie_impexp_notify();
1517 }
1518
1519 /**
1520  * Add import to the obd_zombe thread and notify it.
1521  */
1522 static void obd_zombie_import_add(struct obd_import *imp) {
1523         LASSERT(imp->imp_sec == NULL);
1524         spin_lock(&obd_zombie_impexp_lock);
1525         LASSERT(list_empty(&imp->imp_zombie_chain));
1526         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1527         spin_unlock(&obd_zombie_impexp_lock);
1528
1529         if (obd_zombie_impexp_notify != NULL)
1530                 obd_zombie_impexp_notify();
1531 }
1532
1533 /**
1534  * notify import/export destroy thread about new zombie.
1535  */
1536 static void obd_zombie_impexp_notify(void)
1537 {
1538         cfs_waitq_signal(&obd_zombie_waitq);
1539 }
1540
1541 /**
1542  * check whether obd_zombie is idle
1543  */
1544 static int obd_zombie_is_idle(void)
1545 {
1546         int rc;
1547
1548         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1549         spin_lock(&obd_zombie_impexp_lock);
1550         rc = list_empty(&obd_zombie_imports) &&
1551              list_empty(&obd_zombie_exports);
1552         spin_unlock(&obd_zombie_impexp_lock);
1553         return rc;
1554 }
1555
1556 /**
1557  * wait when obd_zombie import/export queues become empty
1558  */
1559 void obd_zombie_barrier(void)
1560 {
1561         struct l_wait_info lwi = { 0 };
1562
1563         if (obd_zombie_pid == cfs_curproc_pid())
1564                 /* don't wait for myself */
1565                 return;
1566         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1567 }
1568 EXPORT_SYMBOL(obd_zombie_barrier);
1569
1570 #ifdef __KERNEL__
1571
1572 /**
1573  * destroy zombie export/import thread.
1574  */
1575 static int obd_zombie_impexp_thread(void *unused)
1576 {
1577         int rc;
1578
1579         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1580                 complete(&obd_zombie_start);
1581                 RETURN(rc);
1582         }
1583
1584         complete(&obd_zombie_start);
1585
1586         obd_zombie_pid = cfs_curproc_pid();
1587
1588         while(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1589                 struct l_wait_info lwi = { 0 };
1590
1591                 l_wait_event(obd_zombie_waitq,
1592                              !obd_zombie_impexp_check(NULL), &lwi);
1593                 obd_zombie_impexp_cull();
1594
1595                 /*
1596                  * Notify obd_zombie_barrier callers that queues
1597                  * may be empty.
1598                  */
1599                 cfs_waitq_signal(&obd_zombie_waitq);
1600         }
1601
1602         complete(&obd_zombie_stop);
1603
1604         RETURN(0);
1605 }
1606
1607 #else /* ! KERNEL */
1608
1609 static atomic_t zombie_recur = ATOMIC_INIT(0);
1610 static void *obd_zombie_impexp_work_cb;
1611 static void *obd_zombie_impexp_idle_cb;
1612
1613 int obd_zombie_impexp_kill(void *arg)
1614 {
1615         int rc = 0;
1616
1617         if (atomic_inc_return(&zombie_recur) == 1) {
1618                 obd_zombie_impexp_cull();
1619                 rc = 1;
1620         }
1621         atomic_dec(&zombie_recur);
1622         return rc;
1623 }
1624
1625 #endif
1626
1627 /**
1628  * start destroy zombie import/export thread
1629  */
1630 int obd_zombie_impexp_init(void)
1631 {
1632         int rc;
1633
1634         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1635         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1636         spin_lock_init(&obd_zombie_impexp_lock);
1637         init_completion(&obd_zombie_start);
1638         init_completion(&obd_zombie_stop);
1639         cfs_waitq_init(&obd_zombie_waitq);
1640         obd_zombie_pid = 0;
1641
1642 #ifdef __KERNEL__
1643         rc = cfs_kernel_thread(obd_zombie_impexp_thread, NULL, 0);
1644         if (rc < 0)
1645                 RETURN(rc);
1646
1647         wait_for_completion(&obd_zombie_start);
1648 #else
1649
1650         obd_zombie_impexp_work_cb =
1651                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1652                                                  &obd_zombie_impexp_kill, NULL);
1653
1654         obd_zombie_impexp_idle_cb =
1655                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1656                                                  &obd_zombie_impexp_check, NULL);
1657         rc = 0;
1658 #endif
1659         RETURN(rc);
1660 }
1661 /**
1662  * stop destroy zombie import/export thread
1663  */
1664 void obd_zombie_impexp_stop(void)
1665 {
1666         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1667         obd_zombie_impexp_notify();
1668 #ifdef __KERNEL__
1669         wait_for_completion(&obd_zombie_stop);
1670 #else
1671         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1672         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1673 #endif
1674 }
1675