Whamcloud - gitweb
54edf1d02d2f09d8d416337a39d69affd32f2f5a
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lprocfs_status.h>
44 #include <lustre_disk.h>
45 #include <lustre_kernelcomm.h>
46
47 spinlock_t obd_types_lock;
48
49 static struct kmem_cache *obd_device_cachep;
50 struct kmem_cache *obdo_cachep;
51 EXPORT_SYMBOL(obdo_cachep);
52 static struct kmem_cache *import_cachep;
53
54 static struct list_head obd_zombie_imports;
55 static struct list_head obd_zombie_exports;
56 static spinlock_t  obd_zombie_impexp_lock;
57
58 static void obd_zombie_impexp_notify(void);
59 static void obd_zombie_export_add(struct obd_export *exp);
60 static void obd_zombie_import_add(struct obd_import *imp);
61 static void print_export_data(struct obd_export *exp,
62                               const char *status, int locks, int debug_level);
63
64 struct list_head obd_stale_exports;
65 spinlock_t       obd_stale_export_lock;
66 atomic_t         obd_stale_export_num;
67
68 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
69 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
70
71 /*
72  * support functions: we could use inter-module communication, but this
73  * is more portable to other OS's
74  */
75 static struct obd_device *obd_device_alloc(void)
76 {
77         struct obd_device *obd;
78
79         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
80         if (obd != NULL) {
81                 obd->obd_magic = OBD_DEVICE_MAGIC;
82         }
83         return obd;
84 }
85
86 static void obd_device_free(struct obd_device *obd)
87 {
88         LASSERT(obd != NULL);
89         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
90                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
91         if (obd->obd_namespace != NULL) {
92                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
93                        obd, obd->obd_namespace, obd->obd_force);
94                 LBUG();
95         }
96         lu_ref_fini(&obd->obd_reference);
97         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 }
99
100 struct obd_type *class_search_type(const char *name)
101 {
102         struct list_head *tmp;
103         struct obd_type *type;
104
105         spin_lock(&obd_types_lock);
106         list_for_each(tmp, &obd_types) {
107                 type = list_entry(tmp, struct obd_type, typ_chain);
108                 if (strcmp(type->typ_name, name) == 0) {
109                         spin_unlock(&obd_types_lock);
110                         return type;
111                 }
112         }
113         spin_unlock(&obd_types_lock);
114         return NULL;
115 }
116 EXPORT_SYMBOL(class_search_type);
117
118 struct obd_type *class_get_type(const char *name)
119 {
120         struct obd_type *type = class_search_type(name);
121
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
123         if (!type) {
124                 const char *modname = name;
125
126                 if (strcmp(modname, "obdfilter") == 0)
127                         modname = "ofd";
128
129                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
130                         modname = LUSTRE_OSP_NAME;
131
132                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
133                         modname = LUSTRE_MDT_NAME;
134
135                 if (!request_module("%s", modname)) {
136                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
137                         type = class_search_type(name);
138                 } else {
139                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
140                                            modname);
141                 }
142         }
143 #endif
144         if (type) {
145                 spin_lock(&type->obd_type_lock);
146                 type->typ_refcnt++;
147                 try_module_get(type->typ_dt_ops->o_owner);
148                 spin_unlock(&type->obd_type_lock);
149         }
150         return type;
151 }
152
153 void class_put_type(struct obd_type *type)
154 {
155         LASSERT(type);
156         spin_lock(&type->obd_type_lock);
157         type->typ_refcnt--;
158         module_put(type->typ_dt_ops->o_owner);
159         spin_unlock(&type->obd_type_lock);
160 }
161
162 #define CLASS_MAX_NAME 1024
163
164 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
165                         bool enable_proc, struct lprocfs_vars *vars,
166                         const char *name, struct lu_device_type *ldt)
167 {
168         struct obd_type *type;
169         int rc = 0;
170         ENTRY;
171
172         /* sanity check */
173         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
174
175         if (class_search_type(name)) {
176                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
177                 RETURN(-EEXIST);
178         }
179
180         rc = -ENOMEM;
181         OBD_ALLOC(type, sizeof(*type));
182         if (type == NULL)
183                 RETURN(rc);
184
185         OBD_ALLOC_PTR(type->typ_dt_ops);
186         OBD_ALLOC_PTR(type->typ_md_ops);
187         OBD_ALLOC(type->typ_name, strlen(name) + 1);
188
189         if (type->typ_dt_ops == NULL ||
190             type->typ_md_ops == NULL ||
191             type->typ_name == NULL)
192                 GOTO (failed, rc);
193
194         *(type->typ_dt_ops) = *dt_ops;
195         /* md_ops is optional */
196         if (md_ops)
197                 *(type->typ_md_ops) = *md_ops;
198         strcpy(type->typ_name, name);
199         spin_lock_init(&type->obd_type_lock);
200
201 #ifdef CONFIG_PROC_FS
202         if (enable_proc) {
203                 type->typ_procroot = lprocfs_register(type->typ_name,
204                                                       proc_lustre_root,
205                                                       vars, type);
206                 if (IS_ERR(type->typ_procroot)) {
207                         rc = PTR_ERR(type->typ_procroot);
208                         type->typ_procroot = NULL;
209                         GOTO(failed, rc);
210                 }
211         }
212 #endif
213         if (ldt != NULL) {
214                 type->typ_lu = ldt;
215                 rc = lu_device_type_init(ldt);
216                 if (rc != 0)
217                         GOTO (failed, rc);
218         }
219
220         spin_lock(&obd_types_lock);
221         list_add(&type->typ_chain, &obd_types);
222         spin_unlock(&obd_types_lock);
223
224         RETURN (0);
225
226 failed:
227         if (type->typ_name != NULL) {
228 #ifdef CONFIG_PROC_FS
229                 if (type->typ_procroot != NULL)
230                         remove_proc_subtree(type->typ_name, proc_lustre_root);
231 #endif
232                 OBD_FREE(type->typ_name, strlen(name) + 1);
233         }
234         if (type->typ_md_ops != NULL)
235                 OBD_FREE_PTR(type->typ_md_ops);
236         if (type->typ_dt_ops != NULL)
237                 OBD_FREE_PTR(type->typ_dt_ops);
238         OBD_FREE(type, sizeof(*type));
239         RETURN(rc);
240 }
241 EXPORT_SYMBOL(class_register_type);
242
243 int class_unregister_type(const char *name)
244 {
245         struct obd_type *type = class_search_type(name);
246         ENTRY;
247
248         if (!type) {
249                 CERROR("unknown obd type\n");
250                 RETURN(-EINVAL);
251         }
252
253         if (type->typ_refcnt) {
254                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
255                 /* This is a bad situation, let's make the best of it */
256                 /* Remove ops, but leave the name for debugging */
257                 OBD_FREE_PTR(type->typ_dt_ops);
258                 OBD_FREE_PTR(type->typ_md_ops);
259                 RETURN(-EBUSY);
260         }
261
262         /* we do not use type->typ_procroot as for compatibility purposes
263          * other modules can share names (i.e. lod can use lov entry). so
264          * we can't reference pointer as it can get invalided when another
265          * module removes the entry */
266 #ifdef CONFIG_PROC_FS
267         if (type->typ_procroot != NULL)
268                 remove_proc_subtree(type->typ_name, proc_lustre_root);
269         if (type->typ_procsym != NULL)
270                 lprocfs_remove(&type->typ_procsym);
271 #endif
272         if (type->typ_lu)
273                 lu_device_type_fini(type->typ_lu);
274
275         spin_lock(&obd_types_lock);
276         list_del(&type->typ_chain);
277         spin_unlock(&obd_types_lock);
278         OBD_FREE(type->typ_name, strlen(name) + 1);
279         if (type->typ_dt_ops != NULL)
280                 OBD_FREE_PTR(type->typ_dt_ops);
281         if (type->typ_md_ops != NULL)
282                 OBD_FREE_PTR(type->typ_md_ops);
283         OBD_FREE(type, sizeof(*type));
284         RETURN(0);
285 } /* class_unregister_type */
286 EXPORT_SYMBOL(class_unregister_type);
287
288 /**
289  * Create a new obd device.
290  *
291  * Find an empty slot in ::obd_devs[], create a new obd device in it.
292  *
293  * \param[in] type_name obd device type string.
294  * \param[in] name      obd device name.
295  *
296  * \retval NULL if create fails, otherwise return the obd device
297  *         pointer created.
298  */
299 struct obd_device *class_newdev(const char *type_name, const char *name)
300 {
301         struct obd_device *result = NULL;
302         struct obd_device *newdev;
303         struct obd_type *type = NULL;
304         int i;
305         int new_obd_minor = 0;
306         ENTRY;
307
308         if (strlen(name) >= MAX_OBD_NAME) {
309                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
310                 RETURN(ERR_PTR(-EINVAL));
311         }
312
313         type = class_get_type(type_name);
314         if (type == NULL){
315                 CERROR("OBD: unknown type: %s\n", type_name);
316                 RETURN(ERR_PTR(-ENODEV));
317         }
318
319         newdev = obd_device_alloc();
320         if (newdev == NULL)
321                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
322
323         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
324
325         write_lock(&obd_dev_lock);
326         for (i = 0; i < class_devno_max(); i++) {
327                 struct obd_device *obd = class_num2obd(i);
328
329                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
330                         CERROR("Device %s already exists at %d, won't add\n",
331                                name, i);
332                         if (result) {
333                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
334                                          "%p obd_magic %08x != %08x\n", result,
335                                          result->obd_magic, OBD_DEVICE_MAGIC);
336                                 LASSERTF(result->obd_minor == new_obd_minor,
337                                          "%p obd_minor %d != %d\n", result,
338                                          result->obd_minor, new_obd_minor);
339
340                                 obd_devs[result->obd_minor] = NULL;
341                                 result->obd_name[0]='\0';
342                          }
343                         result = ERR_PTR(-EEXIST);
344                         break;
345                 }
346                 if (!result && !obd) {
347                         result = newdev;
348                         result->obd_minor = i;
349                         new_obd_minor = i;
350                         result->obd_type = type;
351                         strncpy(result->obd_name, name,
352                                 sizeof(result->obd_name) - 1);
353                         obd_devs[i] = result;
354                 }
355         }
356         write_unlock(&obd_dev_lock);
357
358         if (result == NULL && i >= class_devno_max()) {
359                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
360                        class_devno_max());
361                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
362         }
363
364         if (IS_ERR(result))
365                 GOTO(out, result);
366
367         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
368                result->obd_name, result);
369
370         RETURN(result);
371 out:
372         obd_device_free(newdev);
373 out_type:
374         class_put_type(type);
375         return result;
376 }
377
378 void class_release_dev(struct obd_device *obd)
379 {
380         struct obd_type *obd_type = obd->obd_type;
381
382         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
383                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
384         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
385                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
386         LASSERT(obd_type != NULL);
387
388         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
389                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
390
391         write_lock(&obd_dev_lock);
392         obd_devs[obd->obd_minor] = NULL;
393         write_unlock(&obd_dev_lock);
394         obd_device_free(obd);
395
396         class_put_type(obd_type);
397 }
398
399 int class_name2dev(const char *name)
400 {
401         int i;
402
403         if (!name)
404                 return -1;
405
406         read_lock(&obd_dev_lock);
407         for (i = 0; i < class_devno_max(); i++) {
408                 struct obd_device *obd = class_num2obd(i);
409
410                 if (obd && strcmp(name, obd->obd_name) == 0) {
411                         /* Make sure we finished attaching before we give
412                            out any references */
413                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
414                         if (obd->obd_attached) {
415                                 read_unlock(&obd_dev_lock);
416                                 return i;
417                         }
418                         break;
419                 }
420         }
421         read_unlock(&obd_dev_lock);
422
423         return -1;
424 }
425
426 struct obd_device *class_name2obd(const char *name)
427 {
428         int dev = class_name2dev(name);
429
430         if (dev < 0 || dev > class_devno_max())
431                 return NULL;
432         return class_num2obd(dev);
433 }
434 EXPORT_SYMBOL(class_name2obd);
435
436 int class_uuid2dev(struct obd_uuid *uuid)
437 {
438         int i;
439
440         read_lock(&obd_dev_lock);
441         for (i = 0; i < class_devno_max(); i++) {
442                 struct obd_device *obd = class_num2obd(i);
443
444                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
445                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
446                         read_unlock(&obd_dev_lock);
447                         return i;
448                 }
449         }
450         read_unlock(&obd_dev_lock);
451
452         return -1;
453 }
454
455 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
456 {
457         int dev = class_uuid2dev(uuid);
458         if (dev < 0)
459                 return NULL;
460         return class_num2obd(dev);
461 }
462 EXPORT_SYMBOL(class_uuid2obd);
463
464 /**
465  * Get obd device from ::obd_devs[]
466  *
467  * \param num [in] array index
468  *
469  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
470  *         otherwise return the obd device there.
471  */
472 struct obd_device *class_num2obd(int num)
473 {
474         struct obd_device *obd = NULL;
475
476         if (num < class_devno_max()) {
477                 obd = obd_devs[num];
478                 if (obd == NULL)
479                         return NULL;
480
481                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
482                          "%p obd_magic %08x != %08x\n",
483                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
484                 LASSERTF(obd->obd_minor == num,
485                          "%p obd_minor %0d != %0d\n",
486                          obd, obd->obd_minor, num);
487         }
488
489         return obd;
490 }
491
492 /**
493  * Get obd devices count. Device in any
494  *    state are counted
495  * \retval obd device count
496  */
497 int get_devices_count(void)
498 {
499         int index, max_index = class_devno_max(), dev_count = 0;
500
501         read_lock(&obd_dev_lock);
502         for (index = 0; index <= max_index; index++) {
503                 struct obd_device *obd = class_num2obd(index);
504                 if (obd != NULL)
505                         dev_count++;
506         }
507         read_unlock(&obd_dev_lock);
508
509         return dev_count;
510 }
511 EXPORT_SYMBOL(get_devices_count);
512
513 void class_obd_list(void)
514 {
515         char *status;
516         int i;
517
518         read_lock(&obd_dev_lock);
519         for (i = 0; i < class_devno_max(); i++) {
520                 struct obd_device *obd = class_num2obd(i);
521
522                 if (obd == NULL)
523                         continue;
524                 if (obd->obd_stopping)
525                         status = "ST";
526                 else if (obd->obd_set_up)
527                         status = "UP";
528                 else if (obd->obd_attached)
529                         status = "AT";
530                 else
531                         status = "--";
532                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
533                          i, status, obd->obd_type->typ_name,
534                          obd->obd_name, obd->obd_uuid.uuid,
535                          atomic_read(&obd->obd_refcount));
536         }
537         read_unlock(&obd_dev_lock);
538         return;
539 }
540
541 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
542    specified, then only the client with that uuid is returned,
543    otherwise any client connected to the tgt is returned. */
544 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
545                                           const char * typ_name,
546                                           struct obd_uuid *grp_uuid)
547 {
548         int i;
549
550         read_lock(&obd_dev_lock);
551         for (i = 0; i < class_devno_max(); i++) {
552                 struct obd_device *obd = class_num2obd(i);
553
554                 if (obd == NULL)
555                         continue;
556                 if ((strncmp(obd->obd_type->typ_name, typ_name,
557                              strlen(typ_name)) == 0)) {
558                         if (obd_uuid_equals(tgt_uuid,
559                                             &obd->u.cli.cl_target_uuid) &&
560                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
561                                                          &obd->obd_uuid) : 1)) {
562                                 read_unlock(&obd_dev_lock);
563                                 return obd;
564                         }
565                 }
566         }
567         read_unlock(&obd_dev_lock);
568
569         return NULL;
570 }
571 EXPORT_SYMBOL(class_find_client_obd);
572
573 /* Iterate the obd_device list looking devices have grp_uuid. Start
574    searching at *next, and if a device is found, the next index to look
575    at is saved in *next. If next is NULL, then the first matching device
576    will always be returned. */
577 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
578 {
579         int i;
580
581         if (next == NULL)
582                 i = 0;
583         else if (*next >= 0 && *next < class_devno_max())
584                 i = *next;
585         else
586                 return NULL;
587
588         read_lock(&obd_dev_lock);
589         for (; i < class_devno_max(); i++) {
590                 struct obd_device *obd = class_num2obd(i);
591
592                 if (obd == NULL)
593                         continue;
594                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
595                         if (next != NULL)
596                                 *next = i+1;
597                         read_unlock(&obd_dev_lock);
598                         return obd;
599                 }
600         }
601         read_unlock(&obd_dev_lock);
602
603         return NULL;
604 }
605 EXPORT_SYMBOL(class_devices_in_group);
606
607 /**
608  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
609  * adjust sptlrpc settings accordingly.
610  */
611 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
612 {
613         struct obd_device  *obd;
614         const char         *type;
615         int                 i, rc = 0, rc2;
616
617         LASSERT(namelen > 0);
618
619         read_lock(&obd_dev_lock);
620         for (i = 0; i < class_devno_max(); i++) {
621                 obd = class_num2obd(i);
622
623                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
624                         continue;
625
626                 /* only notify mdc, osc, osp, lwp, mdt, ost
627                  * because only these have a -sptlrpc llog */
628                 type = obd->obd_type->typ_name;
629                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
630                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
631                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
632                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
633                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
634                     strcmp(type, LUSTRE_OST_NAME) != 0)
635                         continue;
636
637                 if (strncmp(obd->obd_name, fsname, namelen))
638                         continue;
639
640                 class_incref(obd, __FUNCTION__, obd);
641                 read_unlock(&obd_dev_lock);
642                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
643                                          sizeof(KEY_SPTLRPC_CONF),
644                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
645                 rc = rc ? rc : rc2;
646                 class_decref(obd, __FUNCTION__, obd);
647                 read_lock(&obd_dev_lock);
648         }
649         read_unlock(&obd_dev_lock);
650         return rc;
651 }
652 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
653
654 void obd_cleanup_caches(void)
655 {
656         ENTRY;
657         if (obd_device_cachep) {
658                 kmem_cache_destroy(obd_device_cachep);
659                 obd_device_cachep = NULL;
660         }
661         if (obdo_cachep) {
662                 kmem_cache_destroy(obdo_cachep);
663                 obdo_cachep = NULL;
664         }
665         if (import_cachep) {
666                 kmem_cache_destroy(import_cachep);
667                 import_cachep = NULL;
668         }
669
670         EXIT;
671 }
672
673 int obd_init_caches(void)
674 {
675         int rc;
676         ENTRY;
677
678         LASSERT(obd_device_cachep == NULL);
679         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
680                                               sizeof(struct obd_device),
681                                               0, 0, NULL);
682         if (!obd_device_cachep)
683                 GOTO(out, rc = -ENOMEM);
684
685         LASSERT(obdo_cachep == NULL);
686         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
687                                         0, 0, NULL);
688         if (!obdo_cachep)
689                 GOTO(out, rc = -ENOMEM);
690
691         LASSERT(import_cachep == NULL);
692         import_cachep = kmem_cache_create("ll_import_cache",
693                                           sizeof(struct obd_import),
694                                           0, 0, NULL);
695         if (!import_cachep)
696                 GOTO(out, rc = -ENOMEM);
697
698         RETURN(0);
699 out:
700         obd_cleanup_caches();
701         RETURN(rc);
702 }
703
704 /* map connection to client */
705 struct obd_export *class_conn2export(struct lustre_handle *conn)
706 {
707         struct obd_export *export;
708         ENTRY;
709
710         if (!conn) {
711                 CDEBUG(D_CACHE, "looking for null handle\n");
712                 RETURN(NULL);
713         }
714
715         if (conn->cookie == -1) {  /* this means assign a new connection */
716                 CDEBUG(D_CACHE, "want a new connection\n");
717                 RETURN(NULL);
718         }
719
720         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
721         export = class_handle2object(conn->cookie, NULL);
722         RETURN(export);
723 }
724 EXPORT_SYMBOL(class_conn2export);
725
726 struct obd_device *class_exp2obd(struct obd_export *exp)
727 {
728         if (exp)
729                 return exp->exp_obd;
730         return NULL;
731 }
732 EXPORT_SYMBOL(class_exp2obd);
733
734 struct obd_device *class_conn2obd(struct lustre_handle *conn)
735 {
736         struct obd_export *export;
737         export = class_conn2export(conn);
738         if (export) {
739                 struct obd_device *obd = export->exp_obd;
740                 class_export_put(export);
741                 return obd;
742         }
743         return NULL;
744 }
745
746 struct obd_import *class_exp2cliimp(struct obd_export *exp)
747 {
748         struct obd_device *obd = exp->exp_obd;
749         if (obd == NULL)
750                 return NULL;
751         return obd->u.cli.cl_import;
752 }
753 EXPORT_SYMBOL(class_exp2cliimp);
754
755 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
756 {
757         struct obd_device *obd = class_conn2obd(conn);
758         if (obd == NULL)
759                 return NULL;
760         return obd->u.cli.cl_import;
761 }
762
763 /* Export management functions */
764 static void class_export_destroy(struct obd_export *exp)
765 {
766         struct obd_device *obd = exp->exp_obd;
767         ENTRY;
768
769         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
770         LASSERT(obd != NULL);
771
772         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
773                exp->exp_client_uuid.uuid, obd->obd_name);
774
775         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
776         if (exp->exp_connection)
777                 ptlrpc_put_connection_superhack(exp->exp_connection);
778
779         LASSERT(list_empty(&exp->exp_outstanding_replies));
780         LASSERT(list_empty(&exp->exp_uncommitted_replies));
781         LASSERT(list_empty(&exp->exp_req_replay_queue));
782         LASSERT(list_empty(&exp->exp_hp_rpcs));
783         obd_destroy_export(exp);
784         class_decref(obd, "export", exp);
785
786         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
787         EXIT;
788 }
789
/**
 * Handle callback: take a reference on the export behind a handle.
 *
 * \param[in] export    the export, passed as an untyped pointer
 */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
