Whamcloud - gitweb
LU-1448 llite: Prevent NULL pointer dereference on disabled OSC
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
/* List of registered obd types; defined outside this file. */
extern cfs_list_t obd_types;
/* Taken around every walk or modification of obd_types in this file. */
cfs_spinlock_t obd_types_lock;

/* Slab cache sized for struct obd_device (created in obd_init_caches()). */
cfs_mem_cache_t *obd_device_cachep;
/* Slab cache sized for struct obdo (created in obd_init_caches()). */
cfs_mem_cache_t *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
/* Slab cache sized for struct obd_import (created in obd_init_caches()). */
cfs_mem_cache_t *import_cachep;

/*
 * NOTE(review): presumably the queues of imports/exports awaiting deferred
 * destruction and the lock protecting them -- confirm against the zombie
 * handling code, which is not visible in this part of the file.
 */
cfs_list_t      obd_zombie_imports;
cfs_list_t      obd_zombie_exports;
cfs_spinlock_t  obd_zombie_impexp_lock;
/* Forward declarations of helpers defined later in this file. */
static void obd_zombie_impexp_notify(void);
/* Called by class_export_put() when the last export reference is dropped. */
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks);

/*
 * Hook used by class_export_destroy() to release an export's connection.
 * It is only declared here; whoever provides the implementation must set it
 * before any export that has a connection is destroyed.
 */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
