Whamcloud - gitweb
LU-1303 lod: add lprocfs support to lod
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43 #ifndef __KERNEL__
44 #include <liblustre.h>
45 #endif
46 #include <obd_ost.h>
47 #include <obd_class.h>
48 #include <lprocfs_status.h>
49
/* List of registered obd types; defined elsewhere, walked and extended by
 * class_search_type() / class_register_type() in this file. */
extern cfs_list_t obd_types;
/* Taken around every traversal or modification of obd_types in this file. */
cfs_spinlock_t obd_types_lock;

/* Slab caches created in obd_init_caches() and torn down in
 * obd_cleanup_caches(): one each for struct obd_device, struct obdo and
 * struct obd_import. */
cfs_mem_cache_t *obd_device_cachep;
cfs_mem_cache_t *obdo_cachep;
EXPORT_SYMBOL(obdo_cachep);
cfs_mem_cache_t *import_cachep;

/* Zombie import/export bookkeeping.  class_export_put() hands an export to
 * obd_zombie_export_add() on its final put; the list handling itself is not
 * in this part of the file.
 * NOTE(review): presumably obd_zombie_impexp_lock guards both lists --
 * confirm against the obd_zombie_* implementations. */
cfs_list_t      obd_zombie_imports;
cfs_list_t      obd_zombie_exports;
cfs_spinlock_t  obd_zombie_impexp_lock;
static void obd_zombie_impexp_notify(void);
static void obd_zombie_export_add(struct obd_export *exp);
static void obd_zombie_import_add(struct obd_import *imp);
static void print_export_data(struct obd_export *exp,
                              const char *status, int locks);

/* Function pointer invoked by class_export_destroy() to drop an export's
 * exp_connection.
 * NOTE(review): presumably installed by the ptlrpc module at load time --
 * the assignment is not visible here. */
