Whamcloud - gitweb
158493040156fb48b239c0b600724c9332b9c05f
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
30  * Use is subject to license terms.
31  *
32  * Copyright (c) 2011, 2012, Whamcloud, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lustre/obdclass/genops.c
39  *
40  * These are the only exported functions, they provide some generic
41  * infrastructure for managing object devices
42  */
43
44 #define DEBUG_SUBSYSTEM S_CLASS
45 #ifndef __KERNEL__
46 #include <liblustre.h>
47 #endif
48 #include <obd_ost.h>
49 #include <obd_class.h>
50 #include <lprocfs_status.h>
51
/* List of registered obd types; defined elsewhere, walked and modified
 * in this file under obd_types_lock. */
extern cfs_list_t obd_types;
cfs_spinlock_t obd_types_lock;

/* Slab caches created by obd_init_caches() and torn down by
 * obd_cleanup_caches(). */
cfs_mem_cache_t *obd_device_cachep;
cfs_mem_cache_t *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
cfs_mem_cache_t *import_cachep;

/* Zombie import/export lists and their lock, plus forward declarations
 * of the static helpers that operate on them. */
cfs_list_t      obd_zombie_imports;
cfs_list_t      obd_zombie_exports;
cfs_spinlock_t  obd_zombie_impexp_lock;
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks);