68
69 /*
70  * support functions: we could use inter-module communication, but this
71  * is more portable to other OS's
72  */
73 static struct obd_device *obd_device_alloc(void)
74 {
75         struct obd_device *obd;
76
77         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
78         if (obd != NULL) {
79                 obd->obd_magic = OBD_DEVICE_MAGIC;
80         }
81         return obd;
82 }
83
84 static void obd_device_free(struct obd_device *obd)
85 {
86         LASSERT(obd != NULL);
87         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
88                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
89         if (obd->obd_namespace != NULL) {
90                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
91                        obd, obd->obd_namespace, obd->obd_force);
92                 LBUG();
93         }
94         lu_ref_fini(&obd->obd_reference);
95         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
96 }
97
98 struct obd_type *class_search_type(const char *name)
99 {
100         cfs_list_t *tmp;
101         struct obd_type *type;
102
103         cfs_spin_lock(&obd_types_lock);
104         cfs_list_for_each(tmp, &obd_types) {
105                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
106                 if (strcmp(type->typ_name, name) == 0) {
107                         cfs_spin_unlock(&obd_types_lock);
108                         return type;
109                 }
110         }
111         cfs_spin_unlock(&obd_types_lock);
112         return NULL;
113 }
114
115 struct obd_type *class_get_type(const char *name)
116 {
117         struct obd_type *type = class_search_type(name);
118
119 #ifdef HAVE_MODULE_LOADING_SUPPORT
120         if (!type) {
121                 const char *modname = name;
122                 if (!cfs_request_module("%s", modname)) {
123                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
124                         type = class_search_type(name);
125                 } else {
126                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
127                                            modname);
128                 }
129         }
130 #endif
131         if (type) {
132                 cfs_spin_lock(&type->obd_type_lock);
133                 type->typ_refcnt++;
134                 cfs_try_module_get(type->typ_dt_ops->o_owner);
135                 cfs_spin_unlock(&type->obd_type_lock);
136         }
137         return type;
138 }
139 EXPORT_SYMBOL(class_get_type);
140
141 void class_put_type(struct obd_type *type)
142 {
143         LASSERT(type);
144         cfs_spin_lock(&type->obd_type_lock);
145         type->typ_refcnt--;
146         cfs_module_put(type->typ_dt_ops->o_owner);
147         cfs_spin_unlock(&type->obd_type_lock);
148 }
149 EXPORT_SYMBOL(class_put_type);
150
151 #define CLASS_MAX_NAME 1024
152
153 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
154                         struct lprocfs_vars *vars, const char *name,
155                         struct lu_device_type *ldt)
156 {
157         struct obd_type *type;
158         int rc = 0;
159         ENTRY;
160
161         /* sanity check */
162         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
163
164         if (class_search_type(name)) {
165                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
166                 RETURN(-EEXIST);
167         }
168
169         rc = -ENOMEM;
170         OBD_ALLOC(type, sizeof(*type));
171         if (type == NULL)
172                 RETURN(rc);
173
174         OBD_ALLOC_PTR(type->typ_dt_ops);
175         OBD_ALLOC_PTR(type->typ_md_ops);
176         OBD_ALLOC(type->typ_name, strlen(name) + 1);
177
178         if (type->typ_dt_ops == NULL ||
179             type->typ_md_ops == NULL ||
180             type->typ_name == NULL)
181                 GOTO (failed, rc);
182
183         *(type->typ_dt_ops) = *dt_ops;
184         /* md_ops is optional */
185         if (md_ops)
186                 *(type->typ_md_ops) = *md_ops;
187         strcpy(type->typ_name, name);
188         cfs_spin_lock_init(&type->obd_type_lock);
189
190 #ifdef LPROCFS
191         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
192                                               vars, type);
193         if (IS_ERR(type->typ_procroot)) {
194                 rc = PTR_ERR(type->typ_procroot);
195                 type->typ_procroot = NULL;
196                 GOTO (failed, rc);
197         }
198 #endif
199         if (ldt != NULL) {
200                 type->typ_lu = ldt;
201                 rc = lu_device_type_init(ldt);
202                 if (rc != 0)
203                         GOTO (failed, rc);
204         }
205
206         cfs_spin_lock(&obd_types_lock);
207         cfs_list_add(&type->typ_chain, &obd_types);
208         cfs_spin_unlock(&obd_types_lock);
209
210         RETURN (0);
211
212  failed:
213         if (type->typ_name != NULL)
214                 OBD_FREE(type->typ_name, strlen(name) + 1);
215         if (type->typ_md_ops != NULL)
216                 OBD_FREE_PTR(type->typ_md_ops);
217         if (type->typ_dt_ops != NULL)
218                 OBD_FREE_PTR(type->typ_dt_ops);
219         OBD_FREE(type, sizeof(*type));
220         RETURN(rc);
221 }
222 EXPORT_SYMBOL(class_register_type);
223
224 int class_unregister_type(const char *name)
225 {
226         struct obd_type *type = class_search_type(name);
227         ENTRY;
228
229         if (!type) {
230                 CERROR("unknown obd type\n");
231                 RETURN(-EINVAL);
232         }
233
234         if (type->typ_refcnt) {
235                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
236                 /* This is a bad situation, let's make the best of it */
237                 /* Remove ops, but leave the name for debugging */
238                 OBD_FREE_PTR(type->typ_dt_ops);
239                 OBD_FREE_PTR(type->typ_md_ops);
240                 RETURN(-EBUSY);
241         }
242
243         if (type->typ_procroot) {
244                 lprocfs_remove(&type->typ_procroot);
245         }
246
247         if (type->typ_lu)
248                 lu_device_type_fini(type->typ_lu);
249
250         cfs_spin_lock(&obd_types_lock);
251         cfs_list_del(&type->typ_chain);
252         cfs_spin_unlock(&obd_types_lock);
253         OBD_FREE(type->typ_name, strlen(name) + 1);
254         if (type->typ_dt_ops != NULL)
255                 OBD_FREE_PTR(type->typ_dt_ops);
256         if (type->typ_md_ops != NULL)
257                 OBD_FREE_PTR(type->typ_md_ops);
258         OBD_FREE(type, sizeof(*type));
259         RETURN(0);
260 } /* class_unregister_type */
261 EXPORT_SYMBOL(class_unregister_type);
262
263 /**
264  * Create a new obd device.
265  *
266  * Find an empty slot in ::obd_devs[], create a new obd device in it.
267  *
268  * \param[in] type_name obd device type string.
269  * \param[in] name      obd device name.
270  *
271  * \retval NULL if create fails, otherwise return the obd device
272  *         pointer created.
273  */
274 struct obd_device *class_newdev(const char *type_name, const char *name)
275 {
276         struct obd_device *result = NULL;
277         struct obd_device *newdev;
278         struct obd_type *type = NULL;
279         int i;
280         int new_obd_minor = 0;
281         ENTRY;
282
283         if (strlen(name) >= MAX_OBD_NAME) {
284                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
285                 RETURN(ERR_PTR(-EINVAL));
286         }
287
288         type = class_get_type(type_name);
289         if (type == NULL){
290                 CERROR("OBD: unknown type: %s\n", type_name);
291                 RETURN(ERR_PTR(-ENODEV));
292         }
293
294         newdev = obd_device_alloc();
295         if (newdev == NULL) {
296                 class_put_type(type);
297                 RETURN(ERR_PTR(-ENOMEM));
298         }
299         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
300
301         cfs_write_lock(&obd_dev_lock);
302         for (i = 0; i < class_devno_max(); i++) {
303                 struct obd_device *obd = class_num2obd(i);
304
305                 if (obd && obd->obd_name &&
306                     (strcmp(name, obd->obd_name) == 0)) {
307                         CERROR("Device %s already exists at %d, won't add\n",
308                                name, i);
309                         if (result) {
310                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
311                                          "%p obd_magic %08x != %08x\n", result,
312                                          result->obd_magic, OBD_DEVICE_MAGIC);
313                                 LASSERTF(result->obd_minor == new_obd_minor,
314                                          "%p obd_minor %d != %d\n", result,
315                                          result->obd_minor, new_obd_minor);
316
317                                 obd_devs[result->obd_minor] = NULL;
318                                 result->obd_name[0]='\0';
319                          }
320                         result = ERR_PTR(-EEXIST);
321                         break;
322                 }
323                 if (!result && !obd) {
324                         result = newdev;
325                         result->obd_minor = i;
326                         new_obd_minor = i;
327                         result->obd_type = type;
328                         strncpy(result->obd_name, name,
329                                 sizeof(result->obd_name) - 1);
330                         obd_devs[i] = result;
331                 }
332         }
333         cfs_write_unlock(&obd_dev_lock);
334
335         if (result == NULL && i >= class_devno_max()) {
336                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
337                        class_devno_max());
338                 RETURN(ERR_PTR(-EOVERFLOW));
339         }
340
341         if (IS_ERR(result)) {
342                 obd_device_free(newdev);
343                 class_put_type(type);
344         } else {
345                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
346                        result->obd_name, result);
347         }
348         RETURN(result);
349 }
350
351 void class_release_dev(struct obd_device *obd)
352 {
353         struct obd_type *obd_type = obd->obd_type;
354
355         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
356                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
357         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
358                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
359         LASSERT(obd_type != NULL);
360
361         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
362                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
363
364         cfs_write_lock(&obd_dev_lock);
365         obd_devs[obd->obd_minor] = NULL;
366         cfs_write_unlock(&obd_dev_lock);
367         obd_device_free(obd);
368
369         class_put_type(obd_type);
370 }
371
372 int class_name2dev(const char *name)
373 {
374         int i;
375
376         if (!name)
377                 return -1;
378
379         cfs_read_lock(&obd_dev_lock);
380         for (i = 0; i < class_devno_max(); i++) {
381                 struct obd_device *obd = class_num2obd(i);
382
383                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
384                         /* Make sure we finished attaching before we give
385                            out any references */
386                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
387                         if (obd->obd_attached) {
388                                 cfs_read_unlock(&obd_dev_lock);
389                                 return i;
390                         }
391                         break;
392                 }
393         }
394         cfs_read_unlock(&obd_dev_lock);
395
396         return -1;
397 }
398 EXPORT_SYMBOL(class_name2dev);
399
400 struct obd_device *class_name2obd(const char *name)
401 {
402         int dev = class_name2dev(name);
403
404         if (dev < 0 || dev > class_devno_max())
405                 return NULL;
406         return class_num2obd(dev);
407 }
408 EXPORT_SYMBOL(class_name2obd);
409
410 int class_uuid2dev(struct obd_uuid *uuid)
411 {
412         int i;
413
414         cfs_read_lock(&obd_dev_lock);
415         for (i = 0; i < class_devno_max(); i++) {
416                 struct obd_device *obd = class_num2obd(i);
417
418                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
419                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
420                         cfs_read_unlock(&obd_dev_lock);
421                         return i;
422                 }
423         }
424         cfs_read_unlock(&obd_dev_lock);
425
426         return -1;
427 }
428 EXPORT_SYMBOL(class_uuid2dev);
429
430 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
431 {
432         int dev = class_uuid2dev(uuid);
433         if (dev < 0)
434                 return NULL;
435         return class_num2obd(dev);
436 }
437 EXPORT_SYMBOL(class_uuid2obd);
438
439 /**
440  * Get obd device from ::obd_devs[]
441  *
442  * \param num [in] array index
443  *
444  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
445  *         otherwise return the obd device there.
446  */
447 struct obd_device *class_num2obd(int num)
448 {
449         struct obd_device *obd = NULL;
450
451         if (num < class_devno_max()) {
452                 obd = obd_devs[num];
453                 if (obd == NULL)
454                         return NULL;
455
456                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
457                          "%p obd_magic %08x != %08x\n",
458                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
459                 LASSERTF(obd->obd_minor == num,
460                          "%p obd_minor %0d != %0d\n",
461                          obd, obd->obd_minor, num);
462         }
463
464         return obd;
465 }
466 EXPORT_SYMBOL(class_num2obd);
467
468 void class_obd_list(void)
469 {
470         char *status;
471         int i;
472
473         cfs_read_lock(&obd_dev_lock);
474         for (i = 0; i < class_devno_max(); i++) {
475                 struct obd_device *obd = class_num2obd(i);
476
477                 if (obd == NULL)
478                         continue;
479                 if (obd->obd_stopping)
480                         status = "ST";
481                 else if (obd->obd_set_up)
482                         status = "UP";
483                 else if (obd->obd_attached)
484                         status = "AT";
485                 else
486                         status = "--";
487                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
488                          i, status, obd->obd_type->typ_name,
489                          obd->obd_name, obd->obd_uuid.uuid,
490                          cfs_atomic_read(&obd->obd_refcount));
491         }
492         cfs_read_unlock(&obd_dev_lock);
493         return;
494 }
495
496 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
497    specified, then only the client with that uuid is returned,
498    otherwise any client connected to the tgt is returned. */
499 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
500                                           const char * typ_name,
501                                           struct obd_uuid *grp_uuid)
502 {
503         int i;
504
505         cfs_read_lock(&obd_dev_lock);
506         for (i = 0; i < class_devno_max(); i++) {
507                 struct obd_device *obd = class_num2obd(i);
508
509                 if (obd == NULL)
510                         continue;
511                 if ((strncmp(obd->obd_type->typ_name, typ_name,
512                              strlen(typ_name)) == 0)) {
513                         if (obd_uuid_equals(tgt_uuid,
514                                             &obd->u.cli.cl_target_uuid) &&
515                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
516                                                          &obd->obd_uuid) : 1)) {
517                                 cfs_read_unlock(&obd_dev_lock);
518                                 return obd;
519                         }
520                 }
521         }
522         cfs_read_unlock(&obd_dev_lock);
523
524         return NULL;
525 }
526 EXPORT_SYMBOL(class_find_client_obd);
527
528 /* Iterate the obd_device list looking devices have grp_uuid. Start
529    searching at *next, and if a device is found, the next index to look
530    at is saved in *next. If next is NULL, then the first matching device
531    will always be returned. */
532 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
533 {
534         int i;
535
536         if (next == NULL)
537                 i = 0;
538         else if (*next >= 0 && *next < class_devno_max())
539                 i = *next;
540         else
541                 return NULL;
542
543         cfs_read_lock(&obd_dev_lock);
544         for (; i < class_devno_max(); i++) {
545                 struct obd_device *obd = class_num2obd(i);
546
547                 if (obd == NULL)
548                         continue;
549                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
550                         if (next != NULL)
551                                 *next = i+1;
552                         cfs_read_unlock(&obd_dev_lock);
553                         return obd;
554                 }
555         }
556         cfs_read_unlock(&obd_dev_lock);
557
558         return NULL;
559 }
560 EXPORT_SYMBOL(class_devices_in_group);
561
562 /**
563  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
564  * adjust sptlrpc settings accordingly.
565  */
566 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
567 {
568         struct obd_device  *obd;
569         const char         *type;
570         int                 i, rc = 0, rc2;
571
572         LASSERT(namelen > 0);
573
574         cfs_read_lock(&obd_dev_lock);
575         for (i = 0; i < class_devno_max(); i++) {
576                 obd = class_num2obd(i);
577
578                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
579                         continue;
580
581                 /* only notify mdc, osc, mdt, ost */
582                 type = obd->obd_type->typ_name;
583                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
584                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
585                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
586                     strcmp(type, LUSTRE_OST_NAME) != 0)
587                         continue;
588
589                 if (strncmp(obd->obd_name, fsname, namelen))
590                         continue;
591
592                 class_incref(obd, __FUNCTION__, obd);
593                 cfs_read_unlock(&obd_dev_lock);
594                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
595                                          sizeof(KEY_SPTLRPC_CONF),
596                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
597                 rc = rc ? rc : rc2;
598                 class_decref(obd, __FUNCTION__, obd);
599                 cfs_read_lock(&obd_dev_lock);
600         }
601         cfs_read_unlock(&obd_dev_lock);
602         return rc;
603 }
604 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
605
606 void obd_cleanup_caches(void)
607 {
608         int rc;
609
610         ENTRY;
611         if (obd_device_cachep) {
612                 rc = cfs_mem_cache_destroy(obd_device_cachep);
613                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
614                 obd_device_cachep = NULL;
615         }
616         if (obdo_cachep) {
617                 rc = cfs_mem_cache_destroy(obdo_cachep);
618                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
619                 obdo_cachep = NULL;
620         }
621         if (import_cachep) {
622                 rc = cfs_mem_cache_destroy(import_cachep);
623                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
624                 import_cachep = NULL;
625         }
626         if (capa_cachep) {
627                 rc = cfs_mem_cache_destroy(capa_cachep);
628                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
629                 capa_cachep = NULL;
630         }
631         EXIT;
632 }
633
634 int obd_init_caches(void)
635 {
636         ENTRY;
637
638         LASSERT(obd_device_cachep == NULL);
639         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
640                                                  sizeof(struct obd_device),
641                                                  0, 0);
642         if (!obd_device_cachep)
643                 GOTO(out, -ENOMEM);
644
645         LASSERT(obdo_cachep == NULL);
646         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
647                                            0, 0);
648         if (!obdo_cachep)
649                 GOTO(out, -ENOMEM);
650
651         LASSERT(import_cachep == NULL);
652         import_cachep = cfs_mem_cache_create("ll_import_cache",
653                                              sizeof(struct obd_import),
654                                              0, 0);
655         if (!import_cachep)
656                 GOTO(out, -ENOMEM);
657
658         LASSERT(capa_cachep == NULL);
659         capa_cachep = cfs_mem_cache_create("capa_cache",
660                                            sizeof(struct obd_capa), 0, 0);
661         if (!capa_cachep)
662                 GOTO(out, -ENOMEM);
663
664         RETURN(0);
665  out:
666         obd_cleanup_caches();
667         RETURN(-ENOMEM);
668
669 }
670
671 /* map connection to client */
672 struct obd_export *class_conn2export(struct lustre_handle *conn)
673 {
674         struct obd_export *export;
675         ENTRY;
676
677         if (!conn) {
678                 CDEBUG(D_CACHE, "looking for null handle\n");
679                 RETURN(NULL);
680         }
681
682         if (conn->cookie == -1) {  /* this means assign a new connection */
683                 CDEBUG(D_CACHE, "want a new connection\n");
684                 RETURN(NULL);
685         }
686
687         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
688         export = class_handle2object(conn->cookie);
689         RETURN(export);
690 }
691 EXPORT_SYMBOL(class_conn2export);
692
693 struct obd_device *class_exp2obd(struct obd_export *exp)
694 {
695         if (exp)
696                 return exp->exp_obd;
697         return NULL;
698 }
699 EXPORT_SYMBOL(class_exp2obd);
700
701 struct obd_device *class_conn2obd(struct lustre_handle *conn)
702 {
703         struct obd_export *export;
704         export = class_conn2export(conn);
705         if (export) {
706                 struct obd_device *obd = export->exp_obd;
707                 class_export_put(export);
708                 return obd;
709         }
710         return NULL;
711 }
712 EXPORT_SYMBOL(class_conn2obd);
713
714 struct obd_import *class_exp2cliimp(struct obd_export *exp)
715 {
716         struct obd_device *obd = exp->exp_obd;
717         if (obd == NULL)
718                 return NULL;
719         return obd->u.cli.cl_import;
720 }
721 EXPORT_SYMBOL(class_exp2cliimp);
722
723 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
724 {
725         struct obd_device *obd = class_conn2obd(conn);
726         if (obd == NULL)
727                 return NULL;
728         return obd->u.cli.cl_import;
729 }
730 EXPORT_SYMBOL(class_conn2cliimp);
731
732 /* Export management functions */
733
/*
 * If the export is involved in recovery then clean up related things.
 *
 * Under the device's obd_recovery_task_lock: drops the export from the
 * delayed-client count if it was delayed and, when the device is recovering
 * and the export was in recovery, clears exp_in_recovery and decrements the
 * connected-client count.  Afterwards the export's request-replay and
 * lock-replay flags are cleared (each under exp_lock) and the matching
 * per-device counters are decremented.
 */