int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
69
70 /*
71  * support functions: we could use inter-module communication, but this
72  * is more portable to other OS's
73  */
74 static struct obd_device *obd_device_alloc(void)
75 {
76         struct obd_device *obd;
77
78         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, CFS_ALLOC_IO);
79         if (obd != NULL) {
80                 obd->obd_magic = OBD_DEVICE_MAGIC;
81         }
82         return obd;
83 }
84
85 static void obd_device_free(struct obd_device *obd)
86 {
87         LASSERT(obd != NULL);
88         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
89                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
90         if (obd->obd_namespace != NULL) {
91                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
92                        obd, obd->obd_namespace, obd->obd_force);
93                 LBUG();
94         }
95         lu_ref_fini(&obd->obd_reference);
96         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
97 }
98
99 struct obd_type *class_search_type(const char *name)
100 {
101         cfs_list_t *tmp;
102         struct obd_type *type;
103
104         cfs_spin_lock(&obd_types_lock);
105         cfs_list_for_each(tmp, &obd_types) {
106                 type = cfs_list_entry(tmp, struct obd_type, typ_chain);
107                 if (strcmp(type->typ_name, name) == 0) {
108                         cfs_spin_unlock(&obd_types_lock);
109                         return type;
110                 }
111         }
112         cfs_spin_unlock(&obd_types_lock);
113         return NULL;
114 }
115 EXPORT_SYMBOL(class_search_type);
116
117 struct obd_type *class_get_type(const char *name)
118 {
119         struct obd_type *type = class_search_type(name);
120
121 #ifdef HAVE_MODULE_LOADING_SUPPORT
122         if (!type) {
123                 const char *modname = name;
124                 if (!cfs_request_module("%s", modname)) {
125                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
126                         type = class_search_type(name);
127                 } else {
128                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
129                                            modname);
130                 }
131         }
132 #endif
133         if (type) {
134                 cfs_spin_lock(&type->obd_type_lock);
135                 type->typ_refcnt++;
136                 cfs_try_module_get(type->typ_dt_ops->o_owner);
137                 cfs_spin_unlock(&type->obd_type_lock);
138         }
139         return type;
140 }
141 EXPORT_SYMBOL(class_get_type);
142
143 void class_put_type(struct obd_type *type)
144 {
145         LASSERT(type);
146         cfs_spin_lock(&type->obd_type_lock);
147         type->typ_refcnt--;
148         cfs_module_put(type->typ_dt_ops->o_owner);
149         cfs_spin_unlock(&type->obd_type_lock);
150 }
151 EXPORT_SYMBOL(class_put_type);
152
153 #define CLASS_MAX_NAME 1024
154
155 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
156                         struct lprocfs_vars *vars, const char *name,
157                         struct lu_device_type *ldt)
158 {
159         struct obd_type *type;
160         int rc = 0;
161         ENTRY;
162
163         /* sanity check */
164         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
165
166         if (class_search_type(name)) {
167                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
168                 RETURN(-EEXIST);
169         }
170
171         rc = -ENOMEM;
172         OBD_ALLOC(type, sizeof(*type));
173         if (type == NULL)
174                 RETURN(rc);
175
176         OBD_ALLOC_PTR(type->typ_dt_ops);
177         OBD_ALLOC_PTR(type->typ_md_ops);
178         OBD_ALLOC(type->typ_name, strlen(name) + 1);
179
180         if (type->typ_dt_ops == NULL ||
181             type->typ_md_ops == NULL ||
182             type->typ_name == NULL)
183                 GOTO (failed, rc);
184
185         *(type->typ_dt_ops) = *dt_ops;
186         /* md_ops is optional */
187         if (md_ops)
188                 *(type->typ_md_ops) = *md_ops;
189         strcpy(type->typ_name, name);
190         cfs_spin_lock_init(&type->obd_type_lock);
191
192 #ifdef LPROCFS
193         type->typ_procroot = lprocfs_register(type->typ_name, proc_lustre_root,
194                                               vars, type);
195         if (IS_ERR(type->typ_procroot)) {
196                 rc = PTR_ERR(type->typ_procroot);
197                 type->typ_procroot = NULL;
198                 GOTO (failed, rc);
199         }
200 #endif
201         if (ldt != NULL) {
202                 type->typ_lu = ldt;
203                 rc = lu_device_type_init(ldt);
204                 if (rc != 0)
205                         GOTO (failed, rc);
206         }
207
208         cfs_spin_lock(&obd_types_lock);
209         cfs_list_add(&type->typ_chain, &obd_types);
210         cfs_spin_unlock(&obd_types_lock);
211
212         RETURN (0);
213
214  failed:
215         if (type->typ_name != NULL)
216                 OBD_FREE(type->typ_name, strlen(name) + 1);
217         if (type->typ_md_ops != NULL)
218                 OBD_FREE_PTR(type->typ_md_ops);
219         if (type->typ_dt_ops != NULL)
220                 OBD_FREE_PTR(type->typ_dt_ops);
221         OBD_FREE(type, sizeof(*type));
222         RETURN(rc);
223 }
224 EXPORT_SYMBOL(class_register_type);
225
226 int class_unregister_type(const char *name)
227 {
228         struct obd_type *type = class_search_type(name);
229         ENTRY;
230
231         if (!type) {
232                 CERROR("unknown obd type\n");
233                 RETURN(-EINVAL);
234         }
235
236         if (type->typ_refcnt) {
237                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
238                 /* This is a bad situation, let's make the best of it */
239                 /* Remove ops, but leave the name for debugging */
240                 OBD_FREE_PTR(type->typ_dt_ops);
241                 OBD_FREE_PTR(type->typ_md_ops);
242                 RETURN(-EBUSY);
243         }
244
245         /* we do not use type->typ_procroot as for compatibility purposes
246          * other modules can share names (i.e. lod can use lov entry). so
247          * we can't reference pointer as it can get invalided when another
248          * module removes the entry */
249         lprocfs_try_remove_proc_entry(type->typ_name, proc_lustre_root);
250
251         if (type->typ_lu)
252                 lu_device_type_fini(type->typ_lu);
253
254         cfs_spin_lock(&obd_types_lock);
255         cfs_list_del(&type->typ_chain);
256         cfs_spin_unlock(&obd_types_lock);
257         OBD_FREE(type->typ_name, strlen(name) + 1);
258         if (type->typ_dt_ops != NULL)
259                 OBD_FREE_PTR(type->typ_dt_ops);
260         if (type->typ_md_ops != NULL)
261                 OBD_FREE_PTR(type->typ_md_ops);
262         OBD_FREE(type, sizeof(*type));
263         RETURN(0);
264 } /* class_unregister_type */
265 EXPORT_SYMBOL(class_unregister_type);
266
267 /**
268  * Create a new obd device.
269  *
270  * Find an empty slot in ::obd_devs[], create a new obd device in it.
271  *
272  * \param[in] type_name obd device type string.
273  * \param[in] name      obd device name.
274  *
275  * \retval NULL if create fails, otherwise return the obd device
276  *         pointer created.
277  */
278 struct obd_device *class_newdev(const char *type_name, const char *name)
279 {
280         struct obd_device *result = NULL;
281         struct obd_device *newdev;
282         struct obd_type *type = NULL;
283         int i;
284         int new_obd_minor = 0;
285         ENTRY;
286
287         if (strlen(name) >= MAX_OBD_NAME) {
288                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
289                 RETURN(ERR_PTR(-EINVAL));
290         }
291
292         type = class_get_type(type_name);
293         if (type == NULL){
294                 CERROR("OBD: unknown type: %s\n", type_name);
295                 RETURN(ERR_PTR(-ENODEV));
296         }
297
298         newdev = obd_device_alloc();
299         if (newdev == NULL) {
300                 class_put_type(type);
301                 RETURN(ERR_PTR(-ENOMEM));
302         }
303         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
304
305         cfs_write_lock(&obd_dev_lock);
306         for (i = 0; i < class_devno_max(); i++) {
307                 struct obd_device *obd = class_num2obd(i);
308
309                 if (obd && obd->obd_name &&
310                     (strcmp(name, obd->obd_name) == 0)) {
311                         CERROR("Device %s already exists at %d, won't add\n",
312                                name, i);
313                         if (result) {
314                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
315                                          "%p obd_magic %08x != %08x\n", result,
316                                          result->obd_magic, OBD_DEVICE_MAGIC);
317                                 LASSERTF(result->obd_minor == new_obd_minor,
318                                          "%p obd_minor %d != %d\n", result,
319                                          result->obd_minor, new_obd_minor);
320
321                                 obd_devs[result->obd_minor] = NULL;
322                                 result->obd_name[0]='\0';
323                          }
324                         result = ERR_PTR(-EEXIST);
325                         break;
326                 }
327                 if (!result && !obd) {
328                         result = newdev;
329                         result->obd_minor = i;
330                         new_obd_minor = i;
331                         result->obd_type = type;
332                         strncpy(result->obd_name, name,
333                                 sizeof(result->obd_name) - 1);
334                         obd_devs[i] = result;
335                 }
336         }
337         cfs_write_unlock(&obd_dev_lock);
338
339         if (result == NULL && i >= class_devno_max()) {
340                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
341                        class_devno_max());
342                 RETURN(ERR_PTR(-EOVERFLOW));
343         }
344
345         if (IS_ERR(result)) {
346                 obd_device_free(newdev);
347                 class_put_type(type);
348         } else {
349                 CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
350                        result->obd_name, result);
351         }
352         RETURN(result);
353 }
354
355 void class_release_dev(struct obd_device *obd)
356 {
357         struct obd_type *obd_type = obd->obd_type;
358
359         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
360                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
361         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
362                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
363         LASSERT(obd_type != NULL);
364
365         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
366                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
367
368         cfs_write_lock(&obd_dev_lock);
369         obd_devs[obd->obd_minor] = NULL;
370         cfs_write_unlock(&obd_dev_lock);
371         obd_device_free(obd);
372
373         class_put_type(obd_type);
374 }
375
376 int class_name2dev(const char *name)
377 {
378         int i;
379
380         if (!name)
381                 return -1;
382
383         cfs_read_lock(&obd_dev_lock);
384         for (i = 0; i < class_devno_max(); i++) {
385                 struct obd_device *obd = class_num2obd(i);
386
387                 if (obd && obd->obd_name && strcmp(name, obd->obd_name) == 0) {
388                         /* Make sure we finished attaching before we give
389                            out any references */
390                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
391                         if (obd->obd_attached) {
392                                 cfs_read_unlock(&obd_dev_lock);
393                                 return i;
394                         }
395                         break;
396                 }
397         }
398         cfs_read_unlock(&obd_dev_lock);
399
400         return -1;
401 }
402 EXPORT_SYMBOL(class_name2dev);
403
404 struct obd_device *class_name2obd(const char *name)
405 {
406         int dev = class_name2dev(name);
407
408         if (dev < 0 || dev > class_devno_max())
409                 return NULL;
410         return class_num2obd(dev);
411 }
412 EXPORT_SYMBOL(class_name2obd);
413
414 int class_uuid2dev(struct obd_uuid *uuid)
415 {
416         int i;
417
418         cfs_read_lock(&obd_dev_lock);
419         for (i = 0; i < class_devno_max(); i++) {
420                 struct obd_device *obd = class_num2obd(i);
421
422                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
423                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
424                         cfs_read_unlock(&obd_dev_lock);
425                         return i;
426                 }
427         }
428         cfs_read_unlock(&obd_dev_lock);
429
430         return -1;
431 }
432 EXPORT_SYMBOL(class_uuid2dev);
433
434 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
435 {
436         int dev = class_uuid2dev(uuid);
437         if (dev < 0)
438                 return NULL;
439         return class_num2obd(dev);
440 }
441 EXPORT_SYMBOL(class_uuid2obd);
442
443 /**
444  * Get obd device from ::obd_devs[]
445  *
446  * \param num [in] array index
447  *
448  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
449  *         otherwise return the obd device there.
450  */
451 struct obd_device *class_num2obd(int num)
452 {
453         struct obd_device *obd = NULL;
454
455         if (num < class_devno_max()) {
456                 obd = obd_devs[num];
457                 if (obd == NULL)
458                         return NULL;
459
460                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
461                          "%p obd_magic %08x != %08x\n",
462                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
463                 LASSERTF(obd->obd_minor == num,
464                          "%p obd_minor %0d != %0d\n",
465                          obd, obd->obd_minor, num);
466         }
467
468         return obd;
469 }
470 EXPORT_SYMBOL(class_num2obd);
471
472 void class_obd_list(void)
473 {
474         char *status;
475         int i;
476
477         cfs_read_lock(&obd_dev_lock);
478         for (i = 0; i < class_devno_max(); i++) {
479                 struct obd_device *obd = class_num2obd(i);
480
481                 if (obd == NULL)
482                         continue;
483                 if (obd->obd_stopping)
484                         status = "ST";
485                 else if (obd->obd_set_up)
486                         status = "UP";
487                 else if (obd->obd_attached)
488                         status = "AT";
489                 else
490                         status = "--";
491                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
492                          i, status, obd->obd_type->typ_name,
493                          obd->obd_name, obd->obd_uuid.uuid,
494                          cfs_atomic_read(&obd->obd_refcount));
495         }
496         cfs_read_unlock(&obd_dev_lock);
497         return;
498 }
499
500 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
501    specified, then only the client with that uuid is returned,
502    otherwise any client connected to the tgt is returned. */
503 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
504                                           const char * typ_name,
505                                           struct obd_uuid *grp_uuid)
506 {
507         int i;
508
509         cfs_read_lock(&obd_dev_lock);
510         for (i = 0; i < class_devno_max(); i++) {
511                 struct obd_device *obd = class_num2obd(i);
512
513                 if (obd == NULL)
514                         continue;
515                 if ((strncmp(obd->obd_type->typ_name, typ_name,
516                              strlen(typ_name)) == 0)) {
517                         if (obd_uuid_equals(tgt_uuid,
518                                             &obd->u.cli.cl_target_uuid) &&
519                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
520                                                          &obd->obd_uuid) : 1)) {
521                                 cfs_read_unlock(&obd_dev_lock);
522                                 return obd;
523                         }
524                 }
525         }
526         cfs_read_unlock(&obd_dev_lock);
527
528         return NULL;
529 }
530 EXPORT_SYMBOL(class_find_client_obd);
531
532 /* Iterate the obd_device list looking devices have grp_uuid. Start
533    searching at *next, and if a device is found, the next index to look
534    at is saved in *next. If next is NULL, then the first matching device
535    will always be returned. */
536 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
537 {
538         int i;
539
540         if (next == NULL)
541                 i = 0;
542         else if (*next >= 0 && *next < class_devno_max())
543                 i = *next;
544         else
545                 return NULL;
546
547         cfs_read_lock(&obd_dev_lock);
548         for (; i < class_devno_max(); i++) {
549                 struct obd_device *obd = class_num2obd(i);
550
551                 if (obd == NULL)
552                         continue;
553                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
554                         if (next != NULL)
555                                 *next = i+1;
556                         cfs_read_unlock(&obd_dev_lock);
557                         return obd;
558                 }
559         }
560         cfs_read_unlock(&obd_dev_lock);
561
562         return NULL;
563 }
564 EXPORT_SYMBOL(class_devices_in_group);
565
566 /**
567  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
568  * adjust sptlrpc settings accordingly.
569  */
570 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
571 {
572         struct obd_device  *obd;
573         const char         *type;
574         int                 i, rc = 0, rc2;
575
576         LASSERT(namelen > 0);
577
578         cfs_read_lock(&obd_dev_lock);
579         for (i = 0; i < class_devno_max(); i++) {
580                 obd = class_num2obd(i);
581
582                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
583                         continue;
584
585                 /* only notify mdc, osc, mdt, ost */
586                 type = obd->obd_type->typ_name;
587                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
588                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
589                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
590                     strcmp(type, LUSTRE_OST_NAME) != 0)
591                         continue;
592
593                 if (strncmp(obd->obd_name, fsname, namelen))
594                         continue;
595
596                 class_incref(obd, __FUNCTION__, obd);
597                 cfs_read_unlock(&obd_dev_lock);
598                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
599                                          sizeof(KEY_SPTLRPC_CONF),
600                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
601                 rc = rc ? rc : rc2;
602                 class_decref(obd, __FUNCTION__, obd);
603                 cfs_read_lock(&obd_dev_lock);
604         }
605         cfs_read_unlock(&obd_dev_lock);
606         return rc;
607 }
608 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
609
610 void obd_cleanup_caches(void)
611 {
612         int rc;
613
614         ENTRY;
615         if (obd_device_cachep) {
616                 rc = cfs_mem_cache_destroy(obd_device_cachep);
617                 LASSERTF(rc == 0, "Cannot destropy ll_obd_device_cache: rc %d\n", rc);
618                 obd_device_cachep = NULL;
619         }
620         if (obdo_cachep) {
621                 rc = cfs_mem_cache_destroy(obdo_cachep);
622                 LASSERTF(rc == 0, "Cannot destory ll_obdo_cache\n");
623                 obdo_cachep = NULL;
624         }
625         if (import_cachep) {
626                 rc = cfs_mem_cache_destroy(import_cachep);
627                 LASSERTF(rc == 0, "Cannot destory ll_import_cache\n");
628                 import_cachep = NULL;
629         }
630         if (capa_cachep) {
631                 rc = cfs_mem_cache_destroy(capa_cachep);
632                 LASSERTF(rc == 0, "Cannot destory capa_cache\n");
633                 capa_cachep = NULL;
634         }
635         EXIT;
636 }
637
638 int obd_init_caches(void)
639 {
640         ENTRY;
641
642         LASSERT(obd_device_cachep == NULL);
643         obd_device_cachep = cfs_mem_cache_create("ll_obd_dev_cache",
644                                                  sizeof(struct obd_device),
645                                                  0, 0);
646         if (!obd_device_cachep)
647                 GOTO(out, -ENOMEM);
648
649         LASSERT(obdo_cachep == NULL);
650         obdo_cachep = cfs_mem_cache_create("ll_obdo_cache", sizeof(struct obdo),
651                                            0, 0);
652         if (!obdo_cachep)
653                 GOTO(out, -ENOMEM);
654
655         LASSERT(import_cachep == NULL);
656         import_cachep = cfs_mem_cache_create("ll_import_cache",
657                                              sizeof(struct obd_import),
658                                              0, 0);
659         if (!import_cachep)
660                 GOTO(out, -ENOMEM);
661
662         LASSERT(capa_cachep == NULL);
663         capa_cachep = cfs_mem_cache_create("capa_cache",
664                                            sizeof(struct obd_capa), 0, 0);
665         if (!capa_cachep)
666                 GOTO(out, -ENOMEM);
667
668         RETURN(0);
669  out:
670         obd_cleanup_caches();
671         RETURN(-ENOMEM);
672
673 }
674
675 /* map connection to client */
676 struct obd_export *class_conn2export(struct lustre_handle *conn)
677 {
678         struct obd_export *export;
679         ENTRY;
680
681         if (!conn) {
682                 CDEBUG(D_CACHE, "looking for null handle\n");
683                 RETURN(NULL);
684         }
685
686         if (conn->cookie == -1) {  /* this means assign a new connection */
687                 CDEBUG(D_CACHE, "want a new connection\n");
688                 RETURN(NULL);
689         }
690
691         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
692         export = class_handle2object(conn->cookie);
693         RETURN(export);
694 }
695 EXPORT_SYMBOL(class_conn2export);
696
697 struct obd_device *class_exp2obd(struct obd_export *exp)
698 {
699         if (exp)
700                 return exp->exp_obd;
701         return NULL;
702 }
703 EXPORT_SYMBOL(class_exp2obd);
704
705 struct obd_device *class_conn2obd(struct lustre_handle *conn)
706 {
707         struct obd_export *export;
708         export = class_conn2export(conn);
709         if (export) {
710                 struct obd_device *obd = export->exp_obd;
711                 class_export_put(export);
712                 return obd;
713         }
714         return NULL;
715 }
716 EXPORT_SYMBOL(class_conn2obd);
717
718 struct obd_import *class_exp2cliimp(struct obd_export *exp)
719 {
720         struct obd_device *obd = exp->exp_obd;
721         if (obd == NULL)
722                 return NULL;
723         return obd->u.cli.cl_import;
724 }
725 EXPORT_SYMBOL(class_exp2cliimp);
726
727 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
728 {
729         struct obd_device *obd = class_conn2obd(conn);
730         if (obd == NULL)
731                 return NULL;
732         return obd->u.cli.cl_import;
733 }
734 EXPORT_SYMBOL(class_conn2cliimp);
735
736 /* Export management functions */
737 static void class_export_destroy(struct obd_export *exp)
738 {
739         struct obd_device *obd = exp->exp_obd;
740         ENTRY;
741
742         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
743         LASSERT(obd != NULL);
744
745         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
746                exp->exp_client_uuid.uuid, obd->obd_name);
747
748         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
749         if (exp->exp_connection)
750                 ptlrpc_put_connection_superhack(exp->exp_connection);
751
752         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
753         LASSERT(cfs_list_empty(&exp->exp_uncommitted_replies));
754         LASSERT(cfs_list_empty(&exp->exp_req_replay_queue));
755         LASSERT(cfs_list_empty(&exp->exp_hp_rpcs));
756         obd_destroy_export(exp);
757         class_decref(obd, "export", exp);
758
759         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
760         EXIT;
761 }
762
/* hop_addref callback for export handles: takes one export reference. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
767
768 static struct portals_handle_ops export_handle_ops = {
769         .hop_addref = export_handle_addref,
770         .hop_free   = NULL,
771 };
772
773 struct obd_export *class_export_get(struct obd_export *exp)
774 {
775         cfs_atomic_inc(&exp->exp_refcount);
776         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
777                cfs_atomic_read(&exp->exp_refcount));
778         return exp;
779 }
780 EXPORT_SYMBOL(class_export_get);
781
782 void class_export_put(struct obd_export *exp)
783 {
784         LASSERT(exp != NULL);
785         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
786         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
787                cfs_atomic_read(&exp->exp_refcount) - 1);
788
789         if (cfs_atomic_dec_and_test(&exp->exp_refcount)) {
790                 LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
791                 CDEBUG(D_IOCTL, "final put %p/%s\n",
792                        exp, exp->exp_client_uuid.uuid);
793
794                 /* release nid stat refererence */
795                 lprocfs_exp_cleanup(exp);
796
797                 obd_zombie_export_add(exp);
798         }
799 }
800 EXPORT_SYMBOL(class_export_put);
801
802 /* Creates a new export, adds it to the hash table, and returns a
803  * pointer to it. The refcount is 2: one for the hash reference, and
804  * one for the pointer returned by this function. */
805 struct obd_export *class_new_export(struct obd_device *obd,
806                                     struct obd_uuid *cluuid)
807 {
808         struct obd_export *export;
809         cfs_hash_t *hash = NULL;
810         int rc = 0;
811         ENTRY;
812
813         OBD_ALLOC_PTR(export);
814         if (!export)
815                 return ERR_PTR(-ENOMEM);
816
817         export->exp_conn_cnt = 0;
818         export->exp_lock_hash = NULL;
819         export->exp_flock_hash = NULL;
820         cfs_atomic_set(&export->exp_refcount, 2);
821         cfs_atomic_set(&export->exp_rpc_count, 0);
822         cfs_atomic_set(&export->exp_cb_count, 0);
823         cfs_atomic_set(&export->exp_locks_count, 0);
824 #if LUSTRE_TRACKS_LOCK_EXP_REFS
825         CFS_INIT_LIST_HEAD(&export->exp_locks_list);
826         cfs_spin_lock_init(&export->exp_locks_list_guard);
827 #endif
828         cfs_atomic_set(&export->exp_replay_count, 0);
829         export->exp_obd = obd;
830         CFS_INIT_LIST_HEAD(&export->exp_outstanding_replies);
831         cfs_spin_lock_init(&export->exp_uncommitted_replies_lock);
832         CFS_INIT_LIST_HEAD(&export->exp_uncommitted_replies);
833         CFS_INIT_LIST_HEAD(&export->exp_req_replay_queue);
834         CFS_INIT_LIST_HEAD(&export->exp_handle.h_link);
835         CFS_INIT_LIST_HEAD(&export->exp_hp_rpcs);
836         class_handle_hash(&export->exp_handle, &export_handle_ops);
837         export->exp_last_request_time = cfs_time_current_sec();
838         cfs_spin_lock_init(&export->exp_lock);
839         cfs_spin_lock_init(&export->exp_rpc_lock);
840         CFS_INIT_HLIST_NODE(&export->exp_uuid_hash);
841         CFS_INIT_HLIST_NODE(&export->exp_nid_hash);
842         cfs_spin_lock_init(&export->exp_bl_list_lock);
843         CFS_INIT_LIST_HEAD(&export->exp_bl_list);
844
845         export->exp_sp_peer = LUSTRE_SP_ANY;
846         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
847         export->exp_client_uuid = *cluuid;
848         obd_init_export(export);
849
850         cfs_spin_lock(&obd->obd_dev_lock);
851          /* shouldn't happen, but might race */
852         if (obd->obd_stopping)
853                 GOTO(exit_unlock, rc = -ENODEV);
854
855         hash = cfs_hash_getref(obd->obd_uuid_hash);
856         if (hash == NULL)
857                 GOTO(exit_unlock, rc = -ENODEV);
858         cfs_spin_unlock(&obd->obd_dev_lock);
859
860         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
861                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
862                 if (rc != 0) {
863                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
864                                       obd->obd_name, cluuid->uuid, rc);
865                         GOTO(exit_err, rc = -EALREADY);
866                 }
867         }
868
869         cfs_spin_lock(&obd->obd_dev_lock);
870         if (obd->obd_stopping) {
871                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
872                 GOTO(exit_unlock, rc = -ENODEV);
873         }
874
875         class_incref(obd, "export", export);
876         cfs_list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
877         cfs_list_add_tail(&export->exp_obd_chain_timed,
878                           &export->exp_obd->obd_exports_timed);
879         export->exp_obd->obd_num_exports++;
880         cfs_spin_unlock(&obd->obd_dev_lock);
881         cfs_hash_putref(hash);
882         RETURN(export);
883
884 exit_unlock:
885         cfs_spin_unlock(&obd->obd_dev_lock);
886 exit_err:
887         if (hash)
888                 cfs_hash_putref(hash);
889         class_handle_unhash(&export->exp_handle);
890         LASSERT(cfs_hlist_unhashed(&export->exp_uuid_hash));
891         obd_destroy_export(export);
892         OBD_FREE_PTR(export);
893         return ERR_PTR(rc);
894 }
895 EXPORT_SYMBOL(class_new_export);
896
897 void class_unlink_export(struct obd_export *exp)
898 {
899         class_handle_unhash(&exp->exp_handle);
900
901         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
902         /* delete an uuid-export hashitem from hashtables */
903         if (!cfs_hlist_unhashed(&exp->exp_uuid_hash))
904                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
905                              &exp->exp_client_uuid,
906                              &exp->exp_uuid_hash);
907
908         cfs_list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
909         cfs_list_del_init(&exp->exp_obd_chain_timed);
910         exp->exp_obd->obd_num_exports--;
911         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
912         class_export_put(exp);
913 }
914 EXPORT_SYMBOL(class_unlink_export);
915
916 /* Import management functions */
917 void class_import_destroy(struct obd_import *imp)
918 {
919         ENTRY;
920
921         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
922                 imp->imp_obd->obd_name);
923
924         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
925
926         ptlrpc_put_connection_superhack(imp->imp_connection);
927
928         while (!cfs_list_empty(&imp->imp_conn_list)) {
929                 struct obd_import_conn *imp_conn;
930
931                 imp_conn = cfs_list_entry(imp->imp_conn_list.next,
932                                           struct obd_import_conn, oic_item);
933                 cfs_list_del_init(&imp_conn->oic_item);
934                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
935                 OBD_FREE(imp_conn, sizeof(*imp_conn));
936         }
937
938         LASSERT(imp->imp_sec == NULL);
939         class_decref(imp->imp_obd, "import", imp);
940         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
941         EXIT;
942 }
943
/* Handle-table callback: take a reference on the import behind a handle. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
948
949 static struct portals_handle_ops import_handle_ops = {
950         .hop_addref = import_handle_addref,
951         .hop_free   = NULL,
952 };
953
954 struct obd_import *class_import_get(struct obd_import *import)
955 {
956         cfs_atomic_inc(&import->imp_refcount);
957         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
958                cfs_atomic_read(&import->imp_refcount),
959                import->imp_obd->obd_name);
960         return import;
961 }
962 EXPORT_SYMBOL(class_import_get);
963
964 void class_import_put(struct obd_import *imp)
965 {
966         ENTRY;
967
968         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
969         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
970
971         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
972                cfs_atomic_read(&imp->imp_refcount) - 1,
973                imp->imp_obd->obd_name);
974
975         if (cfs_atomic_dec_and_test(&imp->imp_refcount)) {
976                 CDEBUG(D_INFO, "final put import %p\n", imp);
977                 obd_zombie_import_add(imp);
978         }
979
980         /* catch possible import put race */
981         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
982         EXIT;
983 }
984 EXPORT_SYMBOL(class_import_put);
985
986 static void init_imp_at(struct imp_at *at) {
987         int i;
988         at_init(&at->iat_net_latency, 0, 0);
989         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
990                 /* max service estimates are tracked on the server side, so
991                    don't use the AT history here, just use the last reported
992                    val. (But keep hist for proc histogram, worst_ever) */
993                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
994                         AT_FLG_NOHIST);
995         }
996 }
997
998 struct obd_import *class_new_import(struct obd_device *obd)
999 {
1000         struct obd_import *imp;
1001
1002         OBD_ALLOC(imp, sizeof(*imp));
1003         if (imp == NULL)
1004                 return NULL;
1005
1006         CFS_INIT_LIST_HEAD(&imp->imp_zombie_chain);
1007         CFS_INIT_LIST_HEAD(&imp->imp_replay_list);
1008         CFS_INIT_LIST_HEAD(&imp->imp_sending_list);
1009         CFS_INIT_LIST_HEAD(&imp->imp_delayed_list);
1010         cfs_spin_lock_init(&imp->imp_lock);
1011         imp->imp_last_success_conn = 0;
1012         imp->imp_state = LUSTRE_IMP_NEW;
1013         imp->imp_obd = class_incref(obd, "import", imp);
1014         cfs_mutex_init(&imp->imp_sec_mutex);
1015         cfs_waitq_init(&imp->imp_recovery_waitq);
1016
1017         cfs_atomic_set(&imp->imp_refcount, 2);
1018         cfs_atomic_set(&imp->imp_unregistering, 0);
1019         cfs_atomic_set(&imp->imp_inflight, 0);
1020         cfs_atomic_set(&imp->imp_replay_inflight, 0);
1021         cfs_atomic_set(&imp->imp_inval_count, 0);
1022         CFS_INIT_LIST_HEAD(&imp->imp_conn_list);
1023         CFS_INIT_LIST_HEAD(&imp->imp_handle.h_link);
1024         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1025         init_imp_at(&imp->imp_at);
1026
1027         /* the default magic is V2, will be used in connect RPC, and
1028          * then adjusted according to the flags in request/reply. */
1029         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1030
1031         return imp;
1032 }
1033 EXPORT_SYMBOL(class_new_import);
1034
1035 void class_destroy_import(struct obd_import *import)
1036 {
1037         LASSERT(import != NULL);
1038         LASSERT(import != LP_POISON);
1039
1040         class_handle_unhash(&import->imp_handle);
1041
1042         cfs_spin_lock(&import->imp_lock);
1043         import->imp_generation++;
1044         cfs_spin_unlock(&import->imp_lock);
1045         class_import_put(import);
1046 }
1047 EXPORT_SYMBOL(class_destroy_import);
1048
1049 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1050
1051 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1052 {
1053         cfs_spin_lock(&exp->exp_locks_list_guard);
1054
1055         LASSERT(lock->l_exp_refs_nr >= 0);
1056
1057         if (lock->l_exp_refs_target != NULL &&
1058             lock->l_exp_refs_target != exp) {
1059                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1060                               exp, lock, lock->l_exp_refs_target);
1061         }
1062         if ((lock->l_exp_refs_nr ++) == 0) {
1063                 cfs_list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1064                 lock->l_exp_refs_target = exp;
1065         }
1066         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1067                lock, exp, lock->l_exp_refs_nr);
1068         cfs_spin_unlock(&exp->exp_locks_list_guard);
1069 }
1070 EXPORT_SYMBOL(__class_export_add_lock_ref);
1071
1072 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1073 {
1074         cfs_spin_lock(&exp->exp_locks_list_guard);
1075         LASSERT(lock->l_exp_refs_nr > 0);
1076         if (lock->l_exp_refs_target != exp) {
1077                 LCONSOLE_WARN("lock %p, "
1078                               "mismatching export pointers: %p, %p\n",
1079                               lock, lock->l_exp_refs_target, exp);
1080         }
1081         if (-- lock->l_exp_refs_nr == 0) {
1082                 cfs_list_del_init(&lock->l_exp_refs_link);
1083                 lock->l_exp_refs_target = NULL;
1084         }
1085         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1086                lock, exp, lock->l_exp_refs_nr);
1087         cfs_spin_unlock(&exp->exp_locks_list_guard);
1088 }
1089 EXPORT_SYMBOL(__class_export_del_lock_ref);
1090 #endif
1091
1092 /* A connection defines an export context in which preallocation can
1093    be managed. This releases the export pointer reference, and returns
1094    the export handle, so the export refcount is 1 when this function
1095    returns. */
1096 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1097                   struct obd_uuid *cluuid)
1098 {
1099         struct obd_export *export;
1100         LASSERT(conn != NULL);
1101         LASSERT(obd != NULL);
1102         LASSERT(cluuid != NULL);
1103         ENTRY;
1104
1105         export = class_new_export(obd, cluuid);
1106         if (IS_ERR(export))
1107                 RETURN(PTR_ERR(export));
1108
1109         conn->cookie = export->exp_handle.h_cookie;
1110         class_export_put(export);
1111
1112         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1113                cluuid->uuid, conn->cookie);
1114         RETURN(0);
1115 }
1116 EXPORT_SYMBOL(class_connect);
1117
1118 /* if export is involved in recovery then clean up related things */
1119 void class_export_recovery_cleanup(struct obd_export *exp)
1120 {
1121         struct obd_device *obd = exp->exp_obd;
1122
1123         cfs_spin_lock(&obd->obd_recovery_task_lock);
1124         if (exp->exp_delayed)
1125                 obd->obd_delayed_clients--;
1126         if (obd->obd_recovering && exp->exp_in_recovery) {
1127                 cfs_spin_lock(&exp->exp_lock);
1128                 exp->exp_in_recovery = 0;
1129                 cfs_spin_unlock(&exp->exp_lock);
1130                 LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1131                 cfs_atomic_dec(&obd->obd_connected_clients);
1132         }
1133         cfs_spin_unlock(&obd->obd_recovery_task_lock);
1134         /** Cleanup req replay fields */
1135         if (exp->exp_req_replay_needed) {
1136                 cfs_spin_lock(&exp->exp_lock);
1137                 exp->exp_req_replay_needed = 0;
1138                 cfs_spin_unlock(&exp->exp_lock);
1139                 LASSERT(cfs_atomic_read(&obd->obd_req_replay_clients));
1140                 cfs_atomic_dec(&obd->obd_req_replay_clients);
1141         }
1142         /** Cleanup lock replay data */
1143         if (exp->exp_lock_replay_needed) {
1144                 cfs_spin_lock(&exp->exp_lock);
1145                 exp->exp_lock_replay_needed = 0;
1146                 cfs_spin_unlock(&exp->exp_lock);
1147                 LASSERT(cfs_atomic_read(&obd->obd_lock_replay_clients));
1148                 cfs_atomic_dec(&obd->obd_lock_replay_clients);
1149         }
1150 }
1151
1152 /* This function removes 1-3 references from the export:
1153  * 1 - for export pointer passed
1154  * and if disconnect really need
1155  * 2 - removing from hash
1156  * 3 - in client_unlink_export
1157  * The export pointer passed to this function can destroyed */
1158 int class_disconnect(struct obd_export *export)
1159 {
1160         int already_disconnected;
1161         ENTRY;
1162
1163         if (export == NULL) {
1164                 CWARN("attempting to free NULL export %p\n", export);
1165                 RETURN(-EINVAL);
1166         }
1167
1168         cfs_spin_lock(&export->exp_lock);
1169         already_disconnected = export->exp_disconnected;
1170         export->exp_disconnected = 1;
1171         cfs_spin_unlock(&export->exp_lock);
1172
1173         /* class_cleanup(), abort_recovery(), and class_fail_export()
1174          * all end up in here, and if any of them race we shouldn't
1175          * call extra class_export_puts(). */
1176         if (already_disconnected) {
1177                 LASSERT(cfs_hlist_unhashed(&export->exp_nid_hash));
1178                 GOTO(no_disconn, already_disconnected);
1179         }
1180
1181         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1182                export->exp_handle.h_cookie);
1183
1184         if (!cfs_hlist_unhashed(&export->exp_nid_hash))
1185                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1186                              &export->exp_connection->c_peer.nid,
1187                              &export->exp_nid_hash);
1188
1189         class_export_recovery_cleanup(export);
1190         class_unlink_export(export);
1191 no_disconn:
1192         class_export_put(export);
1193         RETURN(0);
1194 }
1195 EXPORT_SYMBOL(class_disconnect);
1196
1197 /* Return non-zero for a fully connected export */
1198 int class_connected_export(struct obd_export *exp)
1199 {
1200         if (exp) {
1201                 int connected;
1202                 cfs_spin_lock(&exp->exp_lock);
1203                 connected = (exp->exp_conn_cnt > 0);
1204                 cfs_spin_unlock(&exp->exp_lock);
1205                 return connected;
1206         }
1207         return 0;
1208 }
1209 EXPORT_SYMBOL(class_connected_export);
1210
1211 static void class_disconnect_export_list(cfs_list_t *list,
1212                                          enum obd_option flags)
1213 {
1214         int rc;
1215         struct obd_export *exp;
1216         ENTRY;
1217
1218         /* It's possible that an export may disconnect itself, but
1219          * nothing else will be added to this list. */
1220         while (!cfs_list_empty(list)) {
1221                 exp = cfs_list_entry(list->next, struct obd_export,
1222                                      exp_obd_chain);
1223                 /* need for safe call CDEBUG after obd_disconnect */
1224                 class_export_get(exp);
1225
1226                 cfs_spin_lock(&exp->exp_lock);
1227                 exp->exp_flags = flags;
1228                 cfs_spin_unlock(&exp->exp_lock);
1229
1230                 if (obd_uuid_equals(&exp->exp_client_uuid,
1231                                     &exp->exp_obd->obd_uuid)) {
1232                         CDEBUG(D_HA,
1233                                "exp %p export uuid == obd uuid, don't discon\n",
1234                                exp);
1235                         /* Need to delete this now so we don't end up pointing
1236                          * to work_list later when this export is cleaned up. */
1237                         cfs_list_del_init(&exp->exp_obd_chain);
1238                         class_export_put(exp);
1239                         continue;
1240                 }
1241
1242                 class_export_get(exp);
1243                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1244                        "last request at "CFS_TIME_T"\n",
1245                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1246                        exp, exp->exp_last_request_time);
1247                 /* release one export reference anyway */
1248                 rc = obd_disconnect(exp);
1249
1250                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1251                        obd_export_nid2str(exp), exp, rc);
1252                 class_export_put(exp);
1253         }
1254         EXIT;
1255 }
1256
1257 void class_disconnect_exports(struct obd_device *obd)
1258 {
1259         cfs_list_t work_list;
1260         ENTRY;
1261
1262         /* Move all of the exports from obd_exports to a work list, en masse. */
1263         CFS_INIT_LIST_HEAD(&work_list);
1264         cfs_spin_lock(&obd->obd_dev_lock);
1265         cfs_list_splice_init(&obd->obd_exports, &work_list);
1266         cfs_list_splice_init(&obd->obd_delayed_exports, &work_list);
1267         cfs_spin_unlock(&obd->obd_dev_lock);
1268
1269         if (!cfs_list_empty(&work_list)) {
1270                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1271                        "disconnecting them\n", obd->obd_minor, obd);
1272                 class_disconnect_export_list(&work_list,
1273                                              exp_flags_from_obd(obd));
1274         } else
1275                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1276                        obd->obd_minor, obd);
1277         EXIT;
1278 }
1279 EXPORT_SYMBOL(class_disconnect_exports);
1280
1281 /* Remove exports that have not completed recovery.
1282  */
1283 void class_disconnect_stale_exports(struct obd_device *obd,
1284                                     int (*test_export)(struct obd_export *))
1285 {
1286         cfs_list_t work_list;
1287         struct obd_export *exp, *n;
1288         int evicted = 0;
1289         ENTRY;
1290
1291         CFS_INIT_LIST_HEAD(&work_list);
1292         cfs_spin_lock(&obd->obd_dev_lock);
1293         cfs_list_for_each_entry_safe(exp, n, &obd->obd_exports,
1294                                      exp_obd_chain) {
1295                 /* don't count self-export as client */
1296                 if (obd_uuid_equals(&exp->exp_client_uuid,
1297                                     &exp->exp_obd->obd_uuid))
1298                         continue;
1299
1300                 /* don't evict clients which have no slot in last_rcvd
1301                  * (e.g. lightweight connection) */
1302                 if (exp->exp_target_data.ted_lr_idx == -1)
1303                         continue;
1304
1305                 cfs_spin_lock(&exp->exp_lock);
1306                 if (test_export(exp)) {
1307                         cfs_spin_unlock(&exp->exp_lock);
1308                         continue;
1309                 }
1310                 exp->exp_failed = 1;
1311                 cfs_spin_unlock(&exp->exp_lock);
1312
1313                 cfs_list_move(&exp->exp_obd_chain, &work_list);
1314                 evicted++;
1315                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1316                        obd->obd_name, exp->exp_client_uuid.uuid,
1317                        exp->exp_connection == NULL ? "<unknown>" :
1318                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1319                 print_export_data(exp, "EVICTING", 0);
1320         }
1321         cfs_spin_unlock(&obd->obd_dev_lock);
1322
1323         if (evicted) {
1324                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1325                               obd->obd_name, evicted);
1326                 obd->obd_stale_clients += evicted;
1327         }
1328         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1329                                                  OBD_OPT_ABORT_RECOV);
1330         EXIT;
1331 }
1332 EXPORT_SYMBOL(class_disconnect_stale_exports);
1333
1334 void class_fail_export(struct obd_export *exp)
1335 {
1336         int rc, already_failed;
1337
1338         cfs_spin_lock(&exp->exp_lock);
1339         already_failed = exp->exp_failed;
1340         exp->exp_failed = 1;
1341         cfs_spin_unlock(&exp->exp_lock);
1342
1343         if (already_failed) {
1344                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1345                        exp, exp->exp_client_uuid.uuid);
1346                 return;
1347         }
1348
1349         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1350                exp, exp->exp_client_uuid.uuid);
1351
1352         if (obd_dump_on_timeout)
1353                 libcfs_debug_dumplog();
1354
1355         /* need for safe call CDEBUG after obd_disconnect */
1356         class_export_get(exp);
1357
1358         /* Most callers into obd_disconnect are removing their own reference
1359          * (request, for example) in addition to the one from the hash table.
1360          * We don't have such a reference here, so make one. */
1361         class_export_get(exp);
1362         rc = obd_disconnect(exp);
1363         if (rc)
1364                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1365         else
1366                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1367                        exp, exp->exp_client_uuid.uuid);
1368         class_export_put(exp);
1369 }
1370 EXPORT_SYMBOL(class_fail_export);
1371
1372 char *obd_export_nid2str(struct obd_export *exp)
1373 {
1374         if (exp->exp_connection != NULL)
1375                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1376
1377         return "(no nid)";
1378 }
1379 EXPORT_SYMBOL(obd_export_nid2str);
1380
1381 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1382 {
1383         struct obd_export *doomed_exp = NULL;
1384         int exports_evicted = 0;
1385
1386         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1387
1388         do {
1389                 doomed_exp = cfs_hash_lookup(obd->obd_nid_hash, &nid_key);
1390                 if (doomed_exp == NULL)
1391                         break;
1392
1393                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1394                          "nid %s found, wanted nid %s, requested nid %s\n",
1395                          obd_export_nid2str(doomed_exp),
1396                          libcfs_nid2str(nid_key), nid);
1397                 LASSERTF(doomed_exp != obd->obd_self_export,
1398                          "self-export is hashed by NID?\n");
1399                 exports_evicted++;
1400                 CWARN("%s: evict NID '%s' (%s) #%d at adminstrative request\n",
1401                        obd->obd_name, nid, doomed_exp->exp_client_uuid.uuid,
1402                        exports_evicted);
1403                 class_fail_export(doomed_exp);
1404                 class_export_put(doomed_exp);
1405         } while (1);
1406
1407         if (!exports_evicted)
1408                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1409                        obd->obd_name, nid);
1410         return exports_evicted;
1411 }
1412 EXPORT_SYMBOL(obd_export_evict_by_nid);
1413
1414 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1415 {
1416         struct obd_export *doomed_exp = NULL;
1417         struct obd_uuid doomed_uuid;
1418         int exports_evicted = 0;
1419
1420         obd_str2uuid(&doomed_uuid, uuid);
1421         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1422                 CERROR("%s: can't evict myself\n", obd->obd_name);
1423                 return exports_evicted;
1424         }
1425
1426         doomed_exp = cfs_hash_lookup(obd->obd_uuid_hash, &doomed_uuid);
1427
1428         if (doomed_exp == NULL) {
1429                 CERROR("%s: can't disconnect %s: no exports found\n",
1430                        obd->obd_name, uuid);
1431         } else {
1432                 CWARN("%s: evicting %s at adminstrative request\n",
1433                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1434                 class_fail_export(doomed_exp);
1435                 class_export_put(doomed_exp);
1436                 exports_evicted++;
1437         }
1438
1439         return exports_evicted;
1440 }
1441 EXPORT_SYMBOL(obd_export_evict_by_uuid);
1442
1443 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1444 void (*class_export_dump_hook)(struct obd_export*) = NULL;
1445 EXPORT_SYMBOL(class_export_dump_hook);
1446 #endif
1447
1448 static void print_export_data(struct obd_export *exp, const char *status,
1449                               int locks)
1450 {
1451         struct ptlrpc_reply_state *rs;
1452         struct ptlrpc_reply_state *first_reply = NULL;
1453         int nreplies = 0;
1454
1455         cfs_spin_lock(&exp->exp_lock);
1456         cfs_list_for_each_entry(rs, &exp->exp_outstanding_replies,
1457                                 rs_exp_list) {
1458                 if (nreplies == 0)
1459                         first_reply = rs;
1460                 nreplies++;
1461         }
1462         cfs_spin_unlock(&exp->exp_lock);
1463
1464         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1465                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1466                obd_export_nid2str(exp), cfs_atomic_read(&exp->exp_refcount),
1467                cfs_atomic_read(&exp->exp_rpc_count),
1468                cfs_atomic_read(&exp->exp_cb_count),
1469                cfs_atomic_read(&exp->exp_locks_count),
1470                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1471                nreplies, first_reply, nreplies > 3 ? "..." : "",
1472                exp->exp_last_committed);
1473 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1474         if (locks && class_export_dump_hook != NULL)
1475                 class_export_dump_hook(exp);
1476 #endif
1477 }
1478
1479 void dump_exports(struct obd_device *obd, int locks)
1480 {
1481         struct obd_export *exp;
1482
1483         cfs_spin_lock(&obd->obd_dev_lock);
1484         cfs_list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1485                 print_export_data(exp, "ACTIVE", locks);
1486         cfs_list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1487                 print_export_data(exp, "UNLINKED", locks);
1488         cfs_list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1489                 print_export_data(exp, "DELAYED", locks);
1490         cfs_spin_unlock(&obd->obd_dev_lock);
1491         cfs_spin_lock(&obd_zombie_impexp_lock);
1492         cfs_list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1493                 print_export_data(exp, "ZOMBIE", locks);
1494         cfs_spin_unlock(&obd_zombie_impexp_lock);
1495 }
1496 EXPORT_SYMBOL(dump_exports);
1497
1498 void obd_exports_barrier(struct obd_device *obd)
1499 {
1500         int waited = 2;
1501         LASSERT(cfs_list_empty(&obd->obd_exports));
1502         cfs_spin_lock(&obd->obd_dev_lock);
1503         while (!cfs_list_empty(&obd->obd_unlinked_exports)) {
1504                 cfs_spin_unlock(&obd->obd_dev_lock);
1505                 cfs_schedule_timeout_and_set_state(CFS_TASK_UNINT,
1506                                                    cfs_time_seconds(waited));
1507                 if (waited > 5 && IS_PO2(waited)) {
1508                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1509                                       "more than %d seconds. "
1510                                       "The obd refcount = %d. Is it stuck?\n",
1511                                       obd->obd_name, waited,
1512                                       cfs_atomic_read(&obd->obd_refcount));
1513                         dump_exports(obd, 1);
1514                 }
1515                 waited *= 2;
1516                 cfs_spin_lock(&obd->obd_dev_lock);
1517         }
1518         cfs_spin_unlock(&obd->obd_dev_lock);
1519 }
1520 EXPORT_SYMBOL(obd_exports_barrier);
1521
/* Number of zombie imports and exports queued and not yet destroyed;
 * updated under obd_zombie_impexp_lock. */