794
795 static struct portals_handle_ops export_handle_ops = {
796         .hop_addref = export_handle_addref,
797         .hop_free   = NULL,
798 };
799
800 struct obd_export *class_export_get(struct obd_export *exp)
801 {
802         atomic_inc(&exp->exp_refcount);
803         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
804                atomic_read(&exp->exp_refcount));
805         return exp;
806 }
807 EXPORT_SYMBOL(class_export_get);
808
809 void class_export_put(struct obd_export *exp)
810 {
811         LASSERT(exp != NULL);
812         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
813         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
814                atomic_read(&exp->exp_refcount) - 1);
815
816         if (atomic_dec_and_test(&exp->exp_refcount)) {
817                 LASSERT(!list_empty(&exp->exp_obd_chain));
818                 LASSERT(list_empty(&exp->exp_stale_list));
819                 CDEBUG(D_IOCTL, "final put %p/%s\n",
820                        exp, exp->exp_client_uuid.uuid);
821
822                 /* release nid stat refererence */
823                 lprocfs_exp_cleanup(exp);
824
825                 obd_zombie_export_add(exp);
826         }
827 }
828 EXPORT_SYMBOL(class_export_put);
829
830 /* Creates a new export, adds it to the hash table, and returns a
831  * pointer to it. The refcount is 2: one for the hash reference, and
832  * one for the pointer returned by this function. */
833 struct obd_export *class_new_export(struct obd_device *obd,
834                                     struct obd_uuid *cluuid)
835 {
836         struct obd_export *export;
837         struct cfs_hash *hash = NULL;
838         int rc = 0;
839         ENTRY;
840
841         OBD_ALLOC_PTR(export);
842         if (!export)
843                 return ERR_PTR(-ENOMEM);
844
845         export->exp_conn_cnt = 0;
846         export->exp_lock_hash = NULL;
847         export->exp_flock_hash = NULL;
848         atomic_set(&export->exp_refcount, 2);
849         atomic_set(&export->exp_rpc_count, 0);
850         atomic_set(&export->exp_cb_count, 0);
851         atomic_set(&export->exp_locks_count, 0);
852 #if LUSTRE_TRACKS_LOCK_EXP_REFS
853         INIT_LIST_HEAD(&export->exp_locks_list);
854         spin_lock_init(&export->exp_locks_list_guard);
855 #endif
856         atomic_set(&export->exp_replay_count, 0);
857         export->exp_obd = obd;
858         INIT_LIST_HEAD(&export->exp_outstanding_replies);
859         spin_lock_init(&export->exp_uncommitted_replies_lock);
860         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
861         INIT_LIST_HEAD(&export->exp_req_replay_queue);
862         INIT_LIST_HEAD(&export->exp_handle.h_link);
863         INIT_LIST_HEAD(&export->exp_hp_rpcs);
864         INIT_LIST_HEAD(&export->exp_reg_rpcs);
865         class_handle_hash(&export->exp_handle, &export_handle_ops);
866         export->exp_last_request_time = cfs_time_current_sec();
867         spin_lock_init(&export->exp_lock);
868         spin_lock_init(&export->exp_rpc_lock);
869         INIT_HLIST_NODE(&export->exp_uuid_hash);
870         INIT_HLIST_NODE(&export->exp_nid_hash);
871         INIT_HLIST_NODE(&export->exp_gen_hash);
872         spin_lock_init(&export->exp_bl_list_lock);
873         INIT_LIST_HEAD(&export->exp_bl_list);
874         INIT_LIST_HEAD(&export->exp_stale_list);
875
876         export->exp_sp_peer = LUSTRE_SP_ANY;
877         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
878         export->exp_client_uuid = *cluuid;
879         obd_init_export(export);
880
881         spin_lock(&obd->obd_dev_lock);
882         /* shouldn't happen, but might race */
883         if (obd->obd_stopping)
884                 GOTO(exit_unlock, rc = -ENODEV);
885
886         hash = cfs_hash_getref(obd->obd_uuid_hash);
887         if (hash == NULL)
888                 GOTO(exit_unlock, rc = -ENODEV);
889         spin_unlock(&obd->obd_dev_lock);
890
891         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
892                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
893                 if (rc != 0) {
894                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
895                                       obd->obd_name, cluuid->uuid, rc);
896                         GOTO(exit_err, rc = -EALREADY);
897                 }
898         }
899
900         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
901         spin_lock(&obd->obd_dev_lock);
902         if (obd->obd_stopping) {
903                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
904                 GOTO(exit_unlock, rc = -ENODEV);
905         }
906
907         class_incref(obd, "export", export);
908         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
909         list_add_tail(&export->exp_obd_chain_timed,
910                       &export->exp_obd->obd_exports_timed);
911         export->exp_obd->obd_num_exports++;
912         spin_unlock(&obd->obd_dev_lock);
913         cfs_hash_putref(hash);
914         RETURN(export);
915
916 exit_unlock:
917         spin_unlock(&obd->obd_dev_lock);
918 exit_err:
919         if (hash)
920                 cfs_hash_putref(hash);
921         class_handle_unhash(&export->exp_handle);
922         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
923         obd_destroy_export(export);
924         OBD_FREE_PTR(export);
925         return ERR_PTR(rc);
926 }
927 EXPORT_SYMBOL(class_new_export);
928
929 void class_unlink_export(struct obd_export *exp)
930 {
931         class_handle_unhash(&exp->exp_handle);
932
933         spin_lock(&exp->exp_obd->obd_dev_lock);
934         /* delete an uuid-export hashitem from hashtables */
935         if (!hlist_unhashed(&exp->exp_uuid_hash))
936                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
937                              &exp->exp_client_uuid,
938                              &exp->exp_uuid_hash);
939
940         if (!hlist_unhashed(&exp->exp_gen_hash)) {
941                 struct tg_export_data   *ted = &exp->exp_target_data;
942                 struct cfs_hash         *hash;
943
944                 /* Because obd_gen_hash will not be released until
945                  * class_cleanup(), so hash should never be NULL here */
946                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
947                 LASSERT(hash != NULL);
948                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
949                              &exp->exp_gen_hash);
950                 cfs_hash_putref(hash);
951         }
952
953         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
954         list_del_init(&exp->exp_obd_chain_timed);
955         exp->exp_obd->obd_num_exports--;
956         spin_unlock(&exp->exp_obd->obd_dev_lock);
957         atomic_inc(&obd_stale_export_num);
958
959         /* A reference is kept by obd_stale_exports list */
960         obd_stale_export_put(exp);
961 }
962 EXPORT_SYMBOL(class_unlink_export);
963
964 /* Import management functions */
965 static void class_import_destroy(struct obd_import *imp)
966 {
967         ENTRY;
968
969         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
970                 imp->imp_obd->obd_name);
971
972         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
973
974         ptlrpc_put_connection_superhack(imp->imp_connection);
975
976         while (!list_empty(&imp->imp_conn_list)) {
977                 struct obd_import_conn *imp_conn;
978
979                 imp_conn = list_entry(imp->imp_conn_list.next,
980                                       struct obd_import_conn, oic_item);
981                 list_del_init(&imp_conn->oic_item);
982                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
983                 OBD_FREE(imp_conn, sizeof(*imp_conn));
984         }
985
986         LASSERT(imp->imp_sec == NULL);
987         class_decref(imp->imp_obd, "import", imp);
988         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
989         EXIT;
990 }
991
/* hop_addref callback for import handles: take one reference on the
 * import that the opaque pointer refers to. */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