void class_export_recovery_cleanup(struct obd_export *exp)
{
        struct obd_device *obd = exp->exp_obd;

        cfs_spin_lock(&obd->obd_recovery_task_lock);
        if (exp->exp_delayed)
                obd->obd_delayed_clients--;
        if (obd->obd_recovering && exp->exp_in_recovery) {
                /* exp_lock nests inside obd_recovery_task_lock here */
                cfs_spin_lock(&exp->exp_lock);
                exp->exp_in_recovery = 0;
                cfs_spin_unlock(&exp->exp_lock);
                LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
                cfs_atomic_dec(&obd->obd_connected_clients);
        }
        cfs_spin_unlock(&obd->obd_recovery_task_lock);
        /** Cleanup req replay fields */
        if (exp->exp_req_replay_needed) {
                cfs_spin_lock(&exp->exp_lock);
                exp->exp_req_replay_needed = 0;
                cfs_spin_unlock(&exp->exp_lock);
                /* the counter must still account for this export */
                LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
                cfs_atomic_dec(&obd->obd_req_replay_clients);
        }
        /** Cleanup lock replay data */
        if (exp->exp_lock_replay_needed) {
                cfs_spin_lock(&exp->exp_lock);
                exp->exp_lock_replay_needed = 0;
                cfs_spin_unlock(&exp->exp_lock);
                /* the counter must still account for this export */
                LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
                cfs_atomic_dec(&obd->obd_lock_replay_clients);
        }
}
767
/*
 * Final teardown of an export whose reference count has reached zero.
 *
 * Releases the export's connection (if it has one), asserts that its reply,
 * replay and high-priority RPC lists are empty, calls obd_destroy_export(),
 * drops the "export" reference held on the obd device and frees the export
 * through OBD_FREE_RCU().
 */