/* Hook invoked by class_export_destroy() to drop an export's connection;
 * it is not assigned in this part of the file. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
70
71 /*
72  * support functions: we could use inter-module communication, but this
73  * is more portable to other OS's
74  */
75 static struct obd_device *obd_device_alloc(void)
76 {
77         struct obd_device *obd;
78
79         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
80         if (obd != NULL) {
81                 obd->obd_magic = OBD_DEVICE_MAGIC;
82         }
83         return obd;
84 }
85
86 static void obd_device_free(struct obd_device *obd)
87 {
88         LASSERT(obd != NULL);
89         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
90                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
91         if (obd->obd_namespace != NULL) {
92                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
93                        obd, obd->obd_namespace, obd->obd_force);
94                 LBUG();
95         }
96         lu_ref_fini(&obd->obd_reference);
97         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 }
99
100 struct obd_type *class_search_type(const char *name)
101 {
102         cfs_list_t *tmp;
103         struct obd_type *type;
104
105         cfs_spin_lock(&obd_types_lock);
106         cfs_list_for_each(tmp, &obd_types) {
107                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
108                 if (strcmp(type->typ_name, name) == 0) {
109                         cfs_spin_unlock(&obd_types_lock);
110                         return type;
111                 }
112         }
113         cfs_spin_unlock(&obd_types_lock);
114         return NULL;
115 }
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124                 if (!cfs_request_module("%s", modname)) {
125                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126                         type = class_search_type(name);
127                 } else {
128                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129                                            modname);
130                 }
131         }
132 #endif
133         if (type) {
134                 cfs_spin_lock(&type->obd_type_lock);
135                 type->typ_refcnt++;
136                 cfs_try_module_get(type->typ_dt_ops->o_owner);
137                 cfs_spin_unlock(&type->obd_type_lock);
138         }
139         return type;
140 }
141 EXPORT_SYMBOL(class_get_type);
142
143 void class_put_type(struct obd_type *type)
144 {
145         LASSERT(type);
146         cfs_spin_lock(&type->obd_type_lock);
147         type->typ_refcnt--;
148         cfs_module_put(type->typ_dt_ops->o_owner);
149         cfs_spin_unlock(&type->obd_type_lock);
150 }
151 EXPORT_SYMBOL(class_put_type);
152
153 #define CLASS_MAX_NAME 1024
154
155 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
156                         struct lprocfs_vars *vars, const char *name,
157                         struct lu_device_type *ldt)
158 {
159         struct obd_type *type;
160         int rc = 0;
161         ENTRY;
162
163         /* sanity check */
164         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
165
166         if (class_search_type(name)) {
167                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
168                 RETURN(-EEXIST);
169         }
170
171         rc = -ENOMEM;
172         OBD_ALLOC(type, sizeof(*type));
173         if (type == NULL)
174                 RETURN(rc);
175
176         OBD_ALLOC_PTR(type->typ_dt_ops);
177         OBD_ALLOC_PTR(type->typ_md_ops);
178         OBD_ALLOC(type->typ_name, strlen(name) + 1);
179
180         if (type->typ_dt_ops == NULL ||
181             type->typ_md_ops == NULL ||
182             type->typ_name == NULL)
183                 GOTO (failed, rc);
184
185         *(type->typ_dt_ops) = *dt_ops;
186         /* md_ops is optional */
187         if (md_ops)
188                 *(type->typ_md_ops) = *md_ops;
189         strcpy(type->typ_name, name);
190         cfs_spin_lock_init(&type->obd_type_lock);
191
192 #ifdef LPROCFS
193         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
194                                               vars, type);
195         if (IS_ERR(type->typ_procroot)) {
196                 rc = PTR_ERR(type->typ_procroot);
197                 type->typ_procroot = NULL;
198                 GOTO (failed, rc);
199         }
200 #endif
201         if (ldt != NULL) {
202                 type->typ_lu = ldt;
203                 rc = lu_device_type_init(ldt);
204                 if (rc != 0)
205                         GOTO (failed, rc);
206         }
207
208         cfs_spin_lock(&obd_types_lock);
209         cfs_list_add(&type->typ_chain, &obd_types);
210         cfs_spin_unlock(&obd_types_lock);
211
212         RETURN (0);
213
214  failed:
215         if (type->typ_name != NULL)
216                 OBD_FREE(type->typ_name, strlen(name) + 1);
217         if (type->typ_md_ops != NULL)
218                 OBD_FREE_PTR(type->typ_md_ops);
219         if (type->typ_dt_ops != NULL)
220                 OBD_FREE_PTR(type->typ_dt_ops);
221         OBD_FREE(type, sizeof(*type));
222         RETURN(rc);
223 }
224 EXPORT_SYMBOL(class_register_type);
225
226 int class_unregister_type(const char *name)
227 {
228         struct obd_type *type = class_search_type(name);
229         ENTRY;
230
231         if (!type) {
232                 CERROR("unknown obd type\n");
233                 RETURN(-EINVAL);
234         }
235
236         if (type->typ_refcnt) {
237                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
238                 /* This is a bad situation, let's make the best of it */
239                 /* Remove ops, but leave the name for debugging */
240                 OBD_FREE_PTR(type->typ_dt_ops);
241                 OBD_FREE_PTR(type->typ_md_ops);
242                 RETURN(-EBUSY);
243         }
244
245         if (type->typ_procroot) {
246                 lprocfs_remove(&type->typ_procroot);
247         }
248
249         if (type->typ_lu)
250                 lu_device_type_fini(type->typ_lu);
251
252         cfs_spin_lock(&obd_types_lock);
253         cfs_list_del(&type->typ_chain);
254         cfs_spin_unlock(&obd_types_lock);
255         OBD_FREE(type->typ_name, strlen(name) + 1);
256         if (type->typ_dt_ops != NULL)
257                 OBD_FREE_PTR(type->typ_dt_ops);
258         if (type->typ_md_ops != NULL)
259                 OBD_FREE_PTR(type->typ_md_ops);
260         OBD_FREE(type, sizeof(*type));
261         RETURN(0);
262 } /* class_unregister_type */
263 EXPORT_SYMBOL(class_unregister_type);
264
265 /**
266  * Create a new obd device.
267  *
268  * Find an empty slot in ::obd_devs[], create a new obd device in it.
269  *
270  * \param[in] type_name obd device type string.
271  * \param[in] name      obd device name.
272  *
273  * \retval NULL if create fails, otherwise return the obd device
274  *         pointer created.
275  */
276 struct obd_device *class_newdev(const char *type_name, const char *name)
277 {
278         struct obd_device *result = NULL;
279         struct obd_device *newdev;
280         struct obd_type *type = NULL;
281         int i;
282         int new_obd_minor = 0;
283         ENTRY;
284
285         if (strlen(name) >= MAX_OBD_NAME) {
286                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
287                 RETURN(ERR_PTR(-EINVAL));
288         }
289
290         type = class_get_type(type_name);
291         if (type == NULL){
292                 CERROR("OBD: unknown type: %s\n", type_name);
293                 RETURN(ERR_PTR(-ENODEV));
294         }
295
296         newdev = obd_device_alloc();
297         if (newdev == NULL) {
298                 class_put_type(type);
299                 RETURN(ERR_PTR(-ENOMEM));
300         }
301         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
302
303         cfs_write_lock(&obd_dev_lock);
304         for (i = 0; i < class_devno_max(); i++) {
305                 struct obd_device *obd = class_num2obd(i);
306
307                 if (obd && obd->obd_name &&
308                     (strcmp(name, obd->obd_name) == 0)) {
309                         CERROR("Device %s already exists at %d, won't add\n",
310                                name, i);
311                         if (result) {
312                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
313                                          "%p obd_magic %08x != %08x\n", result,
314                                          result->obd_magic, OBD_DEVICE_MAGIC);
315                                 LASSERTF(result->obd_minor == new_obd_minor,
316                                          "%p obd_minor %d != %d\n", result,
317                                          result->obd_minor, new_obd_minor);
318
319                                 obd_devs[result->obd_minor] = NULL;
320                                 result->obd_name[0]='\0';
321                          }
322                         result = ERR_PTR(-EEXIST);
323                         break;
324                 }
325                 if (!result && !obd) {
326                         result = newdev;
327                         result->obd_minor = i;
328                         new_obd_minor = i;
329                         result->obd_type = type;
330                         strncpy(result->obd_name, name,
331                                 sizeof(result->obd_name) - 1);
332                         obd_devs[i] = result;
333                 }
334         }
335         cfs_write_unlock(&obd_dev_lock);
336
337         if (result == NULL && i >= class_devno_max()) {
338                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
339                        class_devno_max());
340                 RETURN(ERR_PTR(-EOVERFLOW));
341         }
342
343         if (IS_ERR(result)) {
344                 obd_device_free(newdev);
345                 class_put_type(type);
346         } else {
347                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
348                        result->obd_name, result);
349         }
350         RETURN(result);
351 }
352
353 void class_release_dev(struct obd_device *obd)
354 {
355         struct obd_type *obd_type = obd->obd_type;
356
357         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
358                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
359         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
360                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
361         LASSERT(obd_type != NULL);
362
363         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
364                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
365
366         cfs_write_lock(&obd_dev_lock);
367         obd_devs[obd->obd_minor] = NULL;
368         cfs_write_unlock(&obd_dev_lock);
369         obd_device_free(obd);
370
371         class_put_type(obd_type);
372 }
373
374 int class_name2dev(const char *name)
375 {
376         int i;
377
378         if (!name)
379                 return -1;
380
381         cfs_read_lock(&obd_dev_lock);
382         for (i = 0; i < class_devno_max(); i++) {
383                 struct obd_device *obd = class_num2obd(i);
384
385                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
386                         /* Make sure we finished attaching before we give
387                            out any references */
388                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
389                         if (obd->obd_attached) {
390                                 cfs_read_unlock(&obd_dev_lock);
391                                 return i;
392                         }
393                         break;
394                 }
395         }
396         cfs_read_unlock(&obd_dev_lock);
397
398         return -1;
399 }
400 EXPORT_SYMBOL(class_name2dev);
401
402 struct obd_device *class_name2obd(const char *name)
403 {
404         int dev = class_name2dev(name);
405
406         if (dev < 0 || dev > class_devno_max())
407                 return NULL;
408         return class_num2obd(dev);
409 }
410 EXPORT_SYMBOL(class_name2obd);
411
412 int class_uuid2dev(struct obd_uuid *uuid)
413 {
414         int i;
415
416         cfs_read_lock(&obd_dev_lock);
417         for (i = 0; i < class_devno_max(); i++) {
418                 struct obd_device *obd = class_num2obd(i);
419
420                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
421                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
422                         cfs_read_unlock(&obd_dev_lock);
423                         return i;
424                 }
425         }
426         cfs_read_unlock(&obd_dev_lock);
427
428         return -1;
429 }
430 EXPORT_SYMBOL(class_uuid2dev);
431
432 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
433 {
434         int dev = class_uuid2dev(uuid);
435         if (dev < 0)
436                 return NULL;
437         return class_num2obd(dev);
438 }
439 EXPORT_SYMBOL(class_uuid2obd);
440
441 /**
442  * Get obd device from ::obd_devs[]
443  *
444  * \param num [in] array index
445  *
446  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
447  *         otherwise return the obd device there.
448  */
449 struct obd_device *class_num2obd(int num)
450 {
451         struct obd_device *obd = NULL;
452
453         if (num < class_devno_max()) {
454                 obd = obd_devs[num];
455                 if (obd == NULL)
456                         return NULL;
457
458                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
459                          "%p obd_magic %08x != %08x\n",
460                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
461                 LASSERTF(obd->obd_minor == num,
462                          "%p obd_minor %0d != %0d\n",
463                          obd, obd->obd_minor, num);
464         }
465
466         return obd;
467 }
468 EXPORT_SYMBOL(class_num2obd);
469
470 void class_obd_list(void)
471 {
472         char *status;
473         int i;
474
475         cfs_read_lock(&obd_dev_lock);
476         for (i = 0; i < class_devno_max(); i++) {
477                 struct obd_device *obd = class_num2obd(i);
478
479                 if (obd == NULL)
480                         continue;
481                 if (obd->obd_stopping)
482                         status = "ST";
483                 else if (obd->obd_set_up)
484                         status = "UP";
485                 else if (obd->obd_attached)
486                         status = "AT";
487                 else
488                         status = "--";
489                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
490                          i, status, obd->obd_type->typ_name,
491                          obd->obd_name, obd->obd_uuid.uuid,
492                          cfs_atomic_read(&obd->obd_refcount));
493         }
494         cfs_read_unlock(&obd_dev_lock);
495         return;
496 }
497
498 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
499    specified, then only the client with that uuid is returned,
500    otherwise any client connected to the tgt is returned. */
501 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
502                                           const char * typ_name,
503                                           struct obd_uuid *grp_uuid)
504 {
505         int i;
506
507         cfs_read_lock(&obd_dev_lock);
508         for (i = 0; i < class_devno_max(); i++) {
509                 struct obd_device *obd = class_num2obd(i);
510
511                 if (obd == NULL)
512                         continue;
513                 if ((strncmp(obd->obd_type->typ_name, typ_name,
514                              strlen(typ_name)) == 0)) {
515                         if (obd_uuid_equals(tgt_uuid,
516                                             &obd->u.cli.cl_target_uuid) &&
517                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
518                                                          &obd->obd_uuid) : 1)) {
519                                 cfs_read_unlock(&obd_dev_lock);
520                                 return obd;
521                         }
522                 }
523         }
524         cfs_read_unlock(&obd_dev_lock);
525
526         return NULL;
527 }
528 EXPORT_SYMBOL(class_find_client_obd);
529
530 /* Iterate the obd_device list looking devices have grp_uuid. Start
531    searching at *next, and if a device is found, the next index to look
532    at is saved in *next. If next is NULL, then the first matching device
533    will always be returned. */
534 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
535 {
536         int i;
537
538         if (next == NULL)
539                 i = 0;
540         else if (*next >= 0 && *next < class_devno_max())
541                 i = *next;
542         else
543                 return NULL;
544
545         cfs_read_lock(&obd_dev_lock);
546         for (; i < class_devno_max(); i++) {
547                 struct obd_device *obd = class_num2obd(i);
548
549                 if (obd == NULL)
550                         continue;
551                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
552                         if (next != NULL)
553                                 *next = i+1;
554                         cfs_read_unlock(&obd_dev_lock);
555                         return obd;
556                 }
557         }
558         cfs_read_unlock(&obd_dev_lock);
559
560         return NULL;
561 }
562 EXPORT_SYMBOL(class_devices_in_group);
563
564 /**
565  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
566  * adjust sptlrpc settings accordingly.
567  */
568 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
569 {
570         struct obd_device  *obd;
571         const char         *type;
572         int                 i, rc = 0, rc2;
573
574         LASSERT(namelen > 0);
575
576         cfs_read_lock(&obd_dev_lock);
577         for (i = 0; i < class_devno_max(); i++) {
578                 obd = class_num2obd(i);
579
580                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
581                         continue;
582
583                 /* only notify mdc, osc, mdt, ost */
584                 type = obd->obd_type->typ_name;
585                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
586                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
587                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
588                     strcmp(type, LUSTRE_OST_NAME) != 0)
589                         continue;
590
591                 if (strncmp(obd->obd_name, fsname, namelen))
592                         continue;
593
594                 class_incref(obd, __FUNCTION__, obd);
595                 cfs_read_unlock(&obd_dev_lock);
596                 rc2 = obd_set_info_async(obd->obd_self_export,
597                                          sizeof(KEY_SPTLRPC_CONF),
598                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
599                 rc = rc ? rc : rc2;
600                 class_decref(obd, __FUNCTION__, obd);
601                 cfs_read_lock(&obd_dev_lock);
602         }
603         cfs_read_unlock(&obd_dev_lock);
604         return rc;
605 }
606 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
607
608 void obd_cleanup_caches(void)
609 {
610         int rc;
611
612         ENTRY;
613         if (obd_device_cachep) {
614                 rc = cfs_mem_cache_destroy(obd_device_cachep);
615                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
616                 obd_device_cachep = NULL;
617         }
618         if (obdo_cachep) {
619                 rc = cfs_mem_cache_destroy(obdo_cachep);
620                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
621                 obdo_cachep = NULL;
622         }
623         if (import_cachep) {
624                 rc = cfs_mem_cache_destroy(import_cachep);
625                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
626                 import_cachep = NULL;
627         }
628         if (capa_cachep) {
629                 rc = cfs_mem_cache_destroy(capa_cachep);
630                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
631                 capa_cachep = NULL;
632         }
633         EXIT;
634 }
635
636 int obd_init_caches(void)
637 {
638         ENTRY;
639
640         LASSERT(obd_device_cachep == NULL);
641         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
642                                                  sizeof(struct obd_device),
643                                                  0, 0);
644         if (!obd_device_cachep)
645                 GOTO(out, -ENOMEM);
646
647         LASSERT(obdo_cachep == NULL);
648         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
649                                            0, 0);
650         if (!obdo_cachep)
651                 GOTO(out, -ENOMEM);
652
653         LASSERT(import_cachep == NULL);
654         import_cachep = cfs_mem_cache_create("ll_import_cache",
655                                              sizeof(struct obd_import),
656                                              0, 0);
657         if (!import_cachep)
658                 GOTO(out, -ENOMEM);
659
660         LASSERT(capa_cachep == NULL);
661         capa_cachep = cfs_mem_cache_create("capa_cache",
662                                            sizeof(struct obd_capa), 0, 0);
663         if (!capa_cachep)
664                 GOTO(out, -ENOMEM);
665
666         RETURN(0);
667  out:
668         obd_cleanup_caches();
669         RETURN(-ENOMEM);
670
671 }
672
673 /* map connection to client */
674 struct obd_export *class_conn2export(struct lustre_handle *conn)
675 {
676         struct obd_export *export;
677         ENTRY;
678
679         if (!conn) {
680                 CDEBUG(D_CACHE, "looking for null handle\n");
681                 RETURN(NULL);
682         }
683
684         if (conn->cookie == -1) {  /* this means assign a new connection */
685                 CDEBUG(D_CACHE, "want a new connection\n");
686                 RETURN(NULL);
687         }
688
689         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
690         export = class_handle2object(conn->cookie);
691         RETURN(export);
692 }
693 EXPORT_SYMBOL(class_conn2export);
694
695 struct obd_device *class_exp2obd(struct obd_export *exp)
696 {
697         if (exp)
698                 return exp->exp_obd;
699         return NULL;
700 }
701 EXPORT_SYMBOL(class_exp2obd);
702
703 struct obd_device *class_conn2obd(struct lustre_handle *conn)
704 {
705         struct obd_export *export;
706         export = class_conn2export(conn);
707         if (export) {
708                 struct obd_device *obd = export->exp_obd;
709                 class_export_put(export);
710                 return obd;
711         }
712         return NULL;
713 }
714 EXPORT_SYMBOL(class_conn2obd);
715
716 struct obd_import *class_exp2cliimp(struct obd_export *exp)
717 {
718         struct obd_device *obd = exp->exp_obd;
719         if (obd == NULL)
720                 return NULL;
721         return obd->u.cli.cl_import;
722 }
723 EXPORT_SYMBOL(class_exp2cliimp);
724
725 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
726 {
727         struct obd_device *obd = class_conn2obd(conn);
728         if (obd == NULL)
729                 return NULL;
730         return obd->u.cli.cl_import;
731 }
732 EXPORT_SYMBOL(class_conn2cliimp);
733
734 /* Export management functions */
735
736 /* if export is involved in recovery then clean up related things */
737 void class_export_recovery_cleanup(struct obd_export *exp)
738 {
739         struct obd_device *obd = exp->exp_obd;
740
741         cfs_spin_lock(&obd->obd_recovery_task_lock);
742         if (exp->exp_delayed)
743                 obd->obd_delayed_clients--;
744         if (obd->obd_recovering && exp->exp_in_recovery) {
745                 cfs_spin_lock(&exp->exp_lock);
746                 exp->exp_in_recovery = 0;
747                 cfs_spin_unlock(&exp->exp_lock);
748                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
749                 cfs_atomic_dec(&obd->obd_connected_clients);
750         }
751         cfs_spin_unlock(&obd->obd_recovery_task_lock);
752         /** Cleanup req replay fields */
753         if (exp->exp_req_replay_needed) {
754                 cfs_spin_lock(&exp->exp_lock);
755                 exp->exp_req_replay_needed = 0;
756                 cfs_spin_unlock(&exp->exp_lock);
757                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
758                 cfs_atomic_dec(&obd->obd_req_replay_clients);
759         }
760         /** Cleanup lock replay data */
761         if (exp->exp_lock_replay_needed) {
762                 cfs_spin_lock(&exp->exp_lock);
763                 exp->exp_lock_replay_needed = 0;
764                 cfs_spin_unlock(&exp->exp_lock);
765                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
766                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
767         }
768 }
769
770 static void class_export_destroy(struct obd_export *exp)
771 {
772         struct obd_device *obd = exp->exp_obd;
773         ENTRY;
774
775         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
776         LASSERT(obd != NULL);
777
778         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
779                exp->exp_client_uuid.uuid, obd->obd_name);
780
781
782         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
783         if (exp->exp_connection)
784                 ptlrpc_put_connection_superhack(exp->exp_connection);
785
786         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
787         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
788         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
789         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
790         obd_destroy_export(exp);
791         class_decref(obd, "export", exp);
792
793         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
794         EXIT;
795 }
796
/* Handle addref callback: takes one reference on the export passed as an
 * untyped pointer. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
801
802 struct obd_export *class_export_get(struct obd_export *exp)
803 {
804         cfs_atomic_inc(&exp->exp_refcount);
805         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
806                cfs_atomic_read(&exp->exp_refcount));
807         return exp;
808 }
809 EXPORT_SYMBOL(class_export_get);
810
811 void class_export_put(struct obd_export *exp)
812 {
813         LASSERT(exp != NULL);
814         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
815         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
816                cfs_atomic_read(&exp->exp_refcount) - 1);
817
818         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
819                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
820                 CDEBUG(D_IOCTL, "final put %p/%s\n",
821                        exp, exp->exp_client_uuid.uuid);
822
823                 /* release nid stat refererence */
824                 lprocfs_exp_cleanup(exp);
825                 class_export_recovery_cleanup(exp);
826
827                 obd_zombie_export_add(exp);
828         }
829 }
830 EXPORT_SYMBOL(class_export_put);
831
832 /* Creates a new export, adds it to the hash table, and returns a
833  * pointer to it. The refcount is 2: one for the hash reference, and
834  * one for the pointer returned by this function. */
835 struct obd_export *class_new_export(struct obd_device *obd,
836                                     struct obd_uuid *cluuid)
837 {
838         struct obd_export *export;
839         cfs_hash_t *hash = NULL;
840         int rc = 0;
841         ENTRY;
842
843         OBD_ALLOC_PTR(export);
844         if (!export)
845                 return ERR_PTR(-ENOMEM);
846
847         export->exp_conn_cnt = 0;
848         export->exp_lock_hash = NULL;
849         cfs_atomic_set(&export->exp_refcount, 2);
850         cfs_atomic_set(&export->exp_rpc_count, 0);
851         cfs_atomic_set(&export->exp_cb_count, 0);
852         cfs_atomic_set(&export->exp_locks_count, 0);
853 #if LUSTRE_TRACKS_LOCK_EXP_REFS
854         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
855         cfs_spin_lock_init(&export->exp_locks_list_guard);
856 #endif
857         cfs_atomic_set(&export->exp_replay_count, 0);
858         export->exp_obd = obd;
859         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
860         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
861         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
862         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
863         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
864         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
865         class_handle_hash(&export->exp_handle, export_handle_addref);
866         export->exp_last_request_time = cfs_time_current_sec();
867         cfs_spin_lock_init(&export->exp_lock);
868         cfs_spin_lock_init(&export->exp_rpc_lock);
869         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
870         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
871         cfs_spin_lock_init(&export->exp_bl_list_lock);
872         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
873
874         export->exp_sp_peer = LUSTRE_SP_ANY;
875         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
876         export->exp_client_uuid = *cluuid;
877         obd_init_export(export);
878
879         cfs_spin_lock(&obd->obd_dev_lock);
880          /* shouldn't happen, but might race */
881         if (obd->obd_stopping)
882                 GOTO(exit_unlock, rc = -ENODEV);
883
884         hash = cfs_hash_getref(obd->obd_uuid_hash);
885         if (hash == NULL)
886                 GOTO(exit_unlock, rc = -ENODEV);
887         cfs_spin_unlock(&obd->obd_dev_lock);
888
889         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
890                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
891                 if (rc != 0) {
892                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
893                                       obd->obd_name, cluuid->uuid, rc);
894                         GOTO(exit_err, rc = -EALREADY);
895                 }
896         }
897
898         cfs_spin_lock(&obd->obd_dev_lock);
899         if (obd->obd_stopping) {
900                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
901                 GOTO(exit_unlock, rc = -ENODEV);
902         }
903
904         class_incref(obd, "export", export);
905         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
906         cfs_list_add_tail(&export->exp_obd_chain_timed,
907                           &export->exp_obd->obd_exports_timed);
908         export->exp_obd->obd_num_exports++;
909         cfs_spin_unlock(&obd->obd_dev_lock);
910         cfs_hash_putref(hash);
911         RETURN(export);
912
913 exit_unlock:
914         cfs_spin_unlock(&obd->obd_dev_lock);
915 exit_err:
916         if (hash)
917                 cfs_hash_putref(hash);
918         class_handle_unhash(&export->exp_handle);
919         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
920         obd_destroy_export(export);
921         OBD_FREE_PTR(export);
922         return ERR_PTR(rc);
923 }
924 EXPORT_SYMBOL(class_new_export);
925
926 void class_unlink_export(struct obd_export *exp)
927 {
928         class_handle_unhash(&exp->exp_handle);
929
930         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
931         /* delete an uuid-export hashitem from hashtables */
932         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
933                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
934                              &exp->exp_client_uuid,
935                              &exp->exp_uuid_hash);
936
937         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
938         cfs_list_del_init(&exp->exp_obd_chain_timed);
939         exp->exp_obd->obd_num_exports--;
940         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
941         class_export_put(exp);
942 }
943 EXPORT_SYMBOL(class_unlink_export);
944
945 /* Import management functions */
946 void class_import_destroy(struct obd_import *imp)
947 {
948         ENTRY;
949
950         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
951                 imp->imp_obd->obd_name);
952
953         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
954
955         ptlrpc_put_connection_superhack(imp->imp_connection);
956
957         while (!cfs_list_empty(&imp->imp_conn_list)) {
958                 struct obd_import_conn *imp_conn;
959
960                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
961                                           struct obd_import_conn, oic_item);
962                 cfs_list_del_init(&imp_conn->oic_item);
963                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
964                 OBD_FREE(imp_conn, sizeof(*imp_conn));
965         }
966
967         LASSERT(imp->imp_sec == NULL);
968         class_decref(imp->imp_obd, "import", imp);
969         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
970         EXIT;
971 }
972
/* Handle-hash addref callback for imports (registered in class_new_import):
 * takes one reference on the import passed as an opaque pointer. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
977
978 struct obd_import *class_import_get(struct obd_import *import)
979 {
980         cfs_atomic_inc(&import->imp_refcount);
981         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
982                cfs_atomic_read(&import->imp_refcount),
983                import->imp_obd->obd_name);
984         return import;
985 }
986 EXPORT_SYMBOL(class_import_get);
987
988 void class_import_put(struct obd_import *imp)
989 {
990         ENTRY;
991
992         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
993         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
994
995         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
996                cfs_atomic_read(&imp->imp_refcount) - 1,
997                imp->imp_obd->obd_name);
998
999         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1000                 CDEBUG(D_INFO, "final put import %p\n", imp);
1001                 obd_zombie_import_add(imp);
1002         }
1003
1004         EXIT;
1005 }
1006 EXPORT_SYMBOL(class_import_put);
1007
1008 static void init_imp_at(struct imp_at *at) {
1009         int i;
1010         at_init(&at->iat_net_latency, 0, 0);
1011         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1012                 /* max service estimates are tracked on the server side, so
1013                    don't use the AT history here, just use the last reported
1014                    val. (But keep hist for proc histogram, worst_ever) */
1015                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1016                         AT_FLG_NOHIST);
1017         }
1018 }
1019
1020 struct obd_import *class_new_import(struct obd_device *obd)
1021 {
1022         struct obd_import *imp;
1023
1024         OBD_ALLOC(imp, sizeof(*imp));
1025         if (imp == NULL)
1026                 return NULL;
1027
1028         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1029         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1030         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1031         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1032         cfs_spin_lock_init(&imp->imp_lock);
1033         imp->imp_last_success_conn = 0;
1034         imp->imp_state = LUSTRE_IMP_NEW;
1035         imp->imp_obd = class_incref(obd, "import", imp);
1036         cfs_mutex_init(&imp->imp_sec_mutex);
1037         cfs_waitq_init(&imp->imp_recovery_waitq);
1038
1039         cfs_atomic_set(&imp->imp_refcount, 2);
1040         cfs_atomic_set(&imp->imp_unregistering, 0);
1041         cfs_atomic_set(&imp->imp_inflight, 0);
1042         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1043         cfs_atomic_set(&imp->imp_inval_count, 0);
1044         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1045         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1046         class_handle_hash(&imp->imp_handle, import_handle_addref);
1047         init_imp_at(&imp->imp_at);
1048
1049         /* the default magic is V2, will be used in connect RPC, and
1050          * then adjusted according to the flags in request/reply. */
1051         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1052
1053         return imp;
1054 }
1055 EXPORT_SYMBOL(class_new_import);
1056
1057 void class_destroy_import(struct obd_import *import)
1058 {
1059         LASSERT(import != NULL);
1060         LASSERT(import != LP_POISON);
1061
1062         class_handle_unhash(&import->imp_handle);
1063
1064         cfs_spin_lock(&import->imp_lock);
1065         import->imp_generation++;
1066         cfs_spin_unlock(&import->imp_lock);
1067         class_import_put(import);
1068 }
1069 EXPORT_SYMBOL(class_destroy_import);
1070
1071 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1072
1073 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1074 {
1075         cfs_spin_lock(&exp->exp_locks_list_guard);
1076
1077         LASSERT(lock->l_exp_refs_nr >= 0);
1078
1079         if (lock->l_exp_refs_target != NULL &&
1080             lock->l_exp_refs_target != exp) {
1081                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1082                               exp, lock, lock->l_exp_refs_target);
1083         }
1084         if ((lock->l_exp_refs_nr ++) == 0) {
1085                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1086                 lock->l_exp_refs_target = exp;
1087         }
1088         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1089                lock, exp, lock->l_exp_refs_nr);
1090         cfs_spin_unlock(&exp->exp_locks_list_guard);
1091 }
1092 EXPORT_SYMBOL(__class_export_add_lock_ref);
1093
1094 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1095 {
1096         cfs_spin_lock(&exp->exp_locks_list_guard);
1097         LASSERT(lock->l_exp_refs_nr > 0);
1098         if (lock->l_exp_refs_target != exp) {
1099                 LCONSOLE_WARN("lock %p, "
1100                               "mismatching export pointers: %p, %p\n",
1101                               lock, lock->l_exp_refs_target, exp);
1102         }
1103         if (-- lock->l_exp_refs_nr == 0) {
1104                 cfs_list_del_init(&lock->l_exp_refs_link);
1105                 lock->l_exp_refs_target = NULL;
1106         }
1107         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1108                lock, exp, lock->l_exp_refs_nr);
1109         cfs_spin_unlock(&exp->exp_locks_list_guard);
1110 }
1111 EXPORT_SYMBOL(__class_export_del_lock_ref);
1112 #endif
1113
1114 /* A connection defines an export context in which preallocation can
1115    be managed. This releases the export pointer reference, and returns
1116    the export handle, so the export refcount is 1 when this function
1117    returns. */
1118 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1119                   struct obd_uuid *cluuid)
1120 {
1121         struct obd_export *export;
1122         LASSERT(conn != NULL);
1123         LASSERT(obd != NULL);
1124         LASSERT(cluuid != NULL);
1125         ENTRY;
1126
1127         export = class_new_export(obd, cluuid);
1128         if (IS_ERR(export))
1129                 RETURN(PTR_ERR(export));
1130
1131         conn->cookie = export->exp_handle.h_cookie;
1132         class_export_put(export);
1133
1134         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1135                cluuid->uuid, conn->cookie);
1136         RETURN(0);
1137 }
1138 EXPORT_SYMBOL(class_connect);
1139
1140
1141 /* This function removes 1-3 references from the export:
1142  * 1 - for export pointer passed
1143  * and if disconnect really need
1144  * 2 - removing from hash
1145  * 3 - in client_unlink_export
1146  * The export pointer passed to this function can destroyed */
1147 int class_disconnect(struct obd_export *export)
1148 {
1149         int already_disconnected;
1150         ENTRY;
1151
1152         if (export == NULL) {
1153                 CWARN("attempting to free NULL export %p\n", export);
1154                 RETURN(-EINVAL);
1155         }
1156
1157         cfs_spin_lock(&export->exp_lock);
1158         already_disconnected = export->exp_disconnected;
1159         export->exp_disconnected = 1;
1160         cfs_spin_unlock(&export->exp_lock);
1161
1162         /* class_cleanup(), abort_recovery(), and class_fail_export()
1163          * all end up in here, and if any of them race we shouldn't
1164          * call extra class_export_puts(). */
1165         if (already_disconnected) {
1166                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1167                 GOTO(no_disconn, already_disconnected);
1168         }
1169
1170         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1171                export->exp_handle.h_cookie);
1172
1173         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1174                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1175                              &export->exp_connection->c_peer.nid,
1176                              &export->exp_nid_hash);
1177
1178         class_unlink_export(export);
1179 no_disconn:
1180         class_export_put(export);
1181         RETURN(0);
1182 }
1183 EXPORT_SYMBOL(class_disconnect);
1184
1185 /* Return non-zero for a fully connected export */
1186 int class_connected_export(struct obd_export *exp)
1187 {
1188         if (exp) {
1189                 int connected;
1190                 cfs_spin_lock(&exp->exp_lock);
1191                 connected = (exp->exp_conn_cnt > 0);
1192                 cfs_spin_unlock(&exp->exp_lock);
1193                 return connected;
1194         }
1195         return 0;
1196 }
1197 EXPORT_SYMBOL(class_connected_export);
1198
1199 static void class_disconnect_export_list(cfs_list_t *list,
1200                                          enum obd_option flags)
1201 {
1202         int rc;
1203         struct obd_export *exp;
1204         ENTRY;
1205
1206         /* It's possible that an export may disconnect itself, but
1207          * nothing else will be added to this list. */
1208         while (!cfs_list_empty(list)) {
1209                 exp = cfs_list_entry(list->next, struct obd_export,
1210                                      exp_obd_chain);
1211                 /* need for safe call CDEBUG after obd_disconnect */
1212                 class_export_get(exp);
1213
1214                 cfs_spin_lock(&exp->exp_lock);
1215                 exp->exp_flags = flags;
1216                 cfs_spin_unlock(&exp->exp_lock);
1217
1218                 if (obd_uuid_equals(&exp->exp_client_uuid,
1219                                     &exp->exp_obd->obd_uuid)) {
1220                         CDEBUG(D_HA,
1221                                "exp %p export uuid == obd uuid, don't discon\n",
1222                                exp);
1223                         /* Need to delete this now so we don't end up pointing
1224                          * to work_list later when this export is cleaned up. */
1225                         cfs_list_del_init(&exp->exp_obd_chain);
1226                         class_export_put(exp);
1227                         continue;
1228                 }
1229
1230                 class_export_get(exp);
1231                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1232                        "last request at "CFS_TIME_T"\n",
1233                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1234                        exp, exp->exp_last_request_time);
1235                 /* release one export reference anyway */
1236                 rc = obd_disconnect(exp);
1237
1238                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1239                        obd_export_nid2str(exp), exp, rc);
1240                 class_export_put(exp);
1241         }
1242         EXIT;
1243 }
1244
1245 void class_disconnect_exports(struct obd_device *obd)
1246 {
1247         cfs_list_t work_list;
1248         ENTRY;
1249
1250         /* Move all of the exports from obd_exports to a work list, en masse. */
1251         CFS_INIT_LIST_HEAD(&work_list);
1252         cfs_spin_lock(&obd->obd_dev_lock);
1253         cfs_list_splice_init(&obd->obd_exports, &work_list);
1254         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1255         cfs_spin_unlock(&obd->obd_dev_lock);
1256
1257         if (!cfs_list_empty(&work_list)) {
1258                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1259                        "disconnecting them\n", obd->obd_minor, obd);
1260                 class_disconnect_export_list(&work_list,
1261                                              exp_flags_from_obd(obd));
1262         } else
1263                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1264                        obd->obd_minor, obd);
1265         EXIT;
1266 }
1267 EXPORT_SYMBOL(class_disconnect_exports);
1268
1269 /* Remove exports that have not completed recovery.
1270  */
1271 void class_disconnect_stale_exports(struct obd_device *obd,
1272                                     int (*test_export)(struct obd_export *))
1273 {
1274         cfs_list_t work_list;
1275         cfs_list_t *pos, *n;
1276         struct obd_export *exp;
1277         int evicted = 0;
1278         ENTRY;
1279
1280         CFS_INIT_LIST_HEAD(&work_list);
1281         cfs_spin_lock(&obd->obd_dev_lock);
1282         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1283                 int failed;
1284
1285                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1286
1287                 /* don't count self-export as client */
1288                 if (obd_uuid_equals(&exp->exp_client_uuid,
1289                                     &exp->exp_obd->obd_uuid))
1290                         continue;
1291
1292                 if (test_export(exp))
1293                         continue;
1294
1295                 cfs_spin_lock(&exp->exp_lock);
1296                 failed = exp->exp_failed;
1297                 exp->exp_failed = 1;
1298                 cfs_spin_unlock(&exp->exp_lock);
1299                 if (failed)
1300                         continue;
1301
1302                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1303                 evicted++;
1304                 CDEBUG(D_ERROR, "%s: disconnect stale client %s@%s\n",
1305                        obd->obd_name, exp->exp_client_uuid.uuid,
1306                        exp->exp_connection == NULL ? "<unknown>" :
1307                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1308                 print_export_data(exp, "EVICTING", 0);
1309         }
1310         cfs_spin_unlock(&obd->obd_dev_lock);
1311
1312         if (evicted) {
1313                 CDEBUG(D_HA, "%s: disconnecting %d stale clients\n",
1314                        obd->obd_name, evicted);
1315                 obd->obd_stale_clients += evicted;
1316         }
1317         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1318                                                  OBD_OPT_ABORT_RECOV);
1319         EXIT;
1320 }
1321 EXPORT_SYMBOL(class_disconnect_stale_exports);
1322
1323 void class_fail_export(struct obd_export *exp)
1324 {
1325         int rc, already_failed;
1326
1327         cfs_spin_lock(&exp->exp_lock);
1328         already_failed = exp->exp_failed;
1329         exp->exp_failed = 1;
1330         cfs_spin_unlock(&exp->exp_lock);
1331
1332         if (already_failed) {
1333                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1334                        exp, exp->exp_client_uuid.uuid);
1335                 return;
1336         }
1337
1338         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1339                exp, exp->exp_client_uuid.uuid);
1340
1341         if (obd_dump_on_timeout)
1342                 libcfs_debug_dumplog();
1343
1344         /* need for safe call CDEBUG after obd_disconnect */
1345         class_export_get(exp);
1346
1347         /* Most callers into obd_disconnect are removing their own reference
1348          * (request, for example) in addition to the one from the hash table.
1349          * We don't have such a reference here, so make one. */
1350         class_export_get(exp);
1351         rc = obd_disconnect(exp);
1352         if (rc)
1353                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1354         else
1355                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1356                        exp, exp->exp_client_uuid.uuid);
1357         class_export_put(exp);
1358 }
1359 EXPORT_SYMBOL(class_fail_export);
1360
1361 char *obd_export_nid2str(struct obd_export *exp)
1362 {
1363         if (exp->exp_connection != NULL)
1364                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1365
1366         return "(no nid)";
1367 }
1368 EXPORT_SYMBOL(obd_export_nid2str);
1369
1370 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1371 {
1372         struct obd_export *doomed_exp = NULL;
1373         int exports_evicted = 0;
1374
1375         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1376
1377         do {
1378                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1379                 if (doomed_exp == NULL)
1380                         break;
1381
1382                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1383                          "nid %s found, wanted nid %s, requested nid %s\n",
1384                          obd_export_nid2str(doomed_exp),
1385                          libcfs_nid2str(nid_key), nid);
1386                 LASSERTF(doomed_exp != obd->obd_self_export,
1387                          "self-export is hashed by NID?\n");
1388                 exports_evicted++;
1389                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1390                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1391                        exports_evicted);
1392                 class_fail_export(doomed_exp);
1393                 class_export_put(doomed_exp);
1394         } while (1);
1395
1396         if (!exports_evicted)
1397                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1398                        obd->obd_name, nid);
1399         return exports_evicted;
1400 }
1401 EXPORT_SYMBOL(obd_export_evict_by_nid);
1402
1403 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1404 {
1405         struct obd_export *doomed_exp = NULL;
1406         struct obd_uuid doomed_uuid;
1407         int exports_evicted = 0;
1408
1409         obd_str2uuid(&doomed_uuid, uuid);
1410         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1411                 CERROR("%s: can't evict myself\n", obd->obd_name);
1412                 return exports_evicted;
1413         }
1414
1415         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1416
1417         if (doomed_exp == NULL) {
1418                 CERROR("%s: can't disconnect %s: no exports found\n",
1419                        obd->obd_name, uuid);
1420         } else {
1421                 CWARN("%s: evicting %s at adminstrative request\n",
1422                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1423                 class_fail_export(doomed_exp);
1424                 class_export_put(doomed_exp);
1425                 exports_evicted++;
1426         }
1427
1428         return exports_evicted;
1429 }
1430 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1431
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional hook invoked by print_export_data() for each export when lock
 * dumping is requested; NULL until someone installs a handler. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1436