996
997 static struct portals_handle_ops import_handle_ops = {
998         .hop_addref = import_handle_addref,
999         .hop_free   = NULL,
1000 };
1001
1002 struct obd_import *class_import_get(struct obd_import *import)
1003 {
1004         atomic_inc(&import->imp_refcount);
1005         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1006                atomic_read(&import->imp_refcount),
1007                import->imp_obd->obd_name);
1008         return import;
1009 }
1010 EXPORT_SYMBOL(class_import_get);
1011
1012 void class_import_put(struct obd_import *imp)
1013 {
1014         ENTRY;
1015
1016         LASSERT(list_empty(&imp->imp_zombie_chain));
1017         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1018
1019         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1020                atomic_read(&imp->imp_refcount) - 1,
1021                imp->imp_obd->obd_name);
1022
1023         if (atomic_dec_and_test(&imp->imp_refcount)) {
1024                 CDEBUG(D_INFO, "final put import %p\n", imp);
1025                 obd_zombie_import_add(imp);
1026         }
1027
1028         /* catch possible import put race */
1029         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1030         EXIT;
1031 }
1032 EXPORT_SYMBOL(class_import_put);
1033
1034 static void init_imp_at(struct imp_at *at) {
1035         int i;
1036         at_init(&at->iat_net_latency, 0, 0);
1037         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1038                 /* max service estimates are tracked on the server side, so
1039                    don't use the AT history here, just use the last reported
1040                    val. (But keep hist for proc histogram, worst_ever) */
1041                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1042                         AT_FLG_NOHIST);
1043         }
1044 }
1045
1046 struct obd_import *class_new_import(struct obd_device *obd)
1047 {
1048         struct obd_import *imp;
1049         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1050
1051         OBD_ALLOC(imp, sizeof(*imp));
1052         if (imp == NULL)
1053                 return NULL;
1054
1055         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1056         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1057         INIT_LIST_HEAD(&imp->imp_replay_list);
1058         INIT_LIST_HEAD(&imp->imp_sending_list);
1059         INIT_LIST_HEAD(&imp->imp_delayed_list);
1060         INIT_LIST_HEAD(&imp->imp_committed_list);
1061         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1062         imp->imp_known_replied_xid = 0;
1063         imp->imp_replay_cursor = &imp->imp_committed_list;
1064         spin_lock_init(&imp->imp_lock);
1065         imp->imp_last_success_conn = 0;
1066         imp->imp_state = LUSTRE_IMP_NEW;
1067         imp->imp_obd = class_incref(obd, "import", imp);
1068         mutex_init(&imp->imp_sec_mutex);
1069         init_waitqueue_head(&imp->imp_recovery_waitq);
1070
1071         if (curr_pid_ns->child_reaper)
1072                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1073         else
1074                 imp->imp_sec_refpid = 1;
1075
1076         atomic_set(&imp->imp_refcount, 2);
1077         atomic_set(&imp->imp_unregistering, 0);
1078         atomic_set(&imp->imp_inflight, 0);
1079         atomic_set(&imp->imp_replay_inflight, 0);
1080         atomic_set(&imp->imp_inval_count, 0);
1081         INIT_LIST_HEAD(&imp->imp_conn_list);
1082         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1083         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1084         init_imp_at(&imp->imp_at);
1085
1086         /* the default magic is V2, will be used in connect RPC, and
1087          * then adjusted according to the flags in request/reply. */
1088         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1089
1090         return imp;
1091 }
1092 EXPORT_SYMBOL(class_new_import);
1093
1094 void class_destroy_import(struct obd_import *import)
1095 {
1096         LASSERT(import != NULL);
1097         LASSERT(import != LP_POISON);
1098
1099         class_handle_unhash(&import->imp_handle);
1100
1101         spin_lock(&import->imp_lock);
1102         import->imp_generation++;
1103         spin_unlock(&import->imp_lock);
1104         class_import_put(import);
1105 }
1106 EXPORT_SYMBOL(class_destroy_import);
1107
1108 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1109
1110 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1111 {
1112         spin_lock(&exp->exp_locks_list_guard);
1113
1114         LASSERT(lock->l_exp_refs_nr >= 0);
1115
1116         if (lock->l_exp_refs_target != NULL &&
1117             lock->l_exp_refs_target != exp) {
1118                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1119                               exp, lock, lock->l_exp_refs_target);
1120         }
1121         if ((lock->l_exp_refs_nr ++) == 0) {
1122                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1123                 lock->l_exp_refs_target = exp;
1124         }
1125         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1126                lock, exp, lock->l_exp_refs_nr);
1127         spin_unlock(&exp->exp_locks_list_guard);
1128 }
1129 EXPORT_SYMBOL(__class_export_add_lock_ref);
1130
1131 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1132 {
1133         spin_lock(&exp->exp_locks_list_guard);
1134         LASSERT(lock->l_exp_refs_nr > 0);
1135         if (lock->l_exp_refs_target != exp) {
1136                 LCONSOLE_WARN("lock %p, "
1137                               "mismatching export pointers: %p, %p\n",
1138                               lock, lock->l_exp_refs_target, exp);
1139         }
1140         if (-- lock->l_exp_refs_nr == 0) {
1141                 list_del_init(&lock->l_exp_refs_link);
1142                 lock->l_exp_refs_target = NULL;
1143         }
1144         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1145                lock, exp, lock->l_exp_refs_nr);
1146         spin_unlock(&exp->exp_locks_list_guard);
1147 }
1148 EXPORT_SYMBOL(__class_export_del_lock_ref);
1149 #endif
1150
1151 /* A connection defines an export context in which preallocation can
1152    be managed. This releases the export pointer reference, and returns
1153    the export handle, so the export refcount is 1 when this function
1154    returns. */
1155 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1156                   struct obd_uuid *cluuid)
1157 {
1158         struct obd_export *export;
1159         LASSERT(conn != NULL);
1160         LASSERT(obd != NULL);
1161         LASSERT(cluuid != NULL);
1162         ENTRY;
1163
1164         export = class_new_export(obd, cluuid);
1165         if (IS_ERR(export))
1166                 RETURN(PTR_ERR(export));
1167
1168         conn->cookie = export->exp_handle.h_cookie;
1169         class_export_put(export);
1170
1171         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1172                cluuid->uuid, conn->cookie);
1173         RETURN(0);
1174 }
1175 EXPORT_SYMBOL(class_connect);
1176
1177 /* if export is involved in recovery then clean up related things */
1178 static void class_export_recovery_cleanup(struct obd_export *exp)
1179 {
1180         struct obd_device *obd = exp->exp_obd;
1181
1182         spin_lock(&obd->obd_recovery_task_lock);
1183         if (obd->obd_recovering) {
1184                 if (exp->exp_in_recovery) {
1185                         spin_lock(&exp->exp_lock);
1186                         exp->exp_in_recovery = 0;
1187                         spin_unlock(&exp->exp_lock);
1188                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1189                         atomic_dec(&obd->obd_connected_clients);
1190                 }
1191
1192                 /* if called during recovery then should update
1193                  * obd_stale_clients counter,
1194                  * lightweight exports are not counted */
1195                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1196                         exp->exp_obd->obd_stale_clients++;
1197         }
1198         spin_unlock(&obd->obd_recovery_task_lock);
1199
1200         spin_lock(&exp->exp_lock);
1201         /** Cleanup req replay fields */
1202         if (exp->exp_req_replay_needed) {
1203                 exp->exp_req_replay_needed = 0;
1204
1205                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1206                 atomic_dec(&obd->obd_req_replay_clients);
1207         }
1208
1209         /** Cleanup lock replay data */
1210         if (exp->exp_lock_replay_needed) {
1211                 exp->exp_lock_replay_needed = 0;
1212
1213                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1214                 atomic_dec(&obd->obd_lock_replay_clients);
1215         }
1216         spin_unlock(&exp->exp_lock);
1217 }
1218
1219 /* This function removes 1-3 references from the export:
1220  * 1 - for export pointer passed
1221  * and if disconnect really need
1222  * 2 - removing from hash
1223  * 3 - in client_unlink_export
1224  * The export pointer passed to this function can destroyed */
1225 int class_disconnect(struct obd_export *export)
1226 {
1227         int already_disconnected;
1228         ENTRY;
1229
1230         if (export == NULL) {
1231                 CWARN("attempting to free NULL export %p\n", export);
1232                 RETURN(-EINVAL);
1233         }
1234
1235         spin_lock(&export->exp_lock);
1236         already_disconnected = export->exp_disconnected;
1237         export->exp_disconnected = 1;
1238         /*  We hold references of export for uuid hash
1239          *  and nid_hash and export link at least. So
1240          *  it is safe to call cfs_hash_del in there.  */
1241         if (!hlist_unhashed(&export->exp_nid_hash))
1242                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1243                              &export->exp_connection->c_peer.nid,
1244                              &export->exp_nid_hash);
1245         spin_unlock(&export->exp_lock);
1246
1247         /* class_cleanup(), abort_recovery(), and class_fail_export()
1248          * all end up in here, and if any of them race we shouldn't
1249          * call extra class_export_puts(). */
1250         if (already_disconnected) {
1251                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1252                 GOTO(no_disconn, already_disconnected);
1253         }
1254
1255         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1256                export->exp_handle.h_cookie);
1257
1258         class_export_recovery_cleanup(export);
1259         class_unlink_export(export);
1260 no_disconn:
1261         class_export_put(export);
1262         RETURN(0);
1263 }
1264 EXPORT_SYMBOL(class_disconnect);
1265
1266 /* Return non-zero for a fully connected export */
1267 int class_connected_export(struct obd_export *exp)
1268 {
1269         int connected = 0;
1270
1271         if (exp) {
1272                 spin_lock(&exp->exp_lock);
1273                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1274                 spin_unlock(&exp->exp_lock);
1275         }
1276         return connected;
1277 }
1278 EXPORT_SYMBOL(class_connected_export);
1279
1280 static void class_disconnect_export_list(struct list_head *list,
1281                                          enum obd_option flags)
1282 {
1283         int rc;
1284         struct obd_export *exp;
1285         ENTRY;
1286
1287         /* It's possible that an export may disconnect itself, but
1288          * nothing else will be added to this list. */
1289         while (!list_empty(list)) {
1290                 exp = list_entry(list->next, struct obd_export,
1291                                  exp_obd_chain);
1292                 /* need for safe call CDEBUG after obd_disconnect */
1293                 class_export_get(exp);
1294
1295                 spin_lock(&exp->exp_lock);
1296                 exp->exp_flags = flags;
1297                 spin_unlock(&exp->exp_lock);
1298
1299                 if (obd_uuid_equals(&exp->exp_client_uuid,
1300                                     &exp->exp_obd->obd_uuid)) {
1301                         CDEBUG(D_HA,
1302                                "exp %p export uuid == obd uuid, don't discon\n",
1303                                exp);
1304                         /* Need to delete this now so we don't end up pointing
1305                          * to work_list later when this export is cleaned up. */
1306                         list_del_init(&exp->exp_obd_chain);
1307                         class_export_put(exp);
1308                         continue;
1309                 }
1310
1311                 class_export_get(exp);
1312                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1313                        "last request at "CFS_TIME_T"\n",
1314                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1315                        exp, exp->exp_last_request_time);
1316                 /* release one export reference anyway */
1317                 rc = obd_disconnect(exp);
1318
1319                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1320                        obd_export_nid2str(exp), exp, rc);
1321                 class_export_put(exp);
1322         }
1323         EXIT;
1324 }
1325
1326 void class_disconnect_exports(struct obd_device *obd)
1327 {
1328         struct list_head work_list;
1329         ENTRY;
1330
1331         /* Move all of the exports from obd_exports to a work list, en masse. */
1332         INIT_LIST_HEAD(&work_list);
1333         spin_lock(&obd->obd_dev_lock);
1334         list_splice_init(&obd->obd_exports, &work_list);
1335         list_splice_init(&obd->obd_delayed_exports, &work_list);
1336         spin_unlock(&obd->obd_dev_lock);
1337
1338         if (!list_empty(&work_list)) {
1339                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1340                        "disconnecting them\n", obd->obd_minor, obd);
1341                 class_disconnect_export_list(&work_list,
1342                                              exp_flags_from_obd(obd));
1343         } else
1344                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1345                        obd->obd_minor, obd);
1346         EXIT;
1347 }
1348 EXPORT_SYMBOL(class_disconnect_exports);
1349
1350 /* Remove exports that have not completed recovery.
1351  */
1352 void class_disconnect_stale_exports(struct obd_device *obd,
1353                                     int (*test_export)(struct obd_export *))
1354 {
1355         struct list_head work_list;
1356         struct obd_export *exp, *n;
1357         int evicted = 0;
1358         ENTRY;
1359
1360         INIT_LIST_HEAD(&work_list);
1361         spin_lock(&obd->obd_dev_lock);
1362         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1363                                  exp_obd_chain) {
1364                 /* don't count self-export as client */
1365                 if (obd_uuid_equals(&exp->exp_client_uuid,
1366                                     &exp->exp_obd->obd_uuid))
1367                         continue;
1368
1369                 /* don't evict clients which have no slot in last_rcvd
1370                  * (e.g. lightweight connection) */
1371                 if (exp->exp_target_data.ted_lr_idx == -1)
1372                         continue;
1373
1374                 spin_lock(&exp->exp_lock);
1375                 if (exp->exp_failed || test_export(exp)) {
1376                         spin_unlock(&exp->exp_lock);
1377                         continue;
1378                 }
1379                 exp->exp_failed = 1;
1380                 spin_unlock(&exp->exp_lock);
1381
1382                 list_move(&exp->exp_obd_chain, &work_list);
1383                 evicted++;
1384                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1385                        obd->obd_name, exp->exp_client_uuid.uuid,
1386                        exp->exp_connection == NULL ? "<unknown>" :
1387                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1388                 print_export_data(exp, "EVICTING", 0, D_HA);
1389         }
1390         spin_unlock(&obd->obd_dev_lock);
1391
1392         if (evicted)
1393                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1394                               obd->obd_name, evicted);
1395
1396         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1397                                                  OBD_OPT_ABORT_RECOV);
1398         EXIT;
1399 }
1400 EXPORT_SYMBOL(class_disconnect_stale_exports);
1401
1402 void class_fail_export(struct obd_export *exp)
1403 {
1404         int rc, already_failed;
1405
1406         spin_lock(&exp->exp_lock);
1407         already_failed = exp->exp_failed;
1408         exp->exp_failed = 1;
1409         spin_unlock(&exp->exp_lock);
1410
1411         if (already_failed) {
1412                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1413                        exp, exp->exp_client_uuid.uuid);
1414                 return;
1415         }
1416
1417         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1418                exp, exp->exp_client_uuid.uuid);
1419
1420         if (obd_dump_on_timeout)
1421                 libcfs_debug_dumplog();
1422
1423         /* need for safe call CDEBUG after obd_disconnect */
1424         class_export_get(exp);
1425
1426         /* Most callers into obd_disconnect are removing their own reference
1427          * (request, for example) in addition to the one from the hash table.
1428          * We don't have such a reference here, so make one. */
1429         class_export_get(exp);
1430         rc = obd_disconnect(exp);
1431         if (rc)
1432                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1433         else
1434                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1435                        exp, exp->exp_client_uuid.uuid);
1436         class_export_put(exp);
1437 }
1438 EXPORT_SYMBOL(class_fail_export);
1439
1440 char *obd_export_nid2str(struct obd_export *exp)
1441 {
1442         if (exp->exp_connection != NULL)
1443                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1444
1445         return "(no nid)";
1446 }
1447 EXPORT_SYMBOL(obd_export_nid2str);
1448
1449 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1450 {
1451         struct cfs_hash *nid_hash;
1452         struct obd_export *doomed_exp = NULL;
1453         int exports_evicted = 0;
1454
1455         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1456
1457         spin_lock(&obd->obd_dev_lock);
1458         /* umount has run already, so evict thread should leave
1459          * its task to umount thread now */
1460         if (obd->obd_stopping) {
1461                 spin_unlock(&obd->obd_dev_lock);
1462                 return exports_evicted;
1463         }
1464         nid_hash = obd->obd_nid_hash;
1465         cfs_hash_getref(nid_hash);
1466         spin_unlock(&obd->obd_dev_lock);
1467
1468         do {
1469                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1470                 if (doomed_exp == NULL)
1471                         break;
1472
1473                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1474                          "nid %s found, wanted nid %s, requested nid %s\n",
1475                          obd_export_nid2str(doomed_exp),
1476                          libcfs_nid2str(nid_key), nid);
1477                 LASSERTF(doomed_exp != obd->obd_self_export,
1478                          "self-export is hashed by NID?\n");
1479                 exports_evicted++;
1480                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1481                               "request\n", obd->obd_name,
1482                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1483                               obd_export_nid2str(doomed_exp));
1484                 class_fail_export(doomed_exp);
1485                 class_export_put(doomed_exp);
1486         } while (1);
1487
1488         cfs_hash_putref(nid_hash);
1489
1490         if (!exports_evicted)
1491                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1492                        obd->obd_name, nid);
1493         return exports_evicted;
1494 }
1495 EXPORT_SYMBOL(obd_export_evict_by_nid);
1496
1497 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1498 {
1499         struct cfs_hash *uuid_hash;
1500         struct obd_export *doomed_exp = NULL;
1501         struct obd_uuid doomed_uuid;
1502         int exports_evicted = 0;
1503
1504         spin_lock(&obd->obd_dev_lock);
1505         if (obd->obd_stopping) {
1506                 spin_unlock(&obd->obd_dev_lock);
1507                 return exports_evicted;
1508         }
1509         uuid_hash = obd->obd_uuid_hash;
1510         cfs_hash_getref(uuid_hash);
1511         spin_unlock(&obd->obd_dev_lock);
1512
1513         obd_str2uuid(&doomed_uuid, uuid);
1514         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1515                 CERROR("%s: can't evict myself\n", obd->obd_name);
1516                 cfs_hash_putref(uuid_hash);
1517                 return exports_evicted;
1518         }
1519
1520         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1521
1522         if (doomed_exp == NULL) {
1523                 CERROR("%s: can't disconnect %s: no exports found\n",
1524                        obd->obd_name, uuid);
1525         } else {
1526                 CWARN("%s: evicting %s at adminstrative request\n",
1527                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1528                 class_fail_export(doomed_exp);
1529                 class_export_put(doomed_exp);
1530                 exports_evicted++;
1531         }
1532         cfs_hash_putref(uuid_hash);
1533
1534         return exports_evicted;
1535 }
1536
1537 #if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback, unset (NULL) by default; print_export_data() invokes
 * it for an export when asked to dump locks and a hook has been set. */