static void class_export_destroy(struct obd_export *exp)
{
        struct obd_device *obd = exp->exp_obd;
        ENTRY;

        LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
        LASSERT(obd != NULL);

        CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
               exp->exp_client_uuid.uuid, obd->obd_name);


        /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
        if (exp->exp_connection)
                ptlrpc_put_connection_superhack(exp->exp_connection);

        /* nothing may still be queued on the export at this point */
        LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
        LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
        LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
        LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
        obd_destroy_export(exp);
        /* obd was read from exp up front; drop the device's "export" ref */
        class_decref(obd, "export", exp);

        OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
        EXIT;
}
794
/*
 * Handle-table addref callback for exports (installed as hop_addref in
 * export_handle_ops): takes one more reference on the export.
 */
static void export_handle_addref(void *export)
{
        class_export_get(export);
}
799
/*
 * Handle operations for exports: an addref callback and no free callback.
 * The export itself is freed in class_export_destroy() via OBD_FREE_RCU().
 */
static struct portals_handle_ops export_handle_ops = {
        .hop_addref = export_handle_addref,
        .hop_free   = NULL,
};
804
/*
 * Take a reference on \a exp.
 *
 * Increments exp_refcount and returns \a exp so that calls can be chained.
 * \a exp is dereferenced unconditionally and must not be NULL.
 */