1437 static void print_export_data(struct obd_export *exp, const char *status,
1438                               int locks)
1439 {
1440         struct ptlrpc_reply_state *rs;
1441         struct ptlrpc_reply_state *first_reply = NULL;
1442         int nreplies = 0;
1443
1444         cfs_spin_lock(&exp->exp_lock);
1445         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1446                                 rs_exp_list) {
1447                 if (nreplies == 0)
1448                         first_reply = rs;
1449                 nreplies++;
1450         }
1451         cfs_spin_unlock(&exp->exp_lock);
1452
1453         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1454                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1455                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1456                cfs_atomic_read(&exp->exp_rpc_count),
1457                cfs_atomic_read(&exp->exp_cb_count),
1458                cfs_atomic_read(&exp->exp_locks_count),
1459                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1460                nreplies, first_reply, nreplies > 3 ? "..." : "",
1461                exp->exp_last_committed);
1462 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1463         if (locks && class_export_dump_hook != NULL)
1464                 class_export_dump_hook(exp);
1465 #endif
1466 }
1467
1468 void dump_exports(struct obd_device *obd, int locks)
1469 {
1470         struct obd_export *exp;
1471
1472         cfs_spin_lock(&obd->obd_dev_lock);
1473         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1474                 print_export_data(exp, "ACTIVE", locks);
1475         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1476                 print_export_data(exp, "UNLINKED", locks);
1477         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1478                 print_export_data(exp, "DELAYED", locks);
1479         cfs_spin_unlock(&obd->obd_dev_lock);
1480         cfs_spin_lock(&obd_zombie_impexp_lock);
1481         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1482                 print_export_data(exp, "ZOMBIE", locks);
1483         cfs_spin_unlock(&obd_zombie_impexp_lock);
1484 }
1485 EXPORT_SYMBOL(dump_exports);
1486
1487 void obd_exports_barrier(struct obd_device *obd)
1488 {
1489         int waited = 2;
1490         LASSERT(cfs_list_empty(&obd->obd_exports));
1491         cfs_spin_lock(&obd->obd_dev_lock);
1492         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1493                 cfs_spin_unlock(&obd->obd_dev_lock);
1494                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1495                                                    cfs_time_seconds(waited));
1496                 if (waited > 5 && IS_PO2(waited)) {
1497                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1498                                       "more than %d seconds. "
1499                                       "The obd refcount = %d. Is it stuck?\n",
1500                                       obd->obd_name, waited,
1501                                       cfs_atomic_read(&obd->obd_refcount));
1502                         dump_exports(obd, 1);
1503                 }
1504                 waited *= 2;
1505                 cfs_spin_lock(&obd->obd_dev_lock);
1506         }
1507         cfs_spin_unlock(&obd->obd_dev_lock);
1508 }
1509 EXPORT_SYMBOL(obd_exports_barrier);
1510
/* Total number of zombie imports and exports still waiting to be destroyed;
 * modified under obd_zombie_impexp_lock.  Zero-initialised as a static. */