void (*class_export_dump_hook)(struct obd_export *);
EXPORT_SYMBOL(class_export_dump_hook);
1540 #endif
1541
1542 static void print_export_data(struct obd_export *exp, const char *status,
1543                               int locks, int debug_level)
1544 {
1545         struct ptlrpc_reply_state *rs;
1546         struct ptlrpc_reply_state *first_reply = NULL;
1547         int nreplies = 0;
1548
1549         spin_lock(&exp->exp_lock);
1550         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1551                             rs_exp_list) {
1552                 if (nreplies == 0)
1553                         first_reply = rs;
1554                 nreplies++;
1555         }
1556         spin_unlock(&exp->exp_lock);
1557
1558         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1559                "%p %s %llu stale:%d\n",
1560                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1561                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1562                atomic_read(&exp->exp_rpc_count),
1563                atomic_read(&exp->exp_cb_count),
1564                atomic_read(&exp->exp_locks_count),
1565                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1566                nreplies, first_reply, nreplies > 3 ? "..." : "",
1567                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1568 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1569         if (locks && class_export_dump_hook != NULL)
1570                 class_export_dump_hook(exp);
1571 #endif
1572 }
1573
1574 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1575 {
1576         struct obd_export *exp;
1577
1578         spin_lock(&obd->obd_dev_lock);
1579         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1580                 print_export_data(exp, "ACTIVE", locks, debug_level);
1581         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1582                 print_export_data(exp, "UNLINKED", locks, debug_level);
1583         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1584                 print_export_data(exp, "DELAYED", locks, debug_level);
1585         spin_unlock(&obd->obd_dev_lock);
1586         spin_lock(&obd_zombie_impexp_lock);
1587         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1588                 print_export_data(exp, "ZOMBIE", locks, debug_level);
1589         spin_unlock(&obd_zombie_impexp_lock);
1590 }
1591
1592 void obd_exports_barrier(struct obd_device *obd)
1593 {
1594         int waited = 2;
1595         LASSERT(list_empty(&obd->obd_exports));
1596         spin_lock(&obd->obd_dev_lock);
1597         while (!list_empty(&obd->obd_unlinked_exports)) {
1598                 spin_unlock(&obd->obd_dev_lock);
1599                 set_current_state(TASK_UNINTERRUPTIBLE);
1600                 schedule_timeout(cfs_time_seconds(waited));
1601                 if (waited > 5 && is_power_of_2(waited)) {
1602                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1603                                       "more than %d seconds. "
1604                                       "The obd refcount = %d. Is it stuck?\n",
1605                                       obd->obd_name, waited,
1606                                       atomic_read(&obd->obd_refcount));
1607                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1608                 }
1609                 waited *= 2;
1610                 spin_lock(&obd->obd_dev_lock);
1611         }
1612         spin_unlock(&obd->obd_dev_lock);
1613 }
1614 EXPORT_SYMBOL(obd_exports_barrier);
1615
/* Number of zombie imports and exports still waiting to be destroyed;
 * starts at zero and is modified under obd_zombie_impexp_lock. */