struct obd_export *class_export_get(struct obd_export *exp)
{
        cfs_atomic_inc(&exp->exp_refcount);
        CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
               cfs_atomic_read(&exp->exp_refcount));
        return exp;
}
EXPORT_SYMBOL(class_export_get);
813
814 void class_export_put(struct obd_export *exp)
815 {
816         LASSERT(exp != NULL);
817         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
818         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
819                cfs_atomic_read(&exp->exp_refcount) - 1);
820
821         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
822                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
823                 CDEBUG(D_IOCTL, "final put %p/%s\n",
824                        exp, exp->exp_client_uuid.uuid);
825
826                 /* release nid stat refererence */
827                 lprocfs_exp_cleanup(exp);
828                 class_export_recovery_cleanup(exp);
829
830                 obd_zombie_export_add(exp);
831         }
832 }
833 EXPORT_SYMBOL(class_export_put);
834
835 /* Creates a new export, adds it to the hash table, and returns a
836  * pointer to it. The refcount is 2: one for the hash reference, and
837  * one for the pointer returned by this function. */
838 struct obd_export *class_new_export(struct obd_device *obd,
839                                     struct obd_uuid *cluuid)
840 {
841         struct obd_export *export;
842         cfs_hash_t *hash = NULL;
843         int rc = 0;
844         ENTRY;
845
846         OBD_ALLOC_PTR(export);
847         if (!export)
848                 return ERR_PTR(-ENOMEM);
849
850         export->exp_conn_cnt = 0;
851         export->exp_lock_hash = NULL;
852         export->exp_flock_hash = NULL;
853         cfs_atomic_set(&export->exp_refcount, 2);
854         cfs_atomic_set(&export->exp_rpc_count, 0);
855         cfs_atomic_set(&export->exp_cb_count, 0);
856         cfs_atomic_set(&export->exp_locks_count, 0);
857 #if LUSTRE_TRACKS_LOCK_EXP_REFS
858         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
859         cfs_spin_lock_init(&export->exp_locks_list_guard);
860 #endif
861         cfs_atomic_set(&export->exp_replay_count, 0);
862         export->exp_obd = obd;
863         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
864         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
865         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
866         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
867         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
868         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
869         class_handle_hash(&export->exp_handle, &export_handle_ops);
870         export->exp_last_request_time = cfs_time_current_sec();
871         cfs_spin_lock_init(&export->exp_lock);
872         cfs_spin_lock_init(&export->exp_rpc_lock);
873         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
874         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
875         cfs_spin_lock_init(&export->exp_bl_list_lock);
876         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
877
878         export->exp_sp_peer = LUSTRE_SP_ANY;
879         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
880         export->exp_client_uuid = *cluuid;
881         obd_init_export(export);
882
883         cfs_spin_lock(&obd->obd_dev_lock);
884          /* shouldn't happen, but might race */
885         if (obd->obd_stopping)
886                 GOTO(exit_unlock, rc = -ENODEV);
887
888         hash = cfs_hash_getref(obd->obd_uuid_hash);
889         if (hash == NULL)
890                 GOTO(exit_unlock, rc = -ENODEV);
891         cfs_spin_unlock(&obd->obd_dev_lock);
892
893         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
894                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
895                 if (rc != 0) {
896                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
897                                       obd->obd_name, cluuid->uuid, rc);
898                         GOTO(exit_err, rc = -EALREADY);
899                 }
900         }
901
902         cfs_spin_lock(&obd->obd_dev_lock);
903         if (obd->obd_stopping) {
904                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
905                 GOTO(exit_unlock, rc = -ENODEV);
906         }
907
908         class_incref(obd, "export", export);
909         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
910         cfs_list_add_tail(&export->exp_obd_chain_timed,
911                           &export->exp_obd->obd_exports_timed);
912         export->exp_obd->obd_num_exports++;
913         cfs_spin_unlock(&obd->obd_dev_lock);
914         cfs_hash_putref(hash);
915         RETURN(export);
916
917 exit_unlock:
918         cfs_spin_unlock(&obd->obd_dev_lock);
919 exit_err:
920         if (hash)
921                 cfs_hash_putref(hash);
922         class_handle_unhash(&export->exp_handle);
923         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
924         obd_destroy_export(export);
925         OBD_FREE_PTR(export);
926         return ERR_PTR(rc);
927 }
928 EXPORT_SYMBOL(class_new_export);
929
930 void class_unlink_export(struct obd_export *exp)
931 {
932         class_handle_unhash(&exp->exp_handle);
933
934         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
935         /* delete an uuid-export hashitem from hashtables */
936         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
937                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
938                              &exp->exp_client_uuid,
939                              &exp->exp_uuid_hash);
940
941         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
942         cfs_list_del_init(&exp->exp_obd_chain_timed);
943         exp->exp_obd->obd_num_exports--;
944         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
945         class_export_put(exp);
946 }
947 EXPORT_SYMBOL(class_unlink_export);
948
949 /* Import management functions */
950 void class_import_destroy(struct obd_import *imp)
951 {
952         ENTRY;
953
954         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
955                 imp->imp_obd->obd_name);
956
957         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
958
959         ptlrpc_put_connection_superhack(imp->imp_connection);
960
961         while (!cfs_list_empty(&imp->imp_conn_list)) {
962                 struct obd_import_conn *imp_conn;
963
964                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
965                                           struct obd_import_conn, oic_item);
966                 cfs_list_del_init(&imp_conn->oic_item);
967                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
968                 OBD_FREE(imp_conn, sizeof(*imp_conn));
969         }
970
971         LASSERT(imp->imp_sec == NULL);
972         class_decref(imp->imp_obd, "import", imp);
973         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
974         EXIT;
975 }
976
/* hop_addref callback installed in import_handle_ops: takes a reference on
 * the import passed as an untyped pointer. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
981
982 static struct portals_handle_ops import_handle_ops = {
983         .hop_addref = import_handle_addref,
984         .hop_free   = NULL,
985 };
986
987 struct obd_import *class_import_get(struct obd_import *import)
988 {
989         cfs_atomic_inc(&import->imp_refcount);
990         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
991                cfs_atomic_read(&import->imp_refcount),
992                import->imp_obd->obd_name);
993         return import;
994 }
995 EXPORT_SYMBOL(class_import_get);
996
997 void class_import_put(struct obd_import *imp)
998 {
999         ENTRY;
1000
1001         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1002         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1003
1004         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1005                cfs_atomic_read(&imp->imp_refcount) - 1,
1006                imp->imp_obd->obd_name);
1007
1008         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
1009                 CDEBUG(D_INFO, "final put import %p\n", imp);
1010                 obd_zombie_import_add(imp);
1011         }
1012
1013         EXIT;
1014 }
1015 EXPORT_SYMBOL(class_import_put);
1016
1017 static void init_imp_at(struct imp_at *at) {
1018         int i;
1019         at_init(&at->iat_net_latency, 0, 0);
1020         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1021                 /* max service estimates are tracked on the server side, so
1022                    don't use the AT history here, just use the last reported
1023                    val. (But keep hist for proc histogram, worst_ever) */
1024                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1025                         AT_FLG_NOHIST);
1026         }
1027 }
1028
1029 struct obd_import *class_new_import(struct obd_device *obd)
1030 {
1031         struct obd_import *imp;
1032
1033         OBD_ALLOC(imp, sizeof(*imp));
1034         if (imp == NULL)
1035                 return NULL;
1036
1037         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1038         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1039         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1040         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1041         cfs_spin_lock_init(&imp->imp_lock);
1042         imp->imp_last_success_conn = 0;
1043         imp->imp_state = LUSTRE_IMP_NEW;
1044         imp->imp_obd = class_incref(obd, "import", imp);
1045         cfs_mutex_init(&imp->imp_sec_mutex);
1046         cfs_waitq_init(&imp->imp_recovery_waitq);
1047
1048         cfs_atomic_set(&imp->imp_refcount, 2);
1049         cfs_atomic_set(&imp->imp_unregistering, 0);
1050         cfs_atomic_set(&imp->imp_inflight, 0);
1051         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1052         cfs_atomic_set(&imp->imp_inval_count, 0);
1053         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1054         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1055         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1056         init_imp_at(&imp->imp_at);
1057
1058         /* the default magic is V2, will be used in connect RPC, and
1059          * then adjusted according to the flags in request/reply. */
1060         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1061
1062         return imp;
1063 }
1064 EXPORT_SYMBOL(class_new_import);
1065
1066 void class_destroy_import(struct obd_import *import)
1067 {
1068         LASSERT(import != NULL);
1069         LASSERT(import != LP_POISON);
1070
1071         class_handle_unhash(&import->imp_handle);
1072
1073         cfs_spin_lock(&import->imp_lock);
1074         import->imp_generation++;
1075         cfs_spin_unlock(&import->imp_lock);
1076         class_import_put(import);
1077 }
1078 EXPORT_SYMBOL(class_destroy_import);
1079
1080 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1081
1082 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1083 {
1084         cfs_spin_lock(&exp->exp_locks_list_guard);
1085
1086         LASSERT(lock->l_exp_refs_nr >= 0);
1087
1088         if (lock->l_exp_refs_target != NULL &&
1089             lock->l_exp_refs_target != exp) {
1090                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1091                               exp, lock, lock->l_exp_refs_target);
1092         }
1093         if ((lock->l_exp_refs_nr ++) == 0) {
1094                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1095                 lock->l_exp_refs_target = exp;
1096         }
1097         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1098                lock, exp, lock->l_exp_refs_nr);
1099         cfs_spin_unlock(&exp->exp_locks_list_guard);
1100 }
1101 EXPORT_SYMBOL(__class_export_add_lock_ref);
1102
1103 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1104 {
1105         cfs_spin_lock(&exp->exp_locks_list_guard);
1106         LASSERT(lock->l_exp_refs_nr > 0);
1107         if (lock->l_exp_refs_target != exp) {
1108                 LCONSOLE_WARN("lock %p, "
1109                               "mismatching export pointers: %p, %p\n",
1110                               lock, lock->l_exp_refs_target, exp);
1111         }
1112         if (-- lock->l_exp_refs_nr == 0) {
1113                 cfs_list_del_init(&lock->l_exp_refs_link);
1114                 lock->l_exp_refs_target = NULL;
1115         }
1116         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1117                lock, exp, lock->l_exp_refs_nr);
1118         cfs_spin_unlock(&exp->exp_locks_list_guard);
1119 }
1120 EXPORT_SYMBOL(__class_export_del_lock_ref);
1121 #endif
1122
1123 /* A connection defines an export context in which preallocation can
1124    be managed. This releases the export pointer reference, and returns
1125    the export handle, so the export refcount is 1 when this function
1126    returns. */
1127 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1128                   struct obd_uuid *cluuid)
1129 {
1130         struct obd_export *export;
1131         LASSERT(conn != NULL);
1132         LASSERT(obd != NULL);
1133         LASSERT(cluuid != NULL);
1134         ENTRY;
1135
1136         export = class_new_export(obd, cluuid);
1137         if (IS_ERR(export))
1138                 RETURN(PTR_ERR(export));
1139
1140         conn->cookie = export->exp_handle.h_cookie;
1141         class_export_put(export);
1142
1143         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1144                cluuid->uuid, conn->cookie);
1145         RETURN(0);
1146 }
1147 EXPORT_SYMBOL(class_connect);
1148
1149
1150 /* This function removes 1-3 references from the export:
1151  * 1 - for export pointer passed
1152  * and if disconnect really need
1153  * 2 - removing from hash
1154  * 3 - in client_unlink_export
1155  * The export pointer passed to this function can destroyed */
1156 int class_disconnect(struct obd_export *export)
1157 {
1158         int already_disconnected;
1159         ENTRY;
1160
1161         if (export == NULL) {
1162                 CWARN("attempting to free NULL export %p\n", export);
1163                 RETURN(-EINVAL);
1164         }
1165
1166         cfs_spin_lock(&export->exp_lock);
1167         already_disconnected = export->exp_disconnected;
1168         export->exp_disconnected = 1;
1169         cfs_spin_unlock(&export->exp_lock);
1170
1171         /* class_cleanup(), abort_recovery(), and class_fail_export()
1172          * all end up in here, and if any of them race we shouldn't
1173          * call extra class_export_puts(). */
1174         if (already_disconnected) {
1175                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1176                 GOTO(no_disconn, already_disconnected);
1177         }
1178
1179         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1180                export->exp_handle.h_cookie);
1181
1182         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1183                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1184                              &export->exp_connection->c_peer.nid,
1185                              &export->exp_nid_hash);
1186
1187         class_unlink_export(export);
1188 no_disconn:
1189         class_export_put(export);
1190         RETURN(0);
1191 }
1192 EXPORT_SYMBOL(class_disconnect);
1193
1194 /* Return non-zero for a fully connected export */
1195 int class_connected_export(struct obd_export *exp)
1196 {
1197         if (exp) {
1198                 int connected;
1199                 cfs_spin_lock(&exp->exp_lock);
1200                 connected = (exp->exp_conn_cnt > 0);
1201                 cfs_spin_unlock(&exp->exp_lock);
1202                 return connected;
1203         }
1204         return 0;
1205 }
1206 EXPORT_SYMBOL(class_connected_export);
1207
1208 static void class_disconnect_export_list(cfs_list_t *list,
1209                                          enum obd_option flags)
1210 {
1211         int rc;
1212         struct obd_export *exp;
1213         ENTRY;
1214
1215         /* It's possible that an export may disconnect itself, but
1216          * nothing else will be added to this list. */
1217         while (!cfs_list_empty(list)) {
1218                 exp = cfs_list_entry(list->next, struct obd_export,
1219                                      exp_obd_chain);
1220                 /* need for safe call CDEBUG after obd_disconnect */
1221                 class_export_get(exp);
1222
1223                 cfs_spin_lock(&exp->exp_lock);
1224                 exp->exp_flags = flags;
1225                 cfs_spin_unlock(&exp->exp_lock);
1226
1227                 if (obd_uuid_equals(&exp->exp_client_uuid,
1228                                     &exp->exp_obd->obd_uuid)) {
1229                         CDEBUG(D_HA,
1230                                "exp %p export uuid == obd uuid, don't discon\n",
1231                                exp);
1232                         /* Need to delete this now so we don't end up pointing
1233                          * to work_list later when this export is cleaned up. */
1234                         cfs_list_del_init(&exp->exp_obd_chain);
1235                         class_export_put(exp);
1236                         continue;
1237                 }
1238
1239                 class_export_get(exp);
1240                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1241                        "last request at "CFS_TIME_T"\n",
1242                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1243                        exp, exp->exp_last_request_time);
1244                 /* release one export reference anyway */
1245                 rc = obd_disconnect(exp);
1246
1247                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1248                        obd_export_nid2str(exp), exp, rc);
1249                 class_export_put(exp);
1250         }
1251         EXIT;
1252 }
1253
1254 void class_disconnect_exports(struct obd_device *obd)
1255 {
1256         cfs_list_t work_list;
1257         ENTRY;
1258
1259         /* Move all of the exports from obd_exports to a work list, en masse. */
1260         CFS_INIT_LIST_HEAD(&work_list);
1261         cfs_spin_lock(&obd->obd_dev_lock);
1262         cfs_list_splice_init(&obd->obd_exports, &work_list);
1263         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1264         cfs_spin_unlock(&obd->obd_dev_lock);
1265
1266         if (!cfs_list_empty(&work_list)) {
1267                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1268                        "disconnecting them\n", obd->obd_minor, obd);
1269                 class_disconnect_export_list(&work_list,
1270                                              exp_flags_from_obd(obd));
1271         } else
1272                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1273                        obd->obd_minor, obd);
1274         EXIT;
1275 }
1276 EXPORT_SYMBOL(class_disconnect_exports);
1277
1278 /* Remove exports that have not completed recovery.
1279  */
1280 void class_disconnect_stale_exports(struct obd_device *obd,
1281                                     int (*test_export)(struct obd_export *))
1282 {
1283         cfs_list_t work_list;
1284         cfs_list_t *pos, *n;
1285         struct obd_export *exp;
1286         int evicted = 0;
1287         ENTRY;
1288
1289         CFS_INIT_LIST_HEAD(&work_list);
1290         cfs_spin_lock(&obd->obd_dev_lock);
1291         cfs_list_for_each_safe(pos, n, &obd->obd_exports) {
1292                 int failed;
1293
1294                 exp = cfs_list_entry(pos, struct obd_export, exp_obd_chain);
1295
1296                 /* don't count self-export as client */
1297                 if (obd_uuid_equals(&exp->exp_client_uuid,
1298                                     &exp->exp_obd->obd_uuid))
1299                         continue;
1300
1301                 if (test_export(exp))
1302                         continue;
1303
1304                 cfs_spin_lock(&exp->exp_lock);
1305                 failed = exp->exp_failed;
1306                 exp->exp_failed = 1;
1307                 cfs_spin_unlock(&exp->exp_lock);
1308                 if (failed)
1309                         continue;
1310
1311                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1312                 evicted++;
1313                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1314                        obd->obd_name, exp->exp_client_uuid.uuid,
1315                        exp->exp_connection == NULL ? "<unknown>" :
1316                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1317                 print_export_data(exp, "EVICTING", 0);
1318         }
1319         cfs_spin_unlock(&obd->obd_dev_lock);
1320
1321         if (evicted) {
1322                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1323                               obd->obd_name, evicted);
1324                 obd->obd_stale_clients += evicted;
1325         }
1326         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1327                                                  OBD_OPT_ABORT_RECOV);
1328         EXIT;
1329 }
1330 EXPORT_SYMBOL(class_disconnect_stale_exports);
1331
1332 void class_fail_export(struct obd_export *exp)
1333 {
1334         int rc, already_failed;
1335
1336         cfs_spin_lock(&exp->exp_lock);
1337         already_failed = exp->exp_failed;
1338         exp->exp_failed = 1;
1339         cfs_spin_unlock(&exp->exp_lock);
1340
1341         if (already_failed) {
1342                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1343                        exp, exp->exp_client_uuid.uuid);
1344                 return;
1345         }
1346
1347         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1348                exp, exp->exp_client_uuid.uuid);
1349
1350         if (obd_dump_on_timeout)
1351                 libcfs_debug_dumplog();
1352
1353         /* need for safe call CDEBUG after obd_disconnect */
1354         class_export_get(exp);
1355
1356         /* Most callers into obd_disconnect are removing their own reference
1357          * (request, for example) in addition to the one from the hash table.
1358          * We don't have such a reference here, so make one. */
1359         class_export_get(exp);
1360         rc = obd_disconnect(exp);
1361         if (rc)
1362                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1363         else
1364                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1365                        exp, exp->exp_client_uuid.uuid);
1366         class_export_put(exp);
1367 }
1368 EXPORT_SYMBOL(class_fail_export);
1369
1370 char *obd_export_nid2str(struct obd_export *exp)
1371 {
1372         if (exp->exp_connection != NULL)
1373                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1374
1375         return "(no nid)";
1376 }
1377 EXPORT_SYMBOL(obd_export_nid2str);
1378
1379 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1380 {
1381         struct obd_export *doomed_exp = NULL;
1382         int exports_evicted = 0;
1383
1384         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1385
1386         do {
1387                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1388                 if (doomed_exp == NULL)
1389                         break;
1390
1391                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1392                          "nid %s found, wanted nid %s, requested nid %s\n",
1393                          obd_export_nid2str(doomed_exp),
1394                          libcfs_nid2str(nid_key), nid);
1395                 LASSERTF(doomed_exp != obd->obd_self_export,
1396                          "self-export is hashed by NID?\n");
1397                 exports_evicted++;
1398                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1399                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1400                        exports_evicted);
1401                 class_fail_export(doomed_exp);
1402                 class_export_put(doomed_exp);
1403         } while (1);
1404
1405         if (!exports_evicted)
1406                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1407                        obd->obd_name, nid);
1408         return exports_evicted;
1409 }
1410 EXPORT_SYMBOL(obd_export_evict_by_nid);
1411
1412 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1413 {
1414         struct obd_export *doomed_exp = NULL;
1415         struct obd_uuid doomed_uuid;
1416         int exports_evicted = 0;
1417
1418         obd_str2uuid(&doomed_uuid, uuid);
1419         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1420                 CERROR("%s: can't evict myself\n", obd->obd_name);
1421                 return exports_evicted;
1422         }
1423
1424         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1425
1426         if (doomed_exp == NULL) {
1427                 CERROR("%s: can't disconnect %s: no exports found\n",
1428                        obd->obd_name, uuid);
1429         } else {
1430                 CWARN("%s: evicting %s at adminstrative request\n",
1431                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1432                 class_fail_export(doomed_exp);
1433                 class_export_put(doomed_exp);
1434                 exports_evicted++;
1435         }
1436
1437         return exports_evicted;
1438 }
1439 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1440
1441 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1442 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1443 EXPORT_SYMBOL(class_export_dump_hook);
1444 #endif
1445
1446 static void print_export_data(struct obd_export *exp, const char *status,
1447                               int locks)
1448 {
1449         struct ptlrpc_reply_state *rs;
1450         struct ptlrpc_reply_state *first_reply = NULL;
1451         int nreplies = 0;
1452
1453         cfs_spin_lock(&exp->exp_lock);
1454         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1455                                 rs_exp_list) {
1456                 if (nreplies == 0)
1457                         first_reply = rs;
1458                 nreplies++;
1459         }
1460         cfs_spin_unlock(&exp->exp_lock);
1461
1462         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1463                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1464                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1465                cfs_atomic_read(&exp->exp_rpc_count),
1466                cfs_atomic_read(&exp->exp_cb_count),
1467                cfs_atomic_read(&exp->exp_locks_count),
1468                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1469                nreplies, first_reply, nreplies > 3 ? "..." : "",
1470                exp->exp_last_committed);
1471 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1472         if (locks && class_export_dump_hook != NULL)
1473                 class_export_dump_hook(exp);
1474 #endif
1475 }
1476
1477 void dump_exports(struct obd_device *obd, int locks)
1478 {
1479         struct obd_export *exp;
1480
1481         cfs_spin_lock(&obd->obd_dev_lock);
1482         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1483                 print_export_data(exp, "ACTIVE", locks);
1484         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1485                 print_export_data(exp, "UNLINKED", locks);
1486         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1487                 print_export_data(exp, "DELAYED", locks);
1488         cfs_spin_unlock(&obd->obd_dev_lock);
1489         cfs_spin_lock(&obd_zombie_impexp_lock);
1490         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1491                 print_export_data(exp, "ZOMBIE", locks);
1492         cfs_spin_unlock(&obd_zombie_impexp_lock);
1493 }
1494 EXPORT_SYMBOL(dump_exports);
1495
1496 void obd_exports_barrier(struct obd_device *obd)
1497 {
1498         int waited = 2;
1499         LASSERT(cfs_list_empty(&obd->obd_exports));
1500         cfs_spin_lock(&obd->obd_dev_lock);
1501         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1502                 cfs_spin_unlock(&obd->obd_dev_lock);
1503                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1504                                                    cfs_time_seconds(waited));
1505                 if (waited > 5 && IS_PO2(waited)) {
1506                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1507                                       "more than %d seconds. "
1508                                       "The obd refcount = %d. Is it stuck?\n",
1509                                       obd->obd_name, waited,
1510                                       cfs_atomic_read(&obd->obd_refcount));
1511                         dump_exports(obd, 1);
1512                 }
1513                 waited *= 2;
1514                 cfs_spin_lock(&obd->obd_dev_lock);
1515         }
1516         cfs_spin_unlock(&obd->obd_dev_lock);
1517 }
1518 EXPORT_SYMBOL(obd_exports_barrier);
1519
/* Number of zombie imports and exports queued but not yet destroyed;
 * modified and read under obd_zombie_impexp_lock. Starts at zero. */