static int zombies_count = 0;
1524
1525 /**
1526  * kill zombie imports and exports
1527  */
1528 void obd_zombie_impexp_cull(void)
1529 {
1530         struct obd_import *import;
1531         struct obd_export *export;
1532         ENTRY;
1533
1534         do {
1535                 cfs_spin_lock(&obd_zombie_impexp_lock);
1536
1537                 import = NULL;
1538                 if (!cfs_list_empty(&obd_zombie_imports)) {
1539                         import = cfs_list_entry(obd_zombie_imports.next,
1540                                                 struct obd_import,
1541                                                 imp_zombie_chain);
1542                         cfs_list_del_init(&import->imp_zombie_chain);
1543                 }
1544
1545                 export = NULL;
1546                 if (!cfs_list_empty(&obd_zombie_exports)) {
1547                         export = cfs_list_entry(obd_zombie_exports.next,
1548                                                 struct obd_export,
1549                                                 exp_obd_chain);
1550                         cfs_list_del_init(&export->exp_obd_chain);
1551                 }
1552
1553                 cfs_spin_unlock(&obd_zombie_impexp_lock);
1554
1555                 if (import != NULL) {
1556                         class_import_destroy(import);
1557                         cfs_spin_lock(&obd_zombie_impexp_lock);
1558                         zombies_count--;
1559                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1560                 }
1561
1562                 if (export != NULL) {
1563                         class_export_destroy(export);
1564                         cfs_spin_lock(&obd_zombie_impexp_lock);
1565                         zombies_count--;
1566                         cfs_spin_unlock(&obd_zombie_impexp_lock);
1567                 }
1568
1569                 cfs_cond_resched();
1570         } while (import != NULL || export != NULL);
1571         EXIT;
1572 }
1573
1574 static cfs_completion_t         obd_zombie_start;
1575 static cfs_completion_t         obd_zombie_stop;
1576 static unsigned long            obd_zombie_flags;
1577 static cfs_waitq_t              obd_zombie_waitq;
1578 static pid_t                    obd_zombie_pid;
1579
1580 enum {
1581         OBD_ZOMBIE_STOP   = 1 << 1
1582 };
1583
1584 /**
1585  * check for work for kill zombie import/export thread.
1586  */
1587 static int obd_zombie_impexp_check(void *arg)
1588 {
1589         int rc;
1590
1591         cfs_spin_lock(&obd_zombie_impexp_lock);
1592         rc = (zombies_count == 0) &&
1593              !cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1594         cfs_spin_unlock(&obd_zombie_impexp_lock);
1595
1596         RETURN(rc);
1597 }
1598
1599 /**
1600  * Add export to the obd_zombe thread and notify it.
1601  */
1602 static void obd_zombie_export_add(struct obd_export *exp) {
1603         cfs_spin_lock(&exp->exp_obd->obd_dev_lock);
1604         LASSERT(!cfs_list_empty(&exp->exp_obd_chain));
1605         cfs_list_del_init(&exp->exp_obd_chain);
1606         cfs_spin_unlock(&exp->exp_obd->obd_dev_lock);
1607         cfs_spin_lock(&obd_zombie_impexp_lock);
1608         zombies_count++;
1609         cfs_list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1610         cfs_spin_unlock(&obd_zombie_impexp_lock);
1611
1612         obd_zombie_impexp_notify();
1613 }
1614
1615 /**
1616  * Add import to the obd_zombe thread and notify it.
1617  */
1618 static void obd_zombie_import_add(struct obd_import *imp) {
1619         LASSERT(imp->imp_sec == NULL);
1620         LASSERT(imp->imp_rq_pool == NULL);
1621         cfs_spin_lock(&obd_zombie_impexp_lock);
1622         LASSERT(cfs_list_empty(&imp->imp_zombie_chain));
1623         zombies_count++;
1624         cfs_list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1625         cfs_spin_unlock(&obd_zombie_impexp_lock);
1626
1627         obd_zombie_impexp_notify();
1628 }
1629
1630 /**
1631  * notify import/export destroy thread about new zombie.
1632  */
1633 static void obd_zombie_impexp_notify(void)
1634 {
1635         /*
1636          * Make sure obd_zomebie_impexp_thread get this notification.
1637          * It is possible this signal only get by obd_zombie_barrier, and
1638          * barrier gulps this notification and sleeps away and hangs ensues
1639          */
1640         cfs_waitq_broadcast(&obd_zombie_waitq);
1641 }
1642
1643 /**
1644  * check whether obd_zombie is idle
1645  */
1646 static int obd_zombie_is_idle(void)
1647 {
1648         int rc;
1649
1650         LASSERT(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1651         cfs_spin_lock(&obd_zombie_impexp_lock);
1652         rc = (zombies_count == 0);
1653         cfs_spin_unlock(&obd_zombie_impexp_lock);
1654         return rc;
1655 }
1656
1657 /**
1658  * wait when obd_zombie import/export queues become empty
1659  */
1660 void obd_zombie_barrier(void)
1661 {
1662         struct l_wait_info lwi = { 0 };
1663
1664         if (obd_zombie_pid == cfs_curproc_pid())
1665                 /* don't wait for myself */
1666                 return;
1667         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1668 }
1669 EXPORT_SYMBOL(obd_zombie_barrier);
1670
1671 #ifdef __KERNEL__
1672
1673 /**
1674  * destroy zombie export/import thread.
1675  */
1676 static int obd_zombie_impexp_thread(void *unused)
1677 {
1678         int rc;
1679
1680         if ((rc = cfs_daemonize_ctxt("obd_zombid"))) {
1681                 cfs_complete(&obd_zombie_start);
1682                 RETURN(rc);
1683         }
1684
1685         cfs_complete(&obd_zombie_start);
1686
1687         obd_zombie_pid = cfs_curproc_pid();
1688
1689         while(!cfs_test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1690                 struct l_wait_info lwi = { 0 };
1691
1692                 l_wait_event(obd_zombie_waitq,
1693                              !obd_zombie_impexp_check(NULL), &lwi);
1694                 obd_zombie_impexp_cull();
1695
1696                 /*
1697                  * Notify obd_zombie_barrier callers that queues
1698                  * may be empty.
1699                  */
1700                 cfs_waitq_signal(&obd_zombie_waitq);
1701         }
1702
1703         cfs_complete(&obd_zombie_stop);
1704
1705         RETURN(0);
1706 }
1707
1708 #else /* ! KERNEL */
1709
1710 static cfs_atomic_t zombie_recur = CFS_ATOMIC_INIT(0);
1711 static void *obd_zombie_impexp_work_cb;
1712 static void *obd_zombie_impexp_idle_cb;
1713
1714 int obd_zombie_impexp_kill(void *arg)
1715 {
1716         int rc = 0;
1717
1718         if (cfs_atomic_inc_return(&zombie_recur) == 1) {
1719                 obd_zombie_impexp_cull();
1720                 rc = 1;
1721         }
1722         cfs_atomic_dec(&zombie_recur);
1723         return rc;
1724 }
1725
1726 #endif
1727
1728 /**
1729  * start destroy zombie import/export thread
1730  */
1731 int obd_zombie_impexp_init(void)
1732 {
1733         int rc;
1734
1735         CFS_INIT_LIST_HEAD(&obd_zombie_imports);
1736         CFS_INIT_LIST_HEAD(&obd_zombie_exports);
1737         cfs_spin_lock_init(&obd_zombie_impexp_lock);
1738         cfs_init_completion(&obd_zombie_start);
1739         cfs_init_completion(&obd_zombie_stop);
1740         cfs_waitq_init(&obd_zombie_waitq);
1741         obd_zombie_pid = 0;
1742
1743 #ifdef __KERNEL__
1744         rc = cfs_create_thread(obd_zombie_impexp_thread, NULL, 0);
1745         if (rc < 0)
1746                 RETURN(rc);
1747
1748         cfs_wait_for_completion(&obd_zombie_start);
1749 #else
1750
1751         obd_zombie_impexp_work_cb =
1752                 liblustre_register_wait_callback("obd_zombi_impexp_kill",
1753                                                  &obd_zombie_impexp_kill, NULL);
1754
1755         obd_zombie_impexp_idle_cb =
1756                 liblustre_register_idle_callback("obd_zombi_impexp_check",
1757                                                  &obd_zombie_impexp_check, NULL);
1758         rc = 0;
1759 #endif
1760         RETURN(rc);
1761 }
1762 /**
1763  * stop destroy zombie import/export thread
1764  */
1765 void obd_zombie_impexp_stop(void)
1766 {
1767         cfs_set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1768         obd_zombie_impexp_notify();
1769 #ifdef __KERNEL__
1770         cfs_wait_for_completion(&obd_zombie_stop);
1771 #else
1772         liblustre_deregister_wait_callback(obd_zombie_impexp_work_cb);
1773         liblustre_deregister_idle_callback(obd_zombie_impexp_idle_cb);
1774 #endif
1775 }
1776
1777 /***** Kernel-userspace comm helpers *******/
1778
1779 /* Get length of entire message, including header */
1780 int kuc_len(int payload_len)
1781 {
1782         return sizeof(struct kuc_hdr) + payload_len;
1783 }
1784 EXPORT_SYMBOL(kuc_len);
1785
1786 /* Get a pointer to kuc header, given a ptr to the payload
1787  * @param p Pointer to payload area
1788  * @returns Pointer to kuc header
1789  */
1790 struct kuc_hdr * kuc_ptr(void *p)
1791 {
1792         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1793         LASSERT(lh->kuc_magic == KUC_MAGIC);
1794         return lh;
1795 }
1796 EXPORT_SYMBOL(kuc_ptr);
1797
1798 /* Test if payload is part of kuc message
1799  * @param p Pointer to payload area
1800  * @returns boolean
1801  */
1802 int kuc_ispayload(void *p)
1803 {
1804         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1805
1806         if (kh->kuc_magic == KUC_MAGIC)
1807                 return 1;
1808         else
1809                 return 0;
1810 }
1811 EXPORT_SYMBOL(kuc_ispayload);
1812
1813 /* Alloc space for a message, and fill in header
1814  * @return Pointer to payload area
1815  */
1816 void *kuc_alloc(int payload_len, int transport, int type)
1817 {
1818         struct kuc_hdr *lh;
1819         int len = kuc_len(payload_len);
1820
1821         OBD_ALLOC(lh, len);
1822         if (lh == NULL)
1823                 return ERR_PTR(-ENOMEM);
1824
1825         lh->kuc_magic = KUC_MAGIC;
1826         lh->kuc_transport = transport;
1827         lh->kuc_msgtype = type;
1828         lh->kuc_msglen = len;
1829
1830         return (void *)(lh + 1);
1831 }
1832 EXPORT_SYMBOL(kuc_alloc);
1833
/* Free a kuc message.
 * @param p Pointer to the payload area (not to the header)
 * @param payload_len Payload length in bytes; the header size is added
 *                    via kuc_len() to obtain the size passed to OBD_FREE
 */
inline void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);
        int total = kuc_len(payload_len);

        OBD_FREE(lh, total);
}
EXPORT_SYMBOL(kuc_free);
1841
1842
1843