static int zombies_count;
1513
1514 /**
1515  * kill zombie imports and exports
1516  */
1517 void obd_zombie_impexp_cull(void)
1518 {
1519         struct obd_import *import;
1520         struct obd_export *export;
1521         ENTRY;
1522
1523         do {
1524                 cfs_spin_lock(&obd_zombie_impexp_lock);
1525
1526                 import = NULL;
1527                 if (!cfs_list_empty(&obd_zombie_imports)) {
1528                         import = cfs_list_entry(obd_zombie_imports.next,
1529                                                 struct obd_import,
1530                                                 imp_zombie_chain);
1531                         cfs_list_del_init(&import->imp_zombie_chain);
1532                 }
1533
1534                 export = NULL;
1535                 if (!cfs_list_empty(&obd_zombie_exports)) {
1536                         export = cfs_list_entry(obd_zombie_exports.next,
1537                                                 struct obd_export,
1538                                                 exp_obd_chain);
1539                         cfs_list_del_init(&export->exp_obd_chain);
1540                 }
1541
1542                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1543
1544                 if (import != NULL) {
1545                         class_import_destroy(import);
1546                         cfs_spin_lock(&obd_zombie_impexp_lock);
1547                         zombies_count--;
1548                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1549                 }
1550
1551                 if (export != NULL) {
1552                         class_export_destroy(export);
1553                         cfs_spin_lock(&obd_zombie_impexp_lock);
1554                         zombies_count--;
1555                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1556                 }
1557
1558                 cfs_cond_resched();
1559         } while (import != NULL || export != NULL);
1560         EXIT;
1561 }
1562
1563 static cfs_completion_t         obd_zombie_start;
1564 static cfs_completion_t         obd_zombie_stop;
1565 static unsigned long            obd_zombie_flags;
1566 static cfs_waitq_t              obd_zombie_waitq;
1567 static pid_t                    obd_zombie_pid;
1568
1569 enum {
1570         OBD_ZOMBIE_STOP   = 1 << 1
1571 };
1572
1573 /**
1574  * check for work for kill zombie import/export thread.
1575  */
1576 static int obd_zombie_impexp_check(void *arg)
1577 {
1578         int rc;
1579
1580         cfs_spin_lock(&obd_zombie_impexp_lock);
1581         rc = (zombies_count == 0) &&
1582              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1583         cfs_spin_unlock(&obd_zombie_impexp_lock);
1584
1585         RETURN(rc);
1586 }
1587
1588 /**
1589  * Add export to the obd_zombe thread and notify it.
1590  */
1591 static void obd_zombie_export_add(struct obd_export *exp) {
1592         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1593         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1594         cfs_list_del_init(&exp->exp_obd_chain);
1595         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1596         cfs_spin_lock(&obd_zombie_impexp_lock);
1597         zombies_count++;
1598         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1599         cfs_spin_unlock(&obd_zombie_impexp_lock);
1600
1601         obd_zombie_impexp_notify();
1602 }
1603
1604 /**
1605  * Add import to the obd_zombe thread and notify it.
1606  */
1607 static void obd_zombie_import_add(struct obd_import *imp) {
1608         LASSERT(imp->imp_sec == NULL);
1609         LASSERT(imp->imp_rq_pool == NULL);
1610         cfs_spin_lock(&obd_zombie_impexp_lock);
1611         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1612         zombies_count++;
1613         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1614         cfs_spin_unlock(&obd_zombie_impexp_lock);
1615
1616         obd_zombie_impexp_notify();
1617 }
1618
1619 /**
1620  * notify import/export destroy thread about new zombie.
1621  */
1622 static void obd_zombie_impexp_notify(void)
1623 {
1624         /*
1625          * Make sure obd_zomebie_impexp_thread get this notification.
1626          * It is possible this signal only get by obd_zombie_barrier, and
1627          * barrier gulps this notification and sleeps away and hangs ensues
1628          */
1629         cfs_waitq_broadcast(&obd_zombie_waitq);
1630 }
1631
1632 /**
1633  * check whether obd_zombie is idle
1634  */
1635 static int obd_zombie_is_idle(void)
1636 {
1637         int rc;
1638
1639         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1640         cfs_spin_lock(&obd_zombie_impexp_lock);
1641         rc = (zombies_count == 0);
1642         cfs_spin_unlock(&obd_zombie_impexp_lock);
1643         return rc;
1644 }
1645
1646 /**
1647  * wait when obd_zombie import/export queues become empty
1648  */
1649 void obd_zombie_barrier(void)
1650 {
1651         struct l_wait_info lwi = { 0 };
1652
1653         if (obd_zombie_pid == cfs_curproc_pid())
1654                 /* don't wait for myself */
1655                 return;
1656         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1657 }
1658 EXPORT_SYMBOL(obd_zombie_barrier);
1659
1660 #ifdef __KERNEL__
1661
1662 /**
1663  * destroy zombie export/import thread.
1664  */
1665 static int obd_zombie_impexp_thread(void *unused)
1666 {
1667         int rc;
1668
1669         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1670                 cfs_complete(&obd_zombie_start);
1671                 RETURN(rc);
1672         }
1673
1674         cfs_complete(&obd_zombie_start);
1675
1676         obd_zombie_pid = cfs_curproc_pid();
1677
1678         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1679                 struct l_wait_info lwi = { 0 };
1680
1681                 l_wait_event(obd_zombie_waitq,
1682                              !obd_zombie_impexp_check(NULL), &lwi);
1683                 obd_zombie_impexp_cull();
1684
1685                 /*
1686                  * Notify obd_zombie_barrier callers that queues
1687                  * may be empty.
1688                  */
1689                 cfs_waitq_signal(&obd_zombie_waitq);
1690         }
1691
1692         cfs_complete(&obd_zombie_stop);
1693
1694         RETURN(0);
1695 }
1696
1697 #else /* ! KERNEL */
1698
1699 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1700 static void *obd_zombie_impexp_work_cb;
1701 static void *obd_zombie_impexp_idle_cb;
1702
1703 int obd_zombie_impexp_kill(void *arg)
1704 {
1705         int rc = 0;
1706
1707         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1708                 obd_zombie_impexp_cull();
1709                 rc = 1;
1710         }
1711         cfs_atomic_dec(&zombie_recur);
1712         return rc;
1713 }
1714
1715 #endif
1716
1717 /**
1718  * start destroy zombie import/export thread
1719  */
1720 int obd_zombie_impexp_init(void)
1721 {
1722         int rc;
1723
1724         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1725         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1726         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1727         cfs_init_completion(&obd_zombie_start);
1728         cfs_init_completion(&obd_zombie_stop);
1729         cfs_waitq_init(&obd_zombie_waitq);
1730         obd_zombie_pid = 0;
1731
1732 #ifdef __KERNEL__
1733         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1734         if (rc < 0)
1735                 RETURN(rc);
1736
1737         cfs_wait_for_completion(&obd_zombie_start);
1738 #else
1739
1740         obd_zombie_impexp_work_cb =
1741                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1742                                                  &obd_zombie_impexp_kill, NULL);
1743
1744         obd_zombie_impexp_idle_cb =
1745                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1746                                                  &obd_zombie_impexp_check, NULL);
1747         rc = 0;
1748 #endif
1749         RETURN(rc);
1750 }
1751 /**
1752  * stop destroy zombie import/export thread
1753  */
1754 void obd_zombie_impexp_stop(void)
1755 {
1756         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1757         obd_zombie_impexp_notify();
1758 #ifdef __KERNEL__
1759         cfs_wait_for_completion(&obd_zombie_stop);
1760 #else
1761         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1762         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1763 #endif
1764 }
1765
1766 /***** Kernel-userspace comm helpers *******/
1767
1768 /* Get length of entire message, including header */
1769 int kuc_len(int payload_len)
1770 {
1771         return sizeof(struct kuc_hdr) + payload_len;
1772 }
1773 EXPORT_SYMBOL(kuc_len);
1774
1775 /* Get a pointer to kuc header, given a ptr to the payload
1776  * @param p Pointer to payload area
1777  * @returns Pointer to kuc header
1778  */
1779 struct kuc_hdr * kuc_ptr(void *p)
1780 {
1781         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1782         LASSERT(lh->kuc_magic == KUC_MAGIC);
1783         return lh;
1784 }
1785 EXPORT_SYMBOL(kuc_ptr);
1786
1787 /* Test if payload is part of kuc message
1788  * @param p Pointer to payload area
1789  * @returns boolean
1790  */
1791 int kuc_ispayload(void *p)
1792 {
1793         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1794
1795         if (kh->kuc_magic == KUC_MAGIC)
1796                 return 1;
1797         else
1798                 return 0;
1799 }
1800 EXPORT_SYMBOL(kuc_ispayload);
1801
1802 /* Alloc space for a message, and fill in header
1803  * @return Pointer to payload area
1804  */
1805 void *kuc_alloc(int payload_len, int transport, int type)
1806 {
1807         struct kuc_hdr *lh;
1808         int len = kuc_len(payload_len);
1809
1810         OBD_ALLOC(lh, len);
1811         if (lh == NULL)
1812                 return ERR_PTR(-ENOMEM);
1813
1814         lh->kuc_magic = KUC_MAGIC;
1815         lh->kuc_transport = transport;
1816         lh->kuc_msgtype = type;
1817         lh->kuc_msglen = len;
1818
1819         return (void *)(lh + 1);
1820 }
1821 EXPORT_SYMBOL(kuc_alloc);
1822
/* Free a kuc message previously obtained from kuc_alloc()
 * @param p           Pointer to payload area (not to the header)
 * @param payload_len Payload length; the size released is
 *                    kuc_len(payload_len), so it has to match the
 *                    value used at allocation time
 *
 * Not declared "inline": the function has external linkage and is
 * exported, and a plain inline definition does not guarantee that an
 * external definition is emitted under C99 inline semantics.
 */
void kuc_free(void *p, int payload_len)
{
        /* kuc_ptr() asserts that p is preceded by a valid kuc header */
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1830
1831
1832