static int zombies_count;
1618
1619 /**
1620  * kill zombie imports and exports
1621  */
1622 void obd_zombie_impexp_cull(void)
1623 {
1624         struct obd_import *import;
1625         struct obd_export *export;
1626         ENTRY;
1627
1628         do {
1629                 spin_lock(&obd_zombie_impexp_lock);
1630
1631                 import = NULL;
1632                 if (!list_empty(&obd_zombie_imports)) {
1633                         import = list_entry(obd_zombie_imports.next,
1634                                             struct obd_import,
1635                                             imp_zombie_chain);
1636                         list_del_init(&import->imp_zombie_chain);
1637                 }
1638
1639                 export = NULL;
1640                 if (!list_empty(&obd_zombie_exports)) {
1641                         export = list_entry(obd_zombie_exports.next,
1642                                             struct obd_export,
1643                                             exp_obd_chain);
1644                         list_del_init(&export->exp_obd_chain);
1645                 }
1646
1647                 spin_unlock(&obd_zombie_impexp_lock);
1648
1649                 if (import != NULL) {
1650                         class_import_destroy(import);
1651                         spin_lock(&obd_zombie_impexp_lock);
1652                         zombies_count--;
1653                         spin_unlock(&obd_zombie_impexp_lock);
1654                 }
1655
1656                 if (export != NULL) {
1657                         class_export_destroy(export);
1658                         spin_lock(&obd_zombie_impexp_lock);
1659                         zombies_count--;
1660                         spin_unlock(&obd_zombie_impexp_lock);
1661                 }
1662
1663                 cond_resched();
1664         } while (import != NULL || export != NULL);
1665         EXIT;
1666 }
1667
1668 static struct completion        obd_zombie_start;
1669 static struct completion        obd_zombie_stop;
1670 static unsigned long            obd_zombie_flags;
1671 static wait_queue_head_t        obd_zombie_waitq;
1672 static pid_t                    obd_zombie_pid;
1673
1674 enum {
1675         OBD_ZOMBIE_STOP         = 0x0001,
1676 };
1677
1678 /**
1679  * check for work for kill zombie import/export thread.
1680  */
1681 static int obd_zombie_impexp_check(void *arg)
1682 {
1683         int rc;
1684
1685         spin_lock(&obd_zombie_impexp_lock);
1686         rc = (zombies_count == 0) &&
1687              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1688         spin_unlock(&obd_zombie_impexp_lock);
1689
1690         RETURN(rc);
1691 }
1692
1693 /**
1694  * Add export to the obd_zombe thread and notify it.
1695  */
1696 static void obd_zombie_export_add(struct obd_export *exp) {
1697         atomic_dec(&obd_stale_export_num);
1698         spin_lock(&exp->exp_obd->obd_dev_lock);
1699         LASSERT(!list_empty(&exp->exp_obd_chain));
1700         list_del_init(&exp->exp_obd_chain);
1701         spin_unlock(&exp->exp_obd->obd_dev_lock);
1702         spin_lock(&obd_zombie_impexp_lock);
1703         zombies_count++;
1704         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1705         spin_unlock(&obd_zombie_impexp_lock);
1706
1707         obd_zombie_impexp_notify();
1708 }
1709
1710 /**
1711  * Add import to the obd_zombe thread and notify it.
1712  */
1713 static void obd_zombie_import_add(struct obd_import *imp) {
1714         LASSERT(imp->imp_sec == NULL);
1715         spin_lock(&obd_zombie_impexp_lock);
1716         LASSERT(list_empty(&imp->imp_zombie_chain));
1717         zombies_count++;
1718         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1719         spin_unlock(&obd_zombie_impexp_lock);
1720
1721         obd_zombie_impexp_notify();
1722 }
1723
1724 /**
1725  * notify import/export destroy thread about new zombie.
1726  */
1727 static void obd_zombie_impexp_notify(void)
1728 {
1729         /*
1730          * Make sure obd_zomebie_impexp_thread get this notification.
1731          * It is possible this signal only get by obd_zombie_barrier, and
1732          * barrier gulps this notification and sleeps away and hangs ensues
1733          */
1734         wake_up_all(&obd_zombie_waitq);
1735 }
1736
1737 /**
1738  * check whether obd_zombie is idle
1739  */
1740 static int obd_zombie_is_idle(void)
1741 {
1742         int rc;
1743
1744         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1745         spin_lock(&obd_zombie_impexp_lock);
1746         rc = (zombies_count == 0);
1747         spin_unlock(&obd_zombie_impexp_lock);
1748         return rc;
1749 }
1750
1751 /**
1752  * wait when obd_zombie import/export queues become empty
1753  */
1754 void obd_zombie_barrier(void)
1755 {
1756         struct l_wait_info lwi = { 0 };
1757
1758         if (obd_zombie_pid == current_pid())
1759                 /* don't wait for myself */
1760                 return;
1761         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1762 }
1763 EXPORT_SYMBOL(obd_zombie_barrier);
1764
1765
1766 struct obd_export *obd_stale_export_get(void)
1767 {
1768         struct obd_export *exp = NULL;
1769         ENTRY;
1770
1771         spin_lock(&obd_stale_export_lock);
1772         if (!list_empty(&obd_stale_exports)) {
1773                 exp = list_entry(obd_stale_exports.next,
1774                                  struct obd_export, exp_stale_list);
1775                 list_del_init(&exp->exp_stale_list);
1776         }
1777         spin_unlock(&obd_stale_export_lock);
1778
1779         if (exp) {
1780                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1781                        atomic_read(&obd_stale_export_num));
1782         }
1783         RETURN(exp);
1784 }
1785 EXPORT_SYMBOL(obd_stale_export_get);
1786
1787 void obd_stale_export_put(struct obd_export *exp)
1788 {
1789         ENTRY;
1790
1791         LASSERT(list_empty(&exp->exp_stale_list));
1792         if (exp->exp_lock_hash &&
1793             atomic_read(&exp->exp_lock_hash->hs_count)) {
1794                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1795                        atomic_read(&obd_stale_export_num));
1796
1797                 spin_lock_bh(&exp->exp_bl_list_lock);
1798                 spin_lock(&obd_stale_export_lock);
1799                 /* Add to the tail if there is no blocked locks,
1800                  * to the head otherwise. */
1801                 if (list_empty(&exp->exp_bl_list))
1802                         list_add_tail(&exp->exp_stale_list,
1803                                       &obd_stale_exports);
1804                 else
1805                         list_add(&exp->exp_stale_list,
1806                                  &obd_stale_exports);
1807
1808                 spin_unlock(&obd_stale_export_lock);
1809                 spin_unlock_bh(&exp->exp_bl_list_lock);
1810         } else {
1811                 class_export_put(exp);
1812         }
1813         EXIT;
1814 }
1815 EXPORT_SYMBOL(obd_stale_export_put);
1816
1817 /**
1818  * Adjust the position of the export in the stale list,
1819  * i.e. move to the head of the list if is needed.
1820  **/
1821 void obd_stale_export_adjust(struct obd_export *exp)
1822 {
1823         LASSERT(exp != NULL);
1824         spin_lock_bh(&exp->exp_bl_list_lock);
1825         spin_lock(&obd_stale_export_lock);
1826
1827         if (!list_empty(&exp->exp_stale_list) &&
1828             !list_empty(&exp->exp_bl_list))
1829                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1830
1831         spin_unlock(&obd_stale_export_lock);
1832         spin_unlock_bh(&exp->exp_bl_list_lock);
1833 }
1834 EXPORT_SYMBOL(obd_stale_export_adjust);
1835
1836 /**
1837  * destroy zombie export/import thread.
1838  */
1839 static int obd_zombie_impexp_thread(void *unused)
1840 {
1841         unshare_fs_struct();
1842         complete(&obd_zombie_start);
1843
1844         obd_zombie_pid = current_pid();
1845
1846         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1847                 struct l_wait_info lwi = { 0 };
1848
1849                 l_wait_event(obd_zombie_waitq,
1850                              !obd_zombie_impexp_check(NULL), &lwi);
1851                 obd_zombie_impexp_cull();
1852
1853                 /*
1854                  * Notify obd_zombie_barrier callers that queues
1855                  * may be empty.
1856                  */
1857                 wake_up(&obd_zombie_waitq);
1858         }
1859
1860         complete(&obd_zombie_stop);
1861
1862         RETURN(0);
1863 }
1864
1865
1866 /**
1867  * start destroy zombie import/export thread
1868  */
1869 int obd_zombie_impexp_init(void)
1870 {
1871         struct task_struct *task;
1872
1873         INIT_LIST_HEAD(&obd_zombie_imports);
1874
1875         INIT_LIST_HEAD(&obd_zombie_exports);
1876         spin_lock_init(&obd_zombie_impexp_lock);
1877         init_completion(&obd_zombie_start);
1878         init_completion(&obd_zombie_stop);
1879         init_waitqueue_head(&obd_zombie_waitq);
1880         obd_zombie_pid = 0;
1881
1882         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1883         if (IS_ERR(task))
1884                 RETURN(PTR_ERR(task));
1885
1886         wait_for_completion(&obd_zombie_start);
1887         RETURN(0);
1888 }
1889 /**
1890  * stop destroy zombie import/export thread
1891  */
1892 void obd_zombie_impexp_stop(void)
1893 {
1894         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1895         obd_zombie_impexp_notify();
1896         wait_for_completion(&obd_zombie_stop);
1897 }
1898
1899 /***** Kernel-userspace comm helpers *******/
1900
1901 /* Get length of entire message, including header */
1902 int kuc_len(int payload_len)
1903 {
1904         return sizeof(struct kuc_hdr) + payload_len;
1905 }
1906 EXPORT_SYMBOL(kuc_len);
1907
1908 /* Get a pointer to kuc header, given a ptr to the payload
1909  * @param p Pointer to payload area
1910  * @returns Pointer to kuc header
1911  */
1912 struct kuc_hdr * kuc_ptr(void *p)
1913 {
1914         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1915         LASSERT(lh->kuc_magic == KUC_MAGIC);
1916         return lh;
1917 }
1918 EXPORT_SYMBOL(kuc_ptr);
1919
1920 /* Alloc space for a message, and fill in header
1921  * @return Pointer to payload area
1922  */
1923 void *kuc_alloc(int payload_len, int transport, int type)
1924 {
1925         struct kuc_hdr *lh;
1926         int len = kuc_len(payload_len);
1927
1928         OBD_ALLOC(lh, len);
1929         if (lh == NULL)
1930                 return ERR_PTR(-ENOMEM);
1931
1932         lh->kuc_magic = KUC_MAGIC;
1933         lh->kuc_transport = transport;
1934         lh->kuc_msgtype = type;
1935         lh->kuc_msglen = len;
1936
1937         return (void *)(lh + 1);
1938 }
1939 EXPORT_SYMBOL(kuc_alloc);
1940
/* Free a message given a pointer to its payload area.
 * @param p Pointer to payload area
 * @param payload_len Length of the payload area; presumably the value that
 *                    was passed to kuc_alloc() (verify against callers)
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh;

        lh = kuc_ptr(p);
        OBD_FREE(lh, kuc_len(payload_len));
}
1947 EXPORT_SYMBOL(kuc_free);
1948
1949 struct obd_request_slot_waiter {
1950         struct list_head        orsw_entry;
1951         wait_queue_head_t       orsw_waitq;
1952         bool                    orsw_signaled;
1953 };
1954
1955 static bool obd_request_slot_avail(struct client_obd *cli,
1956                                    struct obd_request_slot_waiter *orsw)
1957 {
1958         bool avail;
1959
1960         spin_lock(&cli->cl_loi_list_lock);
1961         avail = !!list_empty(&orsw->orsw_entry);
1962         spin_unlock(&cli->cl_loi_list_lock);
1963
1964         return avail;
1965 };
1966
1967 /*
1968  * For network flow control, the RPC sponsor needs to acquire a credit
1969  * before sending the RPC. The credits count for a connection is defined
1970  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1971  * the subsequent RPC sponsors need to wait until others released their
1972  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1973  */
1974 int obd_get_request_slot(struct client_obd *cli)
1975 {
1976         struct obd_request_slot_waiter   orsw;
1977         struct l_wait_info               lwi;
1978         int                              rc;
1979
1980         spin_lock(&cli->cl_loi_list_lock);
1981         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1982                 cli->cl_r_in_flight++;
1983                 spin_unlock(&cli->cl_loi_list_lock);
1984                 return 0;
1985         }
1986
1987         init_waitqueue_head(&orsw.orsw_waitq);
1988         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1989         orsw.orsw_signaled = false;
1990         spin_unlock(&cli->cl_loi_list_lock);
1991
1992         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1993         rc = l_wait_event(orsw.orsw_waitq,
1994                           obd_request_slot_avail(cli, &orsw) ||
1995                           orsw.orsw_signaled,
1996                           &lwi);
1997
1998         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1999          * freed but other (such as obd_put_request_slot) is using it. */
2000         spin_lock(&cli->cl_loi_list_lock);
2001         if (rc != 0) {
2002                 if (!orsw.orsw_signaled) {
2003                         if (list_empty(&orsw.orsw_entry))
2004                                 cli->cl_r_in_flight--;
2005                         else
2006                                 list_del(&orsw.orsw_entry);
2007                 }
2008         }
2009
2010         if (orsw.orsw_signaled) {
2011                 LASSERT(list_empty(&orsw.orsw_entry));
2012
2013                 rc = -EINTR;
2014         }
2015         spin_unlock(&cli->cl_loi_list_lock);
2016
2017         return rc;
2018 }
2019 EXPORT_SYMBOL(obd_get_request_slot);
2020
2021 void obd_put_request_slot(struct client_obd *cli)
2022 {
2023         struct obd_request_slot_waiter *orsw;
2024
2025         spin_lock(&cli->cl_loi_list_lock);
2026         cli->cl_r_in_flight--;
2027
2028         /* If there is free slot, wakeup the first waiter. */
2029         if (!list_empty(&cli->cl_loi_read_list) &&
2030             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2031                 orsw = list_entry(cli->cl_loi_read_list.next,
2032                                   struct obd_request_slot_waiter, orsw_entry);
2033                 list_del_init(&orsw->orsw_entry);
2034                 cli->cl_r_in_flight++;
2035                 wake_up(&orsw->orsw_waitq);
2036         }
2037         spin_unlock(&cli->cl_loi_list_lock);
2038 }
2039 EXPORT_SYMBOL(obd_put_request_slot);
2040
2041 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2042 {
2043         return cli->cl_max_rpcs_in_flight;
2044 }
2045 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2046
2047 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2048 {
2049         struct obd_request_slot_waiter *orsw;
2050         __u32                           old;
2051         int                             diff;
2052         int                             i;
2053         char                            *typ_name;
2054         int                             rc;
2055
2056         if (max > OBD_MAX_RIF_MAX || max < 1)
2057                 return -ERANGE;
2058
2059         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2060         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2061                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2062                  * strictly lower that max_rpcs_in_flight */
2063                 if (max < 2) {
2064                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2065                                "because it must be higher than "
2066                                "max_mod_rpcs_in_flight value",
2067                                cli->cl_import->imp_obd->obd_name);
2068                         return -ERANGE;
2069                 }
2070                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2071                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2072                         if (rc != 0)
2073                                 return rc;
2074                 }
2075         }
2076
2077         spin_lock(&cli->cl_loi_list_lock);
2078         old = cli->cl_max_rpcs_in_flight;
2079         cli->cl_max_rpcs_in_flight = max;
2080         diff = max - old;
2081
2082         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2083         for (i = 0; i < diff; i++) {
2084                 if (list_empty(&cli->cl_loi_read_list))
2085                         break;
2086
2087                 orsw = list_entry(cli->cl_loi_read_list.next,
2088                                   struct obd_request_slot_waiter, orsw_entry);
2089                 list_del_init(&orsw->orsw_entry);
2090                 cli->cl_r_in_flight++;
2091                 wake_up(&orsw->orsw_waitq);
2092         }
2093         spin_unlock(&cli->cl_loi_list_lock);
2094
2095         return 0;
2096 }
2097 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2098
2099 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2100 {
2101         return cli->cl_max_mod_rpcs_in_flight;
2102 }
2103 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2104
2105 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2106 {
2107         struct obd_connect_data *ocd;
2108         __u16 maxmodrpcs;
2109         __u16 prev;
2110
2111         if (max > OBD_MAX_RIF_MAX || max < 1)
2112                 return -ERANGE;
2113
2114         /* cannot exceed or equal max_rpcs_in_flight */
2115         if (max >= cli->cl_max_rpcs_in_flight) {
2116                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2117                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2118                        cli->cl_import->imp_obd->obd_name,
2119                        max, cli->cl_max_rpcs_in_flight);
2120                 return -ERANGE;
2121         }
2122
2123         /* cannot exceed max modify RPCs in flight supported by the server */
2124         ocd = &cli->cl_import->imp_connect_data;
2125         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2126                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2127         else
2128                 maxmodrpcs = 1;
2129         if (max > maxmodrpcs) {
2130                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2131                        "higher than max_mod_rpcs_per_client value (%hu) "
2132                        "returned by the server at connection\n",
2133                        cli->cl_import->imp_obd->obd_name,
2134                        max, maxmodrpcs);
2135                 return -ERANGE;
2136         }
2137
2138         spin_lock(&cli->cl_mod_rpcs_lock);
2139
2140         prev = cli->cl_max_mod_rpcs_in_flight;
2141         cli->cl_max_mod_rpcs_in_flight = max;
2142
2143         /* wakeup waiters if limit has been increased */
2144         if (cli->cl_max_mod_rpcs_in_flight > prev)
2145                 wake_up(&cli->cl_mod_rpcs_waitq);
2146
2147         spin_unlock(&cli->cl_mod_rpcs_lock);
2148
2149         return 0;
2150 }
2151 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2152
2153
/* Integer percentage of a relative to b, 0 when b is zero. Arguments are
 * parenthesised so that expressions can be passed safely. */