static int zombies_count;
1522
1523 /**
1524  * kill zombie imports and exports
1525  */
1526 void obd_zombie_impexp_cull(void)
1527 {
1528         struct obd_import *import;
1529         struct obd_export *export;
1530         ENTRY;
1531
1532         do {
1533                 cfs_spin_lock(&obd_zombie_impexp_lock);
1534
1535                 import = NULL;
1536                 if (!cfs_list_empty(&obd_zombie_imports)) {
1537                         import = cfs_list_entry(obd_zombie_imports.next,
1538                                                 struct obd_import,
1539                                                 imp_zombie_chain);
1540                         cfs_list_del_init(&import->imp_zombie_chain);
1541                 }
1542
1543                 export = NULL;
1544                 if (!cfs_list_empty(&obd_zombie_exports)) {
1545                         export = cfs_list_entry(obd_zombie_exports.next,
1546                                                 struct obd_export,
1547                                                 exp_obd_chain);
1548                         cfs_list_del_init(&export->exp_obd_chain);
1549                 }
1550
1551                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1552
1553                 if (import != NULL) {
1554                         class_import_destroy(import);
1555                         cfs_spin_lock(&obd_zombie_impexp_lock);
1556                         zombies_count--;
1557                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1558                 }
1559
1560                 if (export != NULL) {
1561                         class_export_destroy(export);
1562                         cfs_spin_lock(&obd_zombie_impexp_lock);
1563                         zombies_count--;
1564                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1565                 }
1566
1567                 cfs_cond_resched();
1568         } while (import != NULL || export != NULL);
1569         EXIT;
1570 }
1571
1572 static cfs_completion_t         obd_zombie_start;
1573 static cfs_completion_t         obd_zombie_stop;
1574 static unsigned long            obd_zombie_flags;
1575 static cfs_waitq_t              obd_zombie_waitq;
1576 static pid_t                    obd_zombie_pid;
1577
1578 enum {
1579         OBD_ZOMBIE_STOP   = 1 << 1
1580 };
1581
1582 /**
1583  * check for work for kill zombie import/export thread.
1584  */
1585 static int obd_zombie_impexp_check(void *arg)
1586 {
1587         int rc;
1588
1589         cfs_spin_lock(&obd_zombie_impexp_lock);
1590         rc = (zombies_count == 0) &&
1591              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1592         cfs_spin_unlock(&obd_zombie_impexp_lock);
1593
1594         RETURN(rc);
1595 }
1596
1597 /**
1598  * Add export to the obd_zombe thread and notify it.
1599  */
1600 static void obd_zombie_export_add(struct obd_export *exp) {
1601         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1602         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1603         cfs_list_del_init(&exp->exp_obd_chain);
1604         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1605         cfs_spin_lock(&obd_zombie_impexp_lock);
1606         zombies_count++;
1607         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1608         cfs_spin_unlock(&obd_zombie_impexp_lock);
1609
1610         obd_zombie_impexp_notify();
1611 }
1612
1613 /**
1614  * Add import to the obd_zombe thread and notify it.
1615  */
1616 static void obd_zombie_import_add(struct obd_import *imp) {
1617         LASSERT(imp->imp_sec == NULL);
1618         LASSERT(imp->imp_rq_pool == NULL);
1619         cfs_spin_lock(&obd_zombie_impexp_lock);
1620         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1621         zombies_count++;
1622         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1623         cfs_spin_unlock(&obd_zombie_impexp_lock);
1624
1625         obd_zombie_impexp_notify();
1626 }
1627
1628 /**
1629  * notify import/export destroy thread about new zombie.
1630  */
1631 static void obd_zombie_impexp_notify(void)
1632 {
1633         /*
1634          * Make sure obd_zomebie_impexp_thread get this notification.
1635          * It is possible this signal only get by obd_zombie_barrier, and
1636          * barrier gulps this notification and sleeps away and hangs ensues
1637          */
1638         cfs_waitq_broadcast(&obd_zombie_waitq);
1639 }
1640
1641 /**
1642  * check whether obd_zombie is idle
1643  */
1644 static int obd_zombie_is_idle(void)
1645 {
1646         int rc;
1647
1648         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1649         cfs_spin_lock(&obd_zombie_impexp_lock);
1650         rc = (zombies_count == 0);
1651         cfs_spin_unlock(&obd_zombie_impexp_lock);
1652         return rc;
1653 }
1654
1655 /**
1656  * wait when obd_zombie import/export queues become empty
1657  */
1658 void obd_zombie_barrier(void)
1659 {
1660         struct l_wait_info lwi = { 0 };
1661
1662         if (obd_zombie_pid == cfs_curproc_pid())
1663                 /* don't wait for myself */
1664                 return;
1665         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1666 }
1667 EXPORT_SYMBOL(obd_zombie_barrier);
1668
1669 #ifdef __KERNEL__
1670
1671 /**
1672  * destroy zombie export/import thread.
1673  */
1674 static int obd_zombie_impexp_thread(void *unused)
1675 {
1676         int rc;
1677
1678         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1679                 cfs_complete(&obd_zombie_start);
1680                 RETURN(rc);
1681         }
1682
1683         cfs_complete(&obd_zombie_start);
1684
1685         obd_zombie_pid = cfs_curproc_pid();
1686
1687         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1688                 struct l_wait_info lwi = { 0 };
1689
1690                 l_wait_event(obd_zombie_waitq,
1691                              !obd_zombie_impexp_check(NULL), &lwi);
1692                 obd_zombie_impexp_cull();
1693
1694                 /*
1695                  * Notify obd_zombie_barrier callers that queues
1696                  * may be empty.
1697                  */
1698                 cfs_waitq_signal(&obd_zombie_waitq);
1699         }
1700
1701         cfs_complete(&obd_zombie_stop);
1702
1703         RETURN(0);
1704 }
1705
1706 #else /* ! KERNEL */
1707
1708 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1709 static void *obd_zombie_impexp_work_cb;
1710 static void *obd_zombie_impexp_idle_cb;
1711
1712 int obd_zombie_impexp_kill(void *arg)
1713 {
1714         int rc = 0;
1715
1716         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1717                 obd_zombie_impexp_cull();
1718                 rc = 1;
1719         }
1720         cfs_atomic_dec(&zombie_recur);
1721         return rc;
1722 }
1723
1724 #endif
1725
1726 /**
1727  * start destroy zombie import/export thread
1728  */
1729 int obd_zombie_impexp_init(void)
1730 {
1731         int rc;
1732
1733         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1734         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1735         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1736         cfs_init_completion(&obd_zombie_start);
1737         cfs_init_completion(&obd_zombie_stop);
1738         cfs_waitq_init(&obd_zombie_waitq);
1739         obd_zombie_pid = 0;
1740
1741 #ifdef __KERNEL__
1742         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1743         if (rc < 0)
1744                 RETURN(rc);
1745
1746         cfs_wait_for_completion(&obd_zombie_start);
1747 #else
1748
1749         obd_zombie_impexp_work_cb =
1750                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1751                                                  &obd_zombie_impexp_kill, NULL);
1752
1753         obd_zombie_impexp_idle_cb =
1754                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1755                                                  &obd_zombie_impexp_check, NULL);
1756         rc = 0;
1757 #endif
1758         RETURN(rc);
1759 }
1760 /**
1761  * stop destroy zombie import/export thread
1762  */
1763 void obd_zombie_impexp_stop(void)
1764 {
1765         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1766         obd_zombie_impexp_notify();
1767 #ifdef __KERNEL__
1768         cfs_wait_for_completion(&obd_zombie_stop);
1769 #else
1770         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1771         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1772 #endif
1773 }
1774
1775 /***** Kernel-userspace comm helpers *******/
1776
1777 /* Get length of entire message, including header */
1778 int kuc_len(int payload_len)
1779 {
1780         return sizeof(struct kuc_hdr) + payload_len;
1781 }
1782 EXPORT_SYMBOL(kuc_len);
1783
1784 /* Get a pointer to kuc header, given a ptr to the payload
1785  * @param p Pointer to payload area
1786  * @returns Pointer to kuc header
1787  */
1788 struct kuc_hdr * kuc_ptr(void *p)
1789 {
1790         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1791         LASSERT(lh->kuc_magic == KUC_MAGIC);
1792         return lh;
1793 }
1794 EXPORT_SYMBOL(kuc_ptr);
1795
1796 /* Test if payload is part of kuc message
1797  * @param p Pointer to payload area
1798  * @returns boolean
1799  */
1800 int kuc_ispayload(void *p)
1801 {
1802         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1803
1804         if (kh->kuc_magic == KUC_MAGIC)
1805                 return 1;
1806         else
1807                 return 0;
1808 }
1809 EXPORT_SYMBOL(kuc_ispayload);
1810
1811 /* Alloc space for a message, and fill in header
1812  * @return Pointer to payload area
1813  */
1814 void *kuc_alloc(int payload_len, int transport, int type)
1815 {
1816         struct kuc_hdr *lh;
1817         int len = kuc_len(payload_len);
1818
1819         OBD_ALLOC(lh, len);
1820         if (lh == NULL)
1821                 return ERR_PTR(-ENOMEM);
1822
1823         lh->kuc_magic = KUC_MAGIC;
1824         lh->kuc_transport = transport;
1825         lh->kuc_msgtype = type;
1826         lh->kuc_msglen = len;
1827
1828         return (void *)(lh + 1);
1829 }
1830 EXPORT_SYMBOL(kuc_alloc);
1831
/* Free a kuc message.
 * @param p Pointer to payload area
 * @param payload_len Length of the payload area; the size passed to
 *        OBD_FREE is kuc_len(payload_len)
 */
inline void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh;

        /* the memory to free begins at the header, not at the payload */
        lh = kuc_ptr(p);
        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1839
1840
1841