#define pct(a, b) ((b) ? (a) * 100 / (b) : 0)
2155 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2156                                struct seq_file *seq)
2157 {
2158         struct timeval now;
2159         unsigned long mod_tot = 0, mod_cum;
2160         int i;
2161
2162         do_gettimeofday(&now);
2163
2164         spin_lock(&cli->cl_mod_rpcs_lock);
2165
2166         seq_printf(seq, "snapshot_time:         %lu.%lu (secs.usecs)\n",
2167                    now.tv_sec, now.tv_usec);
2168         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2169                    cli->cl_mod_rpcs_in_flight);
2170
2171         seq_printf(seq, "\n\t\t\tmodify\n");
2172         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2173
2174         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2175
2176         mod_cum = 0;
2177         for (i = 0; i < OBD_HIST_MAX; i++) {
2178                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2179                 mod_cum += mod;
2180                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2181                            i, mod, pct(mod, mod_tot),
2182                            pct(mod_cum, mod_tot));
2183                 if (mod_cum == mod_tot)
2184                         break;
2185         }
2186
2187         spin_unlock(&cli->cl_mod_rpcs_lock);
2188
2189         return 0;
2190 }
2191 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2192 #undef pct
2193
2194
2195 /* The number of modify RPCs sent in parallel is limited
2196  * because the server has a finite number of slots per client to
2197  * store request result and ensure reply reconstruction when needed.
2198  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2199  * that takes into account server limit and cl_max_rpcs_in_flight
2200  * value.
2201  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2202  * one close request is allowed above the maximum.
2203  */
2204 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2205                                                  bool close_req)
2206 {
2207         bool avail;
2208
2209         /* A slot is available if
2210          * - number of modify RPCs in flight is less than the max
2211          * - it's a close RPC and no other close request is in flight
2212          */
2213         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2214                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2215
2216         return avail;
2217 }
2218
2219 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2220                                          bool close_req)
2221 {
2222         bool avail;
2223
2224         spin_lock(&cli->cl_mod_rpcs_lock);
2225         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2226         spin_unlock(&cli->cl_mod_rpcs_lock);
2227         return avail;
2228 }
2229
2230 /* Get a modify RPC slot from the obd client @cli according
2231  * to the kind of operation @opc that is going to be sent
2232  * and the intent @it of the operation if it applies.
2233  * If the maximum number of modify RPCs in flight is reached
2234  * the thread is put to sleep.
2235  * Returns the tag to be set in the request message. Tag 0
2236  * is reserved for non-modifying requests.
2237  */
2238 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2239                            struct lookup_intent *it)
2240 {
2241         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2242         bool                    close_req = false;
2243         __u16                   i, max;
2244
2245         /* read-only metadata RPCs don't consume a slot on MDT
2246          * for reply reconstruction
2247          */
2248         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2249                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2250                 return 0;
2251
2252         if (opc == MDS_CLOSE)
2253                 close_req = true;
2254
2255         do {
2256                 spin_lock(&cli->cl_mod_rpcs_lock);
2257                 max = cli->cl_max_mod_rpcs_in_flight;
2258                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2259                         /* there is a slot available */
2260                         cli->cl_mod_rpcs_in_flight++;
2261                         if (close_req)
2262                                 cli->cl_close_rpcs_in_flight++;
2263                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2264                                          cli->cl_mod_rpcs_in_flight);
2265                         /* find a free tag */
2266                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2267                                                 max + 1);
2268                         LASSERT(i < OBD_MAX_RIF_MAX);
2269                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2270                         spin_unlock(&cli->cl_mod_rpcs_lock);
2271                         /* tag 0 is reserved for non-modify RPCs */
2272                         return i + 1;
2273                 }
2274                 spin_unlock(&cli->cl_mod_rpcs_lock);
2275
2276                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2277                        "opc %u, max %hu\n",
2278                        cli->cl_import->imp_obd->obd_name, opc, max);
2279
2280                 l_wait_event(cli->cl_mod_rpcs_waitq,
2281                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2282         } while (true);
2283 }
2284 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2285
2286 /* Put a modify RPC slot from the obd client @cli according
2287  * to the kind of operation @opc that has been sent and the
2288  * intent @it of the operation if it applies.
2289  */
2290 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2291                           struct lookup_intent *it, __u16 tag)
2292 {
2293         bool                    close_req = false;
2294
2295         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2296                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2297                 return;
2298
2299         if (opc == MDS_CLOSE)
2300                 close_req = true;
2301
2302         spin_lock(&cli->cl_mod_rpcs_lock);
2303         cli->cl_mod_rpcs_in_flight--;
2304         if (close_req)
2305                 cli->cl_close_rpcs_in_flight--;
2306         /* release the tag in the bitmap */
2307         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2308         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2309         spin_unlock(&cli->cl_mod_rpcs_lock);
2310         wake_up(&cli->cl_mod_rpcs_waitq);
2311 }
2312 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2313