Whamcloud - gitweb
54168e03ab93e358e78550967e92195be75742a3
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdclass/genops.c
33  *
34  * These are the only exported functions, they provide some generic
35  * infrastructure for managing object devices
36  */
37
38 #define DEBUG_SUBSYSTEM S_CLASS
39
40 #include <linux/pid_namespace.h>
41 #include <linux/kthread.h>
42 #include <obd_class.h>
43 #include <lprocfs_status.h>
44 #include <lustre_disk.h>
45 #include <lustre_kernelcomm.h>
46
47 spinlock_t obd_types_lock;
48
49 static struct kmem_cache *obd_device_cachep;
50 struct kmem_cache *obdo_cachep;
51 EXPORT_SYMBOL(obdo_cachep);
52 static struct kmem_cache *import_cachep;
53
54 static struct list_head obd_zombie_imports;
55 static struct list_head obd_zombie_exports;
56 static spinlock_t  obd_zombie_impexp_lock;
57
58 static void obd_zombie_impexp_notify(void);
59 static void obd_zombie_export_add(struct obd_export *exp);
60 static void obd_zombie_import_add(struct obd_import *imp);
61 static void print_export_data(struct obd_export *exp,
62                               const char *status, int locks, int debug_level);
63
64 struct list_head obd_stale_exports;
65 spinlock_t       obd_stale_export_lock;
66 atomic_t         obd_stale_export_num;
67
68 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
69 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
70
71 /*
72  * support functions: we could use inter-module communication, but this
73  * is more portable to other OS's
74  */
75 static struct obd_device *obd_device_alloc(void)
76 {
77         struct obd_device *obd;
78
79         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
80         if (obd != NULL) {
81                 obd->obd_magic = OBD_DEVICE_MAGIC;
82         }
83         return obd;
84 }
85
86 static void obd_device_free(struct obd_device *obd)
87 {
88         LASSERT(obd != NULL);
89         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
90                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
91         if (obd->obd_namespace != NULL) {
92                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
93                        obd, obd->obd_namespace, obd->obd_force);
94                 LBUG();
95         }
96         lu_ref_fini(&obd->obd_reference);
97         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
98 }
99
100 struct obd_type *class_search_type(const char *name)
101 {
102         struct list_head *tmp;
103         struct obd_type *type;
104
105         spin_lock(&obd_types_lock);
106         list_for_each(tmp, &obd_types) {
107                 type = list_entry(tmp, struct obd_type, typ_chain);
108                 if (strcmp(type->typ_name, name) == 0) {
109                         spin_unlock(&obd_types_lock);
110                         return type;
111                 }
112         }
113         spin_unlock(&obd_types_lock);
114         return NULL;
115 }
116 EXPORT_SYMBOL(class_search_type);
117
118 struct obd_type *class_get_type(const char *name)
119 {
120         struct obd_type *type = class_search_type(name);
121
122 #ifdef HAVE_MODULE_LOADING_SUPPORT
123         if (!type) {
124                 const char *modname = name;
125
126                 if (strcmp(modname, "obdfilter") == 0)
127                         modname = "ofd";
128
129                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
130                         modname = LUSTRE_OSP_NAME;
131
132                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
133                         modname = LUSTRE_MDT_NAME;
134
135                 if (!request_module("%s", modname)) {
136                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
137                         type = class_search_type(name);
138                 } else {
139                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
140                                            modname);
141                 }
142         }
143 #endif
144         if (type) {
145                 spin_lock(&type->obd_type_lock);
146                 type->typ_refcnt++;
147                 try_module_get(type->typ_dt_ops->o_owner);
148                 spin_unlock(&type->obd_type_lock);
149         }
150         return type;
151 }
152
153 void class_put_type(struct obd_type *type)
154 {
155         LASSERT(type);
156         spin_lock(&type->obd_type_lock);
157         type->typ_refcnt--;
158         module_put(type->typ_dt_ops->o_owner);
159         spin_unlock(&type->obd_type_lock);
160 }
161
162 #define CLASS_MAX_NAME 1024
163
164 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
165                         bool enable_proc, struct lprocfs_vars *vars,
166                         const char *name, struct lu_device_type *ldt)
167 {
168         struct obd_type *type;
169         int rc = 0;
170         ENTRY;
171
172         /* sanity check */
173         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
174
175         if (class_search_type(name)) {
176                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
177                 RETURN(-EEXIST);
178         }
179
180         rc = -ENOMEM;
181         OBD_ALLOC(type, sizeof(*type));
182         if (type == NULL)
183                 RETURN(rc);
184
185         OBD_ALLOC_PTR(type->typ_dt_ops);
186         OBD_ALLOC_PTR(type->typ_md_ops);
187         OBD_ALLOC(type->typ_name, strlen(name) + 1);
188
189         if (type->typ_dt_ops == NULL ||
190             type->typ_md_ops == NULL ||
191             type->typ_name == NULL)
192                 GOTO (failed, rc);
193
194         *(type->typ_dt_ops) = *dt_ops;
195         /* md_ops is optional */
196         if (md_ops)
197                 *(type->typ_md_ops) = *md_ops;
198         strcpy(type->typ_name, name);
199         spin_lock_init(&type->obd_type_lock);
200
201 #ifdef CONFIG_PROC_FS
202         if (enable_proc) {
203                 type->typ_procroot = lprocfs_register(type->typ_name,
204                                                       proc_lustre_root,
205                                                       vars, type);
206                 if (IS_ERR(type->typ_procroot)) {
207                         rc = PTR_ERR(type->typ_procroot);
208                         type->typ_procroot = NULL;
209                         GOTO(failed, rc);
210                 }
211         }
212 #endif
213         if (ldt != NULL) {
214                 type->typ_lu = ldt;
215                 rc = lu_device_type_init(ldt);
216                 if (rc != 0)
217                         GOTO (failed, rc);
218         }
219
220         spin_lock(&obd_types_lock);
221         list_add(&type->typ_chain, &obd_types);
222         spin_unlock(&obd_types_lock);
223
224         RETURN (0);
225
226 failed:
227         if (type->typ_name != NULL) {
228 #ifdef CONFIG_PROC_FS
229                 if (type->typ_procroot != NULL)
230                         remove_proc_subtree(type->typ_name, proc_lustre_root);
231 #endif
232                 OBD_FREE(type->typ_name, strlen(name) + 1);
233         }
234         if (type->typ_md_ops != NULL)
235                 OBD_FREE_PTR(type->typ_md_ops);
236         if (type->typ_dt_ops != NULL)
237                 OBD_FREE_PTR(type->typ_dt_ops);
238         OBD_FREE(type, sizeof(*type));
239         RETURN(rc);
240 }
241 EXPORT_SYMBOL(class_register_type);
242
243 int class_unregister_type(const char *name)
244 {
245         struct obd_type *type = class_search_type(name);
246         ENTRY;
247
248         if (!type) {
249                 CERROR("unknown obd type\n");
250                 RETURN(-EINVAL);
251         }
252
253         if (type->typ_refcnt) {
254                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
255                 /* This is a bad situation, let's make the best of it */
256                 /* Remove ops, but leave the name for debugging */
257                 OBD_FREE_PTR(type->typ_dt_ops);
258                 OBD_FREE_PTR(type->typ_md_ops);
259                 RETURN(-EBUSY);
260         }
261
262         /* we do not use type->typ_procroot as for compatibility purposes
263          * other modules can share names (i.e. lod can use lov entry). so
264          * we can't reference pointer as it can get invalided when another
265          * module removes the entry */
266 #ifdef CONFIG_PROC_FS
267         if (type->typ_procroot != NULL)
268                 remove_proc_subtree(type->typ_name, proc_lustre_root);
269         if (type->typ_procsym != NULL)
270                 lprocfs_remove(&type->typ_procsym);
271 #endif
272         if (type->typ_lu)
273                 lu_device_type_fini(type->typ_lu);
274
275         spin_lock(&obd_types_lock);
276         list_del(&type->typ_chain);
277         spin_unlock(&obd_types_lock);
278         OBD_FREE(type->typ_name, strlen(name) + 1);
279         if (type->typ_dt_ops != NULL)
280                 OBD_FREE_PTR(type->typ_dt_ops);
281         if (type->typ_md_ops != NULL)
282                 OBD_FREE_PTR(type->typ_md_ops);
283         OBD_FREE(type, sizeof(*type));
284         RETURN(0);
285 } /* class_unregister_type */
286 EXPORT_SYMBOL(class_unregister_type);
287
288 /**
289  * Create a new obd device.
290  *
291  * Find an empty slot in ::obd_devs[], create a new obd device in it.
292  *
293  * \param[in] type_name obd device type string.
294  * \param[in] name      obd device name.
295  *
296  * \retval NULL if create fails, otherwise return the obd device
297  *         pointer created.
298  */
299 struct obd_device *class_newdev(const char *type_name, const char *name)
300 {
301         struct obd_device *result = NULL;
302         struct obd_device *newdev;
303         struct obd_type *type = NULL;
304         int i;
305         int new_obd_minor = 0;
306         ENTRY;
307
308         if (strlen(name) >= MAX_OBD_NAME) {
309                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
310                 RETURN(ERR_PTR(-EINVAL));
311         }
312
313         type = class_get_type(type_name);
314         if (type == NULL){
315                 CERROR("OBD: unknown type: %s\n", type_name);
316                 RETURN(ERR_PTR(-ENODEV));
317         }
318
319         newdev = obd_device_alloc();
320         if (newdev == NULL)
321                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
322
323         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
324
325         write_lock(&obd_dev_lock);
326         for (i = 0; i < class_devno_max(); i++) {
327                 struct obd_device *obd = class_num2obd(i);
328
329                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
330                         CERROR("Device %s already exists at %d, won't add\n",
331                                name, i);
332                         if (result) {
333                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
334                                          "%p obd_magic %08x != %08x\n", result,
335                                          result->obd_magic, OBD_DEVICE_MAGIC);
336                                 LASSERTF(result->obd_minor == new_obd_minor,
337                                          "%p obd_minor %d != %d\n", result,
338                                          result->obd_minor, new_obd_minor);
339
340                                 obd_devs[result->obd_minor] = NULL;
341                                 result->obd_name[0]='\0';
342                          }
343                         result = ERR_PTR(-EEXIST);
344                         break;
345                 }
346                 if (!result && !obd) {
347                         result = newdev;
348                         result->obd_minor = i;
349                         new_obd_minor = i;
350                         result->obd_type = type;
351                         strncpy(result->obd_name, name,
352                                 sizeof(result->obd_name) - 1);
353                         obd_devs[i] = result;
354                 }
355         }
356         write_unlock(&obd_dev_lock);
357
358         if (result == NULL && i >= class_devno_max()) {
359                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
360                        class_devno_max());
361                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
362         }
363
364         if (IS_ERR(result))
365                 GOTO(out, result);
366
367         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
368                result->obd_name, result);
369
370         RETURN(result);
371 out:
372         obd_device_free(newdev);
373 out_type:
374         class_put_type(type);
375         return result;
376 }
377
378 void class_release_dev(struct obd_device *obd)
379 {
380         struct obd_type *obd_type = obd->obd_type;
381
382         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
383                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
384         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
385                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
386         LASSERT(obd_type != NULL);
387
388         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
389                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
390
391         write_lock(&obd_dev_lock);
392         obd_devs[obd->obd_minor] = NULL;
393         write_unlock(&obd_dev_lock);
394         obd_device_free(obd);
395
396         class_put_type(obd_type);
397 }
398
399 int class_name2dev(const char *name)
400 {
401         int i;
402
403         if (!name)
404                 return -1;
405
406         read_lock(&obd_dev_lock);
407         for (i = 0; i < class_devno_max(); i++) {
408                 struct obd_device *obd = class_num2obd(i);
409
410                 if (obd && strcmp(name, obd->obd_name) == 0) {
411                         /* Make sure we finished attaching before we give
412                            out any references */
413                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
414                         if (obd->obd_attached) {
415                                 read_unlock(&obd_dev_lock);
416                                 return i;
417                         }
418                         break;
419                 }
420         }
421         read_unlock(&obd_dev_lock);
422
423         return -1;
424 }
425
/* Look up an attached obd device by name.
 *
 * Returns the device, or NULL if class_name2dev() finds no attached device
 * called \a name. */
struct obd_device *class_name2obd(const char *name)
{
        int dev = class_name2dev(name);

        /* valid indices are 0 .. class_devno_max() - 1 */
        if (dev < 0 || dev >= class_devno_max())
                return NULL;
        return class_num2obd(dev);
}
EXPORT_SYMBOL(class_name2obd);
435
436 int class_uuid2dev(struct obd_uuid *uuid)
437 {
438         int i;
439
440         read_lock(&obd_dev_lock);
441         for (i = 0; i < class_devno_max(); i++) {
442                 struct obd_device *obd = class_num2obd(i);
443
444                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
445                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
446                         read_unlock(&obd_dev_lock);
447                         return i;
448                 }
449         }
450         read_unlock(&obd_dev_lock);
451
452         return -1;
453 }
454
/* Look up an obd device by uuid; returns NULL if no device matches. */
struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
{
        int idx = class_uuid2dev(uuid);

        return idx < 0 ? NULL : class_num2obd(idx);
}
EXPORT_SYMBOL(class_uuid2obd);
463
464 /**
465  * Get obd device from ::obd_devs[]
466  *
467  * \param num [in] array index
468  *
469  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
470  *         otherwise return the obd device there.
471  */
472 struct obd_device *class_num2obd(int num)
473 {
474         struct obd_device *obd = NULL;
475
476         if (num < class_devno_max()) {
477                 obd = obd_devs[num];
478                 if (obd == NULL)
479                         return NULL;
480
481                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
482                          "%p obd_magic %08x != %08x\n",
483                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
484                 LASSERTF(obd->obd_minor == num,
485                          "%p obd_minor %0d != %0d\n",
486                          obd, obd->obd_minor, num);
487         }
488
489         return obd;
490 }
491
492 /**
493  * Get obd devices count. Device in any
494  *    state are counted
495  * \retval obd device count
496  */
497 int get_devices_count(void)
498 {
499         int index, max_index = class_devno_max(), dev_count = 0;
500
501         read_lock(&obd_dev_lock);
502         for (index = 0; index <= max_index; index++) {
503                 struct obd_device *obd = class_num2obd(index);
504                 if (obd != NULL)
505                         dev_count++;
506         }
507         read_unlock(&obd_dev_lock);
508
509         return dev_count;
510 }
511 EXPORT_SYMBOL(get_devices_count);
512
513 void class_obd_list(void)
514 {
515         char *status;
516         int i;
517
518         read_lock(&obd_dev_lock);
519         for (i = 0; i < class_devno_max(); i++) {
520                 struct obd_device *obd = class_num2obd(i);
521
522                 if (obd == NULL)
523                         continue;
524                 if (obd->obd_stopping)
525                         status = "ST";
526                 else if (obd->obd_set_up)
527                         status = "UP";
528                 else if (obd->obd_attached)
529                         status = "AT";
530                 else
531                         status = "--";
532                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
533                          i, status, obd->obd_type->typ_name,
534                          obd->obd_name, obd->obd_uuid.uuid,
535                          atomic_read(&obd->obd_refcount));
536         }
537         read_unlock(&obd_dev_lock);
538         return;
539 }
540
541 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
542    specified, then only the client with that uuid is returned,
543    otherwise any client connected to the tgt is returned. */
544 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
545                                           const char * typ_name,
546                                           struct obd_uuid *grp_uuid)
547 {
548         int i;
549
550         read_lock(&obd_dev_lock);
551         for (i = 0; i < class_devno_max(); i++) {
552                 struct obd_device *obd = class_num2obd(i);
553
554                 if (obd == NULL)
555                         continue;
556                 if ((strncmp(obd->obd_type->typ_name, typ_name,
557                              strlen(typ_name)) == 0)) {
558                         if (obd_uuid_equals(tgt_uuid,
559                                             &obd->u.cli.cl_target_uuid) &&
560                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
561                                                          &obd->obd_uuid) : 1)) {
562                                 read_unlock(&obd_dev_lock);
563                                 return obd;
564                         }
565                 }
566         }
567         read_unlock(&obd_dev_lock);
568
569         return NULL;
570 }
571 EXPORT_SYMBOL(class_find_client_obd);
572
573 /* Iterate the obd_device list looking devices have grp_uuid. Start
574    searching at *next, and if a device is found, the next index to look
575    at is saved in *next. If next is NULL, then the first matching device
576    will always be returned. */
577 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
578 {
579         int i;
580
581         if (next == NULL)
582                 i = 0;
583         else if (*next >= 0 && *next < class_devno_max())
584                 i = *next;
585         else
586                 return NULL;
587
588         read_lock(&obd_dev_lock);
589         for (; i < class_devno_max(); i++) {
590                 struct obd_device *obd = class_num2obd(i);
591
592                 if (obd == NULL)
593                         continue;
594                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
595                         if (next != NULL)
596                                 *next = i+1;
597                         read_unlock(&obd_dev_lock);
598                         return obd;
599                 }
600         }
601         read_unlock(&obd_dev_lock);
602
603         return NULL;
604 }
605 EXPORT_SYMBOL(class_devices_in_group);
606
607 /**
608  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
609  * adjust sptlrpc settings accordingly.
610  */
611 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
612 {
613         struct obd_device  *obd;
614         const char         *type;
615         int                 i, rc = 0, rc2;
616
617         LASSERT(namelen > 0);
618
619         read_lock(&obd_dev_lock);
620         for (i = 0; i < class_devno_max(); i++) {
621                 obd = class_num2obd(i);
622
623                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
624                         continue;
625
626                 /* only notify mdc, osc, osp, lwp, mdt, ost
627                  * because only these have a -sptlrpc llog */
628                 type = obd->obd_type->typ_name;
629                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
630                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
631                     strcmp(type, LUSTRE_OSP_NAME) != 0 &&
632                     strcmp(type, LUSTRE_LWP_NAME) != 0 &&
633                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
634                     strcmp(type, LUSTRE_OST_NAME) != 0)
635                         continue;
636
637                 if (strncmp(obd->obd_name, fsname, namelen))
638                         continue;
639
640                 class_incref(obd, __FUNCTION__, obd);
641                 read_unlock(&obd_dev_lock);
642                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
643                                          sizeof(KEY_SPTLRPC_CONF),
644                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
645                 rc = rc ? rc : rc2;
646                 class_decref(obd, __FUNCTION__, obd);
647                 read_lock(&obd_dev_lock);
648         }
649         read_unlock(&obd_dev_lock);
650         return rc;
651 }
652 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
653
654 void obd_cleanup_caches(void)
655 {
656         ENTRY;
657         if (obd_device_cachep) {
658                 kmem_cache_destroy(obd_device_cachep);
659                 obd_device_cachep = NULL;
660         }
661         if (obdo_cachep) {
662                 kmem_cache_destroy(obdo_cachep);
663                 obdo_cachep = NULL;
664         }
665         if (import_cachep) {
666                 kmem_cache_destroy(import_cachep);
667                 import_cachep = NULL;
668         }
669
670         EXIT;
671 }
672
673 int obd_init_caches(void)
674 {
675         int rc;
676         ENTRY;
677
678         LASSERT(obd_device_cachep == NULL);
679         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
680                                               sizeof(struct obd_device),
681                                               0, 0, NULL);
682         if (!obd_device_cachep)
683                 GOTO(out, rc = -ENOMEM);
684
685         LASSERT(obdo_cachep == NULL);
686         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
687                                         0, 0, NULL);
688         if (!obdo_cachep)
689                 GOTO(out, rc = -ENOMEM);
690
691         LASSERT(import_cachep == NULL);
692         import_cachep = kmem_cache_create("ll_import_cache",
693                                           sizeof(struct obd_import),
694                                           0, 0, NULL);
695         if (!import_cachep)
696                 GOTO(out, rc = -ENOMEM);
697
698         RETURN(0);
699 out:
700         obd_cleanup_caches();
701         RETURN(rc);
702 }
703
704 /* map connection to client */
705 struct obd_export *class_conn2export(struct lustre_handle *conn)
706 {
707         struct obd_export *export;
708         ENTRY;
709
710         if (!conn) {
711                 CDEBUG(D_CACHE, "looking for null handle\n");
712                 RETURN(NULL);
713         }
714
715         if (conn->cookie == -1) {  /* this means assign a new connection */
716                 CDEBUG(D_CACHE, "want a new connection\n");
717                 RETURN(NULL);
718         }
719
720         CDEBUG(D_INFO, "looking for export cookie %#llx\n", conn->cookie);
721         export = class_handle2object(conn->cookie, NULL);
722         RETURN(export);
723 }
724 EXPORT_SYMBOL(class_conn2export);
725
726 struct obd_device *class_exp2obd(struct obd_export *exp)
727 {
728         if (exp)
729                 return exp->exp_obd;
730         return NULL;
731 }
732 EXPORT_SYMBOL(class_exp2obd);
733
734 struct obd_device *class_conn2obd(struct lustre_handle *conn)
735 {
736         struct obd_export *export;
737         export = class_conn2export(conn);
738         if (export) {
739                 struct obd_device *obd = export->exp_obd;
740                 class_export_put(export);
741                 return obd;
742         }
743         return NULL;
744 }
745
746 struct obd_import *class_exp2cliimp(struct obd_export *exp)
747 {
748         struct obd_device *obd = exp->exp_obd;
749         if (obd == NULL)
750                 return NULL;
751         return obd->u.cli.cl_import;
752 }
753 EXPORT_SYMBOL(class_exp2cliimp);
754
755 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
756 {
757         struct obd_device *obd = class_conn2obd(conn);
758         if (obd == NULL)
759                 return NULL;
760         return obd->u.cli.cl_import;
761 }
762
763 /* Export management functions */
764 static void class_export_destroy(struct obd_export *exp)
765 {
766         struct obd_device *obd = exp->exp_obd;
767         ENTRY;
768
769         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
770         LASSERT(obd != NULL);
771
772         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
773                exp->exp_client_uuid.uuid, obd->obd_name);
774
775         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
776         if (exp->exp_connection)
777                 ptlrpc_put_connection_superhack(exp->exp_connection);
778
779         LASSERT(list_empty(&exp->exp_outstanding_replies));
780         LASSERT(list_empty(&exp->exp_uncommitted_replies));
781         LASSERT(list_empty(&exp->exp_req_replay_queue));
782         LASSERT(list_empty(&exp->exp_hp_rpcs));
783         obd_destroy_export(exp);
784         class_decref(obd, "export", exp);
785
786         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
787         EXIT;
788 }
789
790 static void export_handle_addref(void *export)
791 {
792         class_export_get(export);
793 }
794
795 static struct portals_handle_ops export_handle_ops = {
796         .hop_addref = export_handle_addref,
797         .hop_free   = NULL,
798 };
799
800 struct obd_export *class_export_get(struct obd_export *exp)
801 {
802         atomic_inc(&exp->exp_refcount);
803         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
804                atomic_read(&exp->exp_refcount));
805         return exp;
806 }
807 EXPORT_SYMBOL(class_export_get);
808
809 void class_export_put(struct obd_export *exp)
810 {
811         LASSERT(exp != NULL);
812         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
813         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
814                atomic_read(&exp->exp_refcount) - 1);
815
816         if (atomic_dec_and_test(&exp->exp_refcount)) {
817                 LASSERT(!list_empty(&exp->exp_obd_chain));
818                 LASSERT(list_empty(&exp->exp_stale_list));
819                 CDEBUG(D_IOCTL, "final put %p/%s\n",
820                        exp, exp->exp_client_uuid.uuid);
821
822                 /* release nid stat refererence */
823                 lprocfs_exp_cleanup(exp);
824
825                 obd_zombie_export_add(exp);
826         }
827 }
828 EXPORT_SYMBOL(class_export_put);
829
830 /* Creates a new export, adds it to the hash table, and returns a
831  * pointer to it. The refcount is 2: one for the hash reference, and
832  * one for the pointer returned by this function. */
833 struct obd_export *class_new_export(struct obd_device *obd,
834                                     struct obd_uuid *cluuid)
835 {
836         struct obd_export *export;
837         struct cfs_hash *hash = NULL;
838         int rc = 0;
839         ENTRY;
840
841         OBD_ALLOC_PTR(export);
842         if (!export)
843                 return ERR_PTR(-ENOMEM);
844
845         export->exp_conn_cnt = 0;
846         export->exp_lock_hash = NULL;
847         export->exp_flock_hash = NULL;
848         atomic_set(&export->exp_refcount, 2);
849         atomic_set(&export->exp_rpc_count, 0);
850         atomic_set(&export->exp_cb_count, 0);
851         atomic_set(&export->exp_locks_count, 0);
852 #if LUSTRE_TRACKS_LOCK_EXP_REFS
853         INIT_LIST_HEAD(&export->exp_locks_list);
854         spin_lock_init(&export->exp_locks_list_guard);
855 #endif
856         atomic_set(&export->exp_replay_count, 0);
857         export->exp_obd = obd;
858         INIT_LIST_HEAD(&export->exp_outstanding_replies);
859         spin_lock_init(&export->exp_uncommitted_replies_lock);
860         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
861         INIT_LIST_HEAD(&export->exp_req_replay_queue);
862         INIT_LIST_HEAD(&export->exp_handle.h_link);
863         INIT_LIST_HEAD(&export->exp_hp_rpcs);
864         INIT_LIST_HEAD(&export->exp_reg_rpcs);
865         class_handle_hash(&export->exp_handle, &export_handle_ops);
866         export->exp_last_request_time = cfs_time_current_sec();
867         spin_lock_init(&export->exp_lock);
868         spin_lock_init(&export->exp_rpc_lock);
869         INIT_HLIST_NODE(&export->exp_uuid_hash);
870         INIT_HLIST_NODE(&export->exp_nid_hash);
871         INIT_HLIST_NODE(&export->exp_gen_hash);
872         spin_lock_init(&export->exp_bl_list_lock);
873         INIT_LIST_HEAD(&export->exp_bl_list);
874         INIT_LIST_HEAD(&export->exp_stale_list);
875
876         export->exp_sp_peer = LUSTRE_SP_ANY;
877         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
878         export->exp_client_uuid = *cluuid;
879         obd_init_export(export);
880
881         spin_lock(&obd->obd_dev_lock);
882         /* shouldn't happen, but might race */
883         if (obd->obd_stopping)
884                 GOTO(exit_unlock, rc = -ENODEV);
885
886         hash = cfs_hash_getref(obd->obd_uuid_hash);
887         if (hash == NULL)
888                 GOTO(exit_unlock, rc = -ENODEV);
889         spin_unlock(&obd->obd_dev_lock);
890
891         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
892                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
893                 if (rc != 0) {
894                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
895                                       obd->obd_name, cluuid->uuid, rc);
896                         GOTO(exit_err, rc = -EALREADY);
897                 }
898         }
899
900         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
901         spin_lock(&obd->obd_dev_lock);
902         if (obd->obd_stopping) {
903                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
904                 GOTO(exit_unlock, rc = -ENODEV);
905         }
906
907         class_incref(obd, "export", export);
908         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
909         list_add_tail(&export->exp_obd_chain_timed,
910                       &export->exp_obd->obd_exports_timed);
911         export->exp_obd->obd_num_exports++;
912         spin_unlock(&obd->obd_dev_lock);
913         cfs_hash_putref(hash);
914         RETURN(export);
915
916 exit_unlock:
917         spin_unlock(&obd->obd_dev_lock);
918 exit_err:
919         if (hash)
920                 cfs_hash_putref(hash);
921         class_handle_unhash(&export->exp_handle);
922         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
923         obd_destroy_export(export);
924         OBD_FREE_PTR(export);
925         return ERR_PTR(rc);
926 }
927 EXPORT_SYMBOL(class_new_export);
928
929 void class_unlink_export(struct obd_export *exp)
930 {
931         class_handle_unhash(&exp->exp_handle);
932
933         spin_lock(&exp->exp_obd->obd_dev_lock);
934         /* delete an uuid-export hashitem from hashtables */
935         if (!hlist_unhashed(&exp->exp_uuid_hash))
936                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
937                              &exp->exp_client_uuid,
938                              &exp->exp_uuid_hash);
939
940         if (!hlist_unhashed(&exp->exp_gen_hash)) {
941                 struct tg_export_data   *ted = &exp->exp_target_data;
942                 struct cfs_hash         *hash;
943
944                 /* Because obd_gen_hash will not be released until
945                  * class_cleanup(), so hash should never be NULL here */
946                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
947                 LASSERT(hash != NULL);
948                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
949                              &exp->exp_gen_hash);
950                 cfs_hash_putref(hash);
951         }
952
953         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
954         list_del_init(&exp->exp_obd_chain_timed);
955         exp->exp_obd->obd_num_exports--;
956         spin_unlock(&exp->exp_obd->obd_dev_lock);
957         atomic_inc(&obd_stale_export_num);
958
959         /* A reference is kept by obd_stale_exports list */
960         obd_stale_export_put(exp);
961 }
962 EXPORT_SYMBOL(class_unlink_export);
963
964 /* Import management functions */
965 static void class_import_destroy(struct obd_import *imp)
966 {
967         ENTRY;
968
969         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
970                 imp->imp_obd->obd_name);
971
972         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
973
974         ptlrpc_put_connection_superhack(imp->imp_connection);
975
976         while (!list_empty(&imp->imp_conn_list)) {
977                 struct obd_import_conn *imp_conn;
978
979                 imp_conn = list_entry(imp->imp_conn_list.next,
980                                       struct obd_import_conn, oic_item);
981                 list_del_init(&imp_conn->oic_item);
982                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
983                 OBD_FREE(imp_conn, sizeof(*imp_conn));
984         }
985
986         LASSERT(imp->imp_sec == NULL);
987         class_decref(imp->imp_obd, "import", imp);
988         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
989         EXIT;
990 }
991
/**
 * Handle-ops callback: take a reference on the import behind a handle.
 *
 * \param[in] import    pointer to the struct obd_import, passed as void *
 */
static void import_handle_addref(void *import)
{
        struct obd_import *imp = import;

        class_import_get(imp);
}
996
/* Handle operations registered for imports by class_new_import();
 * only an addref callback is provided, no free callback. */
static struct portals_handle_ops import_handle_ops = {
        .hop_addref = import_handle_addref,
        .hop_free   = NULL,
};
1001
1002 struct obd_import *class_import_get(struct obd_import *import)
1003 {
1004         atomic_inc(&import->imp_refcount);
1005         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1006                atomic_read(&import->imp_refcount),
1007                import->imp_obd->obd_name);
1008         return import;
1009 }
1010 EXPORT_SYMBOL(class_import_get);
1011
1012 void class_import_put(struct obd_import *imp)
1013 {
1014         ENTRY;
1015
1016         LASSERT(list_empty(&imp->imp_zombie_chain));
1017         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1018
1019         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1020                atomic_read(&imp->imp_refcount) - 1,
1021                imp->imp_obd->obd_name);
1022
1023         if (atomic_dec_and_test(&imp->imp_refcount)) {
1024                 CDEBUG(D_INFO, "final put import %p\n", imp);
1025                 obd_zombie_import_add(imp);
1026         }
1027
1028         /* catch possible import put race */
1029         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1030         EXIT;
1031 }
1032 EXPORT_SYMBOL(class_import_put);
1033
1034 static void init_imp_at(struct imp_at *at) {
1035         int i;
1036         at_init(&at->iat_net_latency, 0, 0);
1037         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1038                 /* max service estimates are tracked on the server side, so
1039                    don't use the AT history here, just use the last reported
1040                    val. (But keep hist for proc histogram, worst_ever) */
1041                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1042                         AT_FLG_NOHIST);
1043         }
1044 }
1045
1046 struct obd_import *class_new_import(struct obd_device *obd)
1047 {
1048         struct obd_import *imp;
1049         struct pid_namespace *curr_pid_ns = ll_task_pid_ns(current);
1050
1051         OBD_ALLOC(imp, sizeof(*imp));
1052         if (imp == NULL)
1053                 return NULL;
1054
1055         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1056         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1057         INIT_LIST_HEAD(&imp->imp_replay_list);
1058         INIT_LIST_HEAD(&imp->imp_sending_list);
1059         INIT_LIST_HEAD(&imp->imp_delayed_list);
1060         INIT_LIST_HEAD(&imp->imp_committed_list);
1061         INIT_LIST_HEAD(&imp->imp_unreplied_list);
1062         imp->imp_known_replied_xid = 0;
1063         imp->imp_replay_cursor = &imp->imp_committed_list;
1064         spin_lock_init(&imp->imp_lock);
1065         imp->imp_last_success_conn = 0;
1066         imp->imp_state = LUSTRE_IMP_NEW;
1067         imp->imp_obd = class_incref(obd, "import", imp);
1068         mutex_init(&imp->imp_sec_mutex);
1069         init_waitqueue_head(&imp->imp_recovery_waitq);
1070
1071         if (curr_pid_ns->child_reaper)
1072                 imp->imp_sec_refpid = curr_pid_ns->child_reaper->pid;
1073         else
1074                 imp->imp_sec_refpid = 1;
1075
1076         atomic_set(&imp->imp_refcount, 2);
1077         atomic_set(&imp->imp_unregistering, 0);
1078         atomic_set(&imp->imp_inflight, 0);
1079         atomic_set(&imp->imp_replay_inflight, 0);
1080         atomic_set(&imp->imp_inval_count, 0);
1081         INIT_LIST_HEAD(&imp->imp_conn_list);
1082         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1083         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1084         init_imp_at(&imp->imp_at);
1085
1086         /* the default magic is V2, will be used in connect RPC, and
1087          * then adjusted according to the flags in request/reply. */
1088         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1089
1090         return imp;
1091 }
1092 EXPORT_SYMBOL(class_new_import);
1093
1094 void class_destroy_import(struct obd_import *import)
1095 {
1096         LASSERT(import != NULL);
1097         LASSERT(import != LP_POISON);
1098
1099         class_handle_unhash(&import->imp_handle);
1100
1101         spin_lock(&import->imp_lock);
1102         import->imp_generation++;
1103         spin_unlock(&import->imp_lock);
1104         class_import_put(import);
1105 }
1106 EXPORT_SYMBOL(class_destroy_import);
1107
1108 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1109
1110 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1111 {
1112         spin_lock(&exp->exp_locks_list_guard);
1113
1114         LASSERT(lock->l_exp_refs_nr >= 0);
1115
1116         if (lock->l_exp_refs_target != NULL &&
1117             lock->l_exp_refs_target != exp) {
1118                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1119                               exp, lock, lock->l_exp_refs_target);
1120         }
1121         if ((lock->l_exp_refs_nr ++) == 0) {
1122                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1123                 lock->l_exp_refs_target = exp;
1124         }
1125         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1126                lock, exp, lock->l_exp_refs_nr);
1127         spin_unlock(&exp->exp_locks_list_guard);
1128 }
1129 EXPORT_SYMBOL(__class_export_add_lock_ref);
1130
1131 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1132 {
1133         spin_lock(&exp->exp_locks_list_guard);
1134         LASSERT(lock->l_exp_refs_nr > 0);
1135         if (lock->l_exp_refs_target != exp) {
1136                 LCONSOLE_WARN("lock %p, "
1137                               "mismatching export pointers: %p, %p\n",
1138                               lock, lock->l_exp_refs_target, exp);
1139         }
1140         if (-- lock->l_exp_refs_nr == 0) {
1141                 list_del_init(&lock->l_exp_refs_link);
1142                 lock->l_exp_refs_target = NULL;
1143         }
1144         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1145                lock, exp, lock->l_exp_refs_nr);
1146         spin_unlock(&exp->exp_locks_list_guard);
1147 }
1148 EXPORT_SYMBOL(__class_export_del_lock_ref);
1149 #endif
1150
1151 /* A connection defines an export context in which preallocation can
1152    be managed. This releases the export pointer reference, and returns
1153    the export handle, so the export refcount is 1 when this function
1154    returns. */
1155 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1156                   struct obd_uuid *cluuid)
1157 {
1158         struct obd_export *export;
1159         LASSERT(conn != NULL);
1160         LASSERT(obd != NULL);
1161         LASSERT(cluuid != NULL);
1162         ENTRY;
1163
1164         export = class_new_export(obd, cluuid);
1165         if (IS_ERR(export))
1166                 RETURN(PTR_ERR(export));
1167
1168         conn->cookie = export->exp_handle.h_cookie;
1169         class_export_put(export);
1170
1171         CDEBUG(D_IOCTL, "connect: client %s, cookie %#llx\n",
1172                cluuid->uuid, conn->cookie);
1173         RETURN(0);
1174 }
1175 EXPORT_SYMBOL(class_connect);
1176
1177 /* if export is involved in recovery then clean up related things */
1178 static void class_export_recovery_cleanup(struct obd_export *exp)
1179 {
1180         struct obd_device *obd = exp->exp_obd;
1181
1182         spin_lock(&obd->obd_recovery_task_lock);
1183         if (obd->obd_recovering) {
1184                 if (exp->exp_in_recovery) {
1185                         spin_lock(&exp->exp_lock);
1186                         exp->exp_in_recovery = 0;
1187                         spin_unlock(&exp->exp_lock);
1188                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1189                         atomic_dec(&obd->obd_connected_clients);
1190                 }
1191
1192                 /* if called during recovery then should update
1193                  * obd_stale_clients counter,
1194                  * lightweight exports are not counted */
1195                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1196                         exp->exp_obd->obd_stale_clients++;
1197         }
1198         spin_unlock(&obd->obd_recovery_task_lock);
1199
1200         spin_lock(&exp->exp_lock);
1201         /** Cleanup req replay fields */
1202         if (exp->exp_req_replay_needed) {
1203                 exp->exp_req_replay_needed = 0;
1204
1205                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1206                 atomic_dec(&obd->obd_req_replay_clients);
1207         }
1208
1209         /** Cleanup lock replay data */
1210         if (exp->exp_lock_replay_needed) {
1211                 exp->exp_lock_replay_needed = 0;
1212
1213                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1214                 atomic_dec(&obd->obd_lock_replay_clients);
1215         }
1216         spin_unlock(&exp->exp_lock);
1217 }
1218
1219 /* This function removes 1-3 references from the export:
1220  * 1 - for export pointer passed
1221  * and if disconnect really need
1222  * 2 - removing from hash
1223  * 3 - in client_unlink_export
1224  * The export pointer passed to this function can destroyed */
1225 int class_disconnect(struct obd_export *export)
1226 {
1227         int already_disconnected;
1228         ENTRY;
1229
1230         if (export == NULL) {
1231                 CWARN("attempting to free NULL export %p\n", export);
1232                 RETURN(-EINVAL);
1233         }
1234
1235         spin_lock(&export->exp_lock);
1236         already_disconnected = export->exp_disconnected;
1237         export->exp_disconnected = 1;
1238         spin_unlock(&export->exp_lock);
1239
1240         /* class_cleanup(), abort_recovery(), and class_fail_export()
1241          * all end up in here, and if any of them race we shouldn't
1242          * call extra class_export_puts(). */
1243         if (already_disconnected) {
1244                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1245                 GOTO(no_disconn, already_disconnected);
1246         }
1247
1248         CDEBUG(D_IOCTL, "disconnect: cookie %#llx\n",
1249                export->exp_handle.h_cookie);
1250
1251         if (!hlist_unhashed(&export->exp_nid_hash))
1252                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1253                              &export->exp_connection->c_peer.nid,
1254                              &export->exp_nid_hash);
1255
1256         class_export_recovery_cleanup(export);
1257         class_unlink_export(export);
1258 no_disconn:
1259         class_export_put(export);
1260         RETURN(0);
1261 }
1262 EXPORT_SYMBOL(class_disconnect);
1263
1264 /* Return non-zero for a fully connected export */
1265 int class_connected_export(struct obd_export *exp)
1266 {
1267         int connected = 0;
1268
1269         if (exp) {
1270                 spin_lock(&exp->exp_lock);
1271                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1272                 spin_unlock(&exp->exp_lock);
1273         }
1274         return connected;
1275 }
1276 EXPORT_SYMBOL(class_connected_export);
1277
1278 static void class_disconnect_export_list(struct list_head *list,
1279                                          enum obd_option flags)
1280 {
1281         int rc;
1282         struct obd_export *exp;
1283         ENTRY;
1284
1285         /* It's possible that an export may disconnect itself, but
1286          * nothing else will be added to this list. */
1287         while (!list_empty(list)) {
1288                 exp = list_entry(list->next, struct obd_export,
1289                                  exp_obd_chain);
1290                 /* need for safe call CDEBUG after obd_disconnect */
1291                 class_export_get(exp);
1292
1293                 spin_lock(&exp->exp_lock);
1294                 exp->exp_flags = flags;
1295                 spin_unlock(&exp->exp_lock);
1296
1297                 if (obd_uuid_equals(&exp->exp_client_uuid,
1298                                     &exp->exp_obd->obd_uuid)) {
1299                         CDEBUG(D_HA,
1300                                "exp %p export uuid == obd uuid, don't discon\n",
1301                                exp);
1302                         /* Need to delete this now so we don't end up pointing
1303                          * to work_list later when this export is cleaned up. */
1304                         list_del_init(&exp->exp_obd_chain);
1305                         class_export_put(exp);
1306                         continue;
1307                 }
1308
1309                 class_export_get(exp);
1310                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1311                        "last request at "CFS_TIME_T"\n",
1312                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1313                        exp, exp->exp_last_request_time);
1314                 /* release one export reference anyway */
1315                 rc = obd_disconnect(exp);
1316
1317                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1318                        obd_export_nid2str(exp), exp, rc);
1319                 class_export_put(exp);
1320         }
1321         EXIT;
1322 }
1323
1324 void class_disconnect_exports(struct obd_device *obd)
1325 {
1326         struct list_head work_list;
1327         ENTRY;
1328
1329         /* Move all of the exports from obd_exports to a work list, en masse. */
1330         INIT_LIST_HEAD(&work_list);
1331         spin_lock(&obd->obd_dev_lock);
1332         list_splice_init(&obd->obd_exports, &work_list);
1333         list_splice_init(&obd->obd_delayed_exports, &work_list);
1334         spin_unlock(&obd->obd_dev_lock);
1335
1336         if (!list_empty(&work_list)) {
1337                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1338                        "disconnecting them\n", obd->obd_minor, obd);
1339                 class_disconnect_export_list(&work_list,
1340                                              exp_flags_from_obd(obd));
1341         } else
1342                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1343                        obd->obd_minor, obd);
1344         EXIT;
1345 }
1346 EXPORT_SYMBOL(class_disconnect_exports);
1347
1348 /* Remove exports that have not completed recovery.
1349  */
1350 void class_disconnect_stale_exports(struct obd_device *obd,
1351                                     int (*test_export)(struct obd_export *))
1352 {
1353         struct list_head work_list;
1354         struct obd_export *exp, *n;
1355         int evicted = 0;
1356         ENTRY;
1357
1358         INIT_LIST_HEAD(&work_list);
1359         spin_lock(&obd->obd_dev_lock);
1360         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1361                                  exp_obd_chain) {
1362                 /* don't count self-export as client */
1363                 if (obd_uuid_equals(&exp->exp_client_uuid,
1364                                     &exp->exp_obd->obd_uuid))
1365                         continue;
1366
1367                 /* don't evict clients which have no slot in last_rcvd
1368                  * (e.g. lightweight connection) */
1369                 if (exp->exp_target_data.ted_lr_idx == -1)
1370                         continue;
1371
1372                 spin_lock(&exp->exp_lock);
1373                 if (exp->exp_failed || test_export(exp)) {
1374                         spin_unlock(&exp->exp_lock);
1375                         continue;
1376                 }
1377                 exp->exp_failed = 1;
1378                 spin_unlock(&exp->exp_lock);
1379
1380                 list_move(&exp->exp_obd_chain, &work_list);
1381                 evicted++;
1382                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1383                        obd->obd_name, exp->exp_client_uuid.uuid,
1384                        exp->exp_connection == NULL ? "<unknown>" :
1385                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1386                 print_export_data(exp, "EVICTING", 0, D_HA);
1387         }
1388         spin_unlock(&obd->obd_dev_lock);
1389
1390         if (evicted)
1391                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1392                               obd->obd_name, evicted);
1393
1394         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1395                                                  OBD_OPT_ABORT_RECOV);
1396         EXIT;
1397 }
1398 EXPORT_SYMBOL(class_disconnect_stale_exports);
1399
1400 void class_fail_export(struct obd_export *exp)
1401 {
1402         int rc, already_failed;
1403
1404         spin_lock(&exp->exp_lock);
1405         already_failed = exp->exp_failed;
1406         exp->exp_failed = 1;
1407         spin_unlock(&exp->exp_lock);
1408
1409         if (already_failed) {
1410                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1411                        exp, exp->exp_client_uuid.uuid);
1412                 return;
1413         }
1414
1415         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1416                exp, exp->exp_client_uuid.uuid);
1417
1418         if (obd_dump_on_timeout)
1419                 libcfs_debug_dumplog();
1420
1421         /* need for safe call CDEBUG after obd_disconnect */
1422         class_export_get(exp);
1423
1424         /* Most callers into obd_disconnect are removing their own reference
1425          * (request, for example) in addition to the one from the hash table.
1426          * We don't have such a reference here, so make one. */
1427         class_export_get(exp);
1428         rc = obd_disconnect(exp);
1429         if (rc)
1430                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1431         else
1432                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1433                        exp, exp->exp_client_uuid.uuid);
1434         class_export_put(exp);
1435 }
1436 EXPORT_SYMBOL(class_fail_export);
1437
1438 char *obd_export_nid2str(struct obd_export *exp)
1439 {
1440         if (exp->exp_connection != NULL)
1441                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1442
1443         return "(no nid)";
1444 }
1445 EXPORT_SYMBOL(obd_export_nid2str);
1446
1447 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1448 {
1449         struct cfs_hash *nid_hash;
1450         struct obd_export *doomed_exp = NULL;
1451         int exports_evicted = 0;
1452
1453         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1454
1455         spin_lock(&obd->obd_dev_lock);
1456         /* umount has run already, so evict thread should leave
1457          * its task to umount thread now */
1458         if (obd->obd_stopping) {
1459                 spin_unlock(&obd->obd_dev_lock);
1460                 return exports_evicted;
1461         }
1462         nid_hash = obd->obd_nid_hash;
1463         cfs_hash_getref(nid_hash);
1464         spin_unlock(&obd->obd_dev_lock);
1465
1466         do {
1467                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1468                 if (doomed_exp == NULL)
1469                         break;
1470
1471                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1472                          "nid %s found, wanted nid %s, requested nid %s\n",
1473                          obd_export_nid2str(doomed_exp),
1474                          libcfs_nid2str(nid_key), nid);
1475                 LASSERTF(doomed_exp != obd->obd_self_export,
1476                          "self-export is hashed by NID?\n");
1477                 exports_evicted++;
1478                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1479                               "request\n", obd->obd_name,
1480                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1481                               obd_export_nid2str(doomed_exp));
1482                 class_fail_export(doomed_exp);
1483                 class_export_put(doomed_exp);
1484         } while (1);
1485
1486         cfs_hash_putref(nid_hash);
1487
1488         if (!exports_evicted)
1489                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1490                        obd->obd_name, nid);
1491         return exports_evicted;
1492 }
1493 EXPORT_SYMBOL(obd_export_evict_by_nid);
1494
1495 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1496 {
1497         struct cfs_hash *uuid_hash;
1498         struct obd_export *doomed_exp = NULL;
1499         struct obd_uuid doomed_uuid;
1500         int exports_evicted = 0;
1501
1502         spin_lock(&obd->obd_dev_lock);
1503         if (obd->obd_stopping) {
1504                 spin_unlock(&obd->obd_dev_lock);
1505                 return exports_evicted;
1506         }
1507         uuid_hash = obd->obd_uuid_hash;
1508         cfs_hash_getref(uuid_hash);
1509         spin_unlock(&obd->obd_dev_lock);
1510
1511         obd_str2uuid(&doomed_uuid, uuid);
1512         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1513                 CERROR("%s: can't evict myself\n", obd->obd_name);
1514                 cfs_hash_putref(uuid_hash);
1515                 return exports_evicted;
1516         }
1517
1518         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1519
1520         if (doomed_exp == NULL) {
1521                 CERROR("%s: can't disconnect %s: no exports found\n",
1522                        obd->obd_name, uuid);
1523         } else {
1524                 CWARN("%s: evicting %s at adminstrative request\n",
1525                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1526                 class_fail_export(doomed_exp);
1527                 class_export_put(doomed_exp);
1528                 exports_evicted++;
1529         }
1530         cfs_hash_putref(uuid_hash);
1531
1532         return exports_evicted;
1533 }
1534
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional callback invoked by print_export_data() for each export when
 * lock dumping is requested; unset (NULL) by default. */
void (*class_export_dump_hook)(struct obd_export*) = NULL;
EXPORT_SYMBOL(class_export_dump_hook);
#endif
1539
1540 static void print_export_data(struct obd_export *exp, const char *status,
1541                               int locks, int debug_level)
1542 {
1543         struct ptlrpc_reply_state *rs;
1544         struct ptlrpc_reply_state *first_reply = NULL;
1545         int nreplies = 0;
1546
1547         spin_lock(&exp->exp_lock);
1548         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1549                             rs_exp_list) {
1550                 if (nreplies == 0)
1551                         first_reply = rs;
1552                 nreplies++;
1553         }
1554         spin_unlock(&exp->exp_lock);
1555
1556         CDEBUG(debug_level, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: "
1557                "%p %s %llu stale:%d\n",
1558                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1559                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1560                atomic_read(&exp->exp_rpc_count),
1561                atomic_read(&exp->exp_cb_count),
1562                atomic_read(&exp->exp_locks_count),
1563                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1564                nreplies, first_reply, nreplies > 3 ? "..." : "",
1565                exp->exp_last_committed, !list_empty(&exp->exp_stale_list));
1566 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1567         if (locks && class_export_dump_hook != NULL)
1568                 class_export_dump_hook(exp);
1569 #endif
1570 }
1571
1572 void dump_exports(struct obd_device *obd, int locks, int debug_level)
1573 {
1574         struct obd_export *exp;
1575
1576         spin_lock(&obd->obd_dev_lock);
1577         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1578                 print_export_data(exp, "ACTIVE", locks, debug_level);
1579         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1580                 print_export_data(exp, "UNLINKED", locks, debug_level);
1581         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1582                 print_export_data(exp, "DELAYED", locks, debug_level);
1583         spin_unlock(&obd->obd_dev_lock);
1584         spin_lock(&obd_zombie_impexp_lock);
1585         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1586                 print_export_data(exp, "ZOMBIE", locks, debug_level);
1587         spin_unlock(&obd_zombie_impexp_lock);
1588 }
1589
1590 void obd_exports_barrier(struct obd_device *obd)
1591 {
1592         int waited = 2;
1593         LASSERT(list_empty(&obd->obd_exports));
1594         spin_lock(&obd->obd_dev_lock);
1595         while (!list_empty(&obd->obd_unlinked_exports)) {
1596                 spin_unlock(&obd->obd_dev_lock);
1597                 set_current_state(TASK_UNINTERRUPTIBLE);
1598                 schedule_timeout(cfs_time_seconds(waited));
1599                 if (waited > 5 && IS_PO2(waited)) {
1600                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1601                                       "more than %d seconds. "
1602                                       "The obd refcount = %d. Is it stuck?\n",
1603                                       obd->obd_name, waited,
1604                                       atomic_read(&obd->obd_refcount));
1605                         dump_exports(obd, 1, D_CONSOLE | D_WARNING);
1606                 }
1607                 waited *= 2;
1608                 spin_lock(&obd->obd_dev_lock);
1609         }
1610         spin_unlock(&obd->obd_dev_lock);
1611 }
1612 EXPORT_SYMBOL(obd_exports_barrier);
1613
/* Total amount of zombies (imports and exports) still to be destroyed.
 * Updated and read under obd_zombie_impexp_lock; starts at zero as any
 * object with static storage duration does. */
static int zombies_count;
1616
1617 /**
1618  * kill zombie imports and exports
1619  */
1620 void obd_zombie_impexp_cull(void)
1621 {
1622         struct obd_import *import;
1623         struct obd_export *export;
1624         ENTRY;
1625
1626         do {
1627                 spin_lock(&obd_zombie_impexp_lock);
1628
1629                 import = NULL;
1630                 if (!list_empty(&obd_zombie_imports)) {
1631                         import = list_entry(obd_zombie_imports.next,
1632                                             struct obd_import,
1633                                             imp_zombie_chain);
1634                         list_del_init(&import->imp_zombie_chain);
1635                 }
1636
1637                 export = NULL;
1638                 if (!list_empty(&obd_zombie_exports)) {
1639                         export = list_entry(obd_zombie_exports.next,
1640                                             struct obd_export,
1641                                             exp_obd_chain);
1642                         list_del_init(&export->exp_obd_chain);
1643                 }
1644
1645                 spin_unlock(&obd_zombie_impexp_lock);
1646
1647                 if (import != NULL) {
1648                         class_import_destroy(import);
1649                         spin_lock(&obd_zombie_impexp_lock);
1650                         zombies_count--;
1651                         spin_unlock(&obd_zombie_impexp_lock);
1652                 }
1653
1654                 if (export != NULL) {
1655                         class_export_destroy(export);
1656                         spin_lock(&obd_zombie_impexp_lock);
1657                         zombies_count--;
1658                         spin_unlock(&obd_zombie_impexp_lock);
1659                 }
1660
1661                 cond_resched();
1662         } while (import != NULL || export != NULL);
1663         EXIT;
1664 }
1665
1666 static struct completion        obd_zombie_start;
1667 static struct completion        obd_zombie_stop;
1668 static unsigned long            obd_zombie_flags;
1669 static wait_queue_head_t        obd_zombie_waitq;
1670 static pid_t                    obd_zombie_pid;
1671
1672 enum {
1673         OBD_ZOMBIE_STOP         = 0x0001,
1674 };
1675
1676 /**
1677  * check for work for kill zombie import/export thread.
1678  */
1679 static int obd_zombie_impexp_check(void *arg)
1680 {
1681         int rc;
1682
1683         spin_lock(&obd_zombie_impexp_lock);
1684         rc = (zombies_count == 0) &&
1685              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1686         spin_unlock(&obd_zombie_impexp_lock);
1687
1688         RETURN(rc);
1689 }
1690
1691 /**
1692  * Add export to the obd_zombe thread and notify it.
1693  */
1694 static void obd_zombie_export_add(struct obd_export *exp) {
1695         atomic_dec(&obd_stale_export_num);
1696         spin_lock(&exp->exp_obd->obd_dev_lock);
1697         LASSERT(!list_empty(&exp->exp_obd_chain));
1698         list_del_init(&exp->exp_obd_chain);
1699         spin_unlock(&exp->exp_obd->obd_dev_lock);
1700         spin_lock(&obd_zombie_impexp_lock);
1701         zombies_count++;
1702         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1703         spin_unlock(&obd_zombie_impexp_lock);
1704
1705         obd_zombie_impexp_notify();
1706 }
1707
1708 /**
1709  * Add import to the obd_zombe thread and notify it.
1710  */
1711 static void obd_zombie_import_add(struct obd_import *imp) {
1712         LASSERT(imp->imp_sec == NULL);
1713         spin_lock(&obd_zombie_impexp_lock);
1714         LASSERT(list_empty(&imp->imp_zombie_chain));
1715         zombies_count++;
1716         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1717         spin_unlock(&obd_zombie_impexp_lock);
1718
1719         obd_zombie_impexp_notify();
1720 }
1721
1722 /**
1723  * notify import/export destroy thread about new zombie.
1724  */
1725 static void obd_zombie_impexp_notify(void)
1726 {
1727         /*
1728          * Make sure obd_zomebie_impexp_thread get this notification.
1729          * It is possible this signal only get by obd_zombie_barrier, and
1730          * barrier gulps this notification and sleeps away and hangs ensues
1731          */
1732         wake_up_all(&obd_zombie_waitq);
1733 }
1734
1735 /**
1736  * check whether obd_zombie is idle
1737  */
1738 static int obd_zombie_is_idle(void)
1739 {
1740         int rc;
1741
1742         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1743         spin_lock(&obd_zombie_impexp_lock);
1744         rc = (zombies_count == 0);
1745         spin_unlock(&obd_zombie_impexp_lock);
1746         return rc;
1747 }
1748
1749 /**
1750  * wait when obd_zombie import/export queues become empty
1751  */
1752 void obd_zombie_barrier(void)
1753 {
1754         struct l_wait_info lwi = { 0 };
1755
1756         if (obd_zombie_pid == current_pid())
1757                 /* don't wait for myself */
1758                 return;
1759         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1760 }
1761 EXPORT_SYMBOL(obd_zombie_barrier);
1762
1763
1764 struct obd_export *obd_stale_export_get(void)
1765 {
1766         struct obd_export *exp = NULL;
1767         ENTRY;
1768
1769         spin_lock(&obd_stale_export_lock);
1770         if (!list_empty(&obd_stale_exports)) {
1771                 exp = list_entry(obd_stale_exports.next,
1772                                  struct obd_export, exp_stale_list);
1773                 list_del_init(&exp->exp_stale_list);
1774         }
1775         spin_unlock(&obd_stale_export_lock);
1776
1777         if (exp) {
1778                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1779                        atomic_read(&obd_stale_export_num));
1780         }
1781         RETURN(exp);
1782 }
1783 EXPORT_SYMBOL(obd_stale_export_get);
1784
1785 void obd_stale_export_put(struct obd_export *exp)
1786 {
1787         ENTRY;
1788
1789         LASSERT(list_empty(&exp->exp_stale_list));
1790         if (exp->exp_lock_hash &&
1791             atomic_read(&exp->exp_lock_hash->hs_count)) {
1792                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1793                        atomic_read(&obd_stale_export_num));
1794
1795                 spin_lock_bh(&exp->exp_bl_list_lock);
1796                 spin_lock(&obd_stale_export_lock);
1797                 /* Add to the tail if there is no blocked locks,
1798                  * to the head otherwise. */
1799                 if (list_empty(&exp->exp_bl_list))
1800                         list_add_tail(&exp->exp_stale_list,
1801                                       &obd_stale_exports);
1802                 else
1803                         list_add(&exp->exp_stale_list,
1804                                  &obd_stale_exports);
1805
1806                 spin_unlock(&obd_stale_export_lock);
1807                 spin_unlock_bh(&exp->exp_bl_list_lock);
1808         } else {
1809                 class_export_put(exp);
1810         }
1811         EXIT;
1812 }
1813 EXPORT_SYMBOL(obd_stale_export_put);
1814
1815 /**
1816  * Adjust the position of the export in the stale list,
1817  * i.e. move to the head of the list if is needed.
1818  **/
1819 void obd_stale_export_adjust(struct obd_export *exp)
1820 {
1821         LASSERT(exp != NULL);
1822         spin_lock_bh(&exp->exp_bl_list_lock);
1823         spin_lock(&obd_stale_export_lock);
1824
1825         if (!list_empty(&exp->exp_stale_list) &&
1826             !list_empty(&exp->exp_bl_list))
1827                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1828
1829         spin_unlock(&obd_stale_export_lock);
1830         spin_unlock_bh(&exp->exp_bl_list_lock);
1831 }
1832 EXPORT_SYMBOL(obd_stale_export_adjust);
1833
1834 /**
1835  * destroy zombie export/import thread.
1836  */
1837 static int obd_zombie_impexp_thread(void *unused)
1838 {
1839         unshare_fs_struct();
1840         complete(&obd_zombie_start);
1841
1842         obd_zombie_pid = current_pid();
1843
1844         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1845                 struct l_wait_info lwi = { 0 };
1846
1847                 l_wait_event(obd_zombie_waitq,
1848                              !obd_zombie_impexp_check(NULL), &lwi);
1849                 obd_zombie_impexp_cull();
1850
1851                 /*
1852                  * Notify obd_zombie_barrier callers that queues
1853                  * may be empty.
1854                  */
1855                 wake_up(&obd_zombie_waitq);
1856         }
1857
1858         complete(&obd_zombie_stop);
1859
1860         RETURN(0);
1861 }
1862
1863
1864 /**
1865  * start destroy zombie import/export thread
1866  */
1867 int obd_zombie_impexp_init(void)
1868 {
1869         struct task_struct *task;
1870
1871         INIT_LIST_HEAD(&obd_zombie_imports);
1872
1873         INIT_LIST_HEAD(&obd_zombie_exports);
1874         spin_lock_init(&obd_zombie_impexp_lock);
1875         init_completion(&obd_zombie_start);
1876         init_completion(&obd_zombie_stop);
1877         init_waitqueue_head(&obd_zombie_waitq);
1878         obd_zombie_pid = 0;
1879
1880         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1881         if (IS_ERR(task))
1882                 RETURN(PTR_ERR(task));
1883
1884         wait_for_completion(&obd_zombie_start);
1885         RETURN(0);
1886 }
1887 /**
1888  * stop destroy zombie import/export thread
1889  */
1890 void obd_zombie_impexp_stop(void)
1891 {
1892         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1893         obd_zombie_impexp_notify();
1894         wait_for_completion(&obd_zombie_stop);
1895 }
1896
1897 /***** Kernel-userspace comm helpers *******/
1898
1899 /* Get length of entire message, including header */
1900 int kuc_len(int payload_len)
1901 {
1902         return sizeof(struct kuc_hdr) + payload_len;
1903 }
1904 EXPORT_SYMBOL(kuc_len);
1905
1906 /* Get a pointer to kuc header, given a ptr to the payload
1907  * @param p Pointer to payload area
1908  * @returns Pointer to kuc header
1909  */
1910 struct kuc_hdr * kuc_ptr(void *p)
1911 {
1912         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1913         LASSERT(lh->kuc_magic == KUC_MAGIC);
1914         return lh;
1915 }
1916 EXPORT_SYMBOL(kuc_ptr);
1917
1918 /* Alloc space for a message, and fill in header
1919  * @return Pointer to payload area
1920  */
1921 void *kuc_alloc(int payload_len, int transport, int type)
1922 {
1923         struct kuc_hdr *lh;
1924         int len = kuc_len(payload_len);
1925
1926         OBD_ALLOC(lh, len);
1927         if (lh == NULL)
1928                 return ERR_PTR(-ENOMEM);
1929
1930         lh->kuc_magic = KUC_MAGIC;
1931         lh->kuc_transport = transport;
1932         lh->kuc_msgtype = type;
1933         lh->kuc_msglen = len;
1934
1935         return (void *)(lh + 1);
1936 }
1937 EXPORT_SYMBOL(kuc_alloc);
1938
/* Free a message obtained from kuc_alloc().
 * @param p           Pointer to payload area (not to the header)
 * @param payload_len Payload length that was passed to kuc_alloc()
 *
 * Not marked "inline": this is an exported function with external
 * linkage, and under C99 inline semantics a plain "inline" definition
 * does not provide the external definition EXPORT_SYMBOL relies on.
 */
void kuc_free(void *p, int payload_len)
{
        struct kuc_hdr *lh = kuc_ptr(p);

        OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1946
1947 struct obd_request_slot_waiter {
1948         struct list_head        orsw_entry;
1949         wait_queue_head_t       orsw_waitq;
1950         bool                    orsw_signaled;
1951 };
1952
1953 static bool obd_request_slot_avail(struct client_obd *cli,
1954                                    struct obd_request_slot_waiter *orsw)
1955 {
1956         bool avail;
1957
1958         spin_lock(&cli->cl_loi_list_lock);
1959         avail = !!list_empty(&orsw->orsw_entry);
1960         spin_unlock(&cli->cl_loi_list_lock);
1961
1962         return avail;
1963 };
1964
1965 /*
1966  * For network flow control, the RPC sponsor needs to acquire a credit
1967  * before sending the RPC. The credits count for a connection is defined
1968  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1969  * the subsequent RPC sponsors need to wait until others released their
1970  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1971  */
1972 int obd_get_request_slot(struct client_obd *cli)
1973 {
1974         struct obd_request_slot_waiter   orsw;
1975         struct l_wait_info               lwi;
1976         int                              rc;
1977
1978         spin_lock(&cli->cl_loi_list_lock);
1979         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1980                 cli->cl_r_in_flight++;
1981                 spin_unlock(&cli->cl_loi_list_lock);
1982                 return 0;
1983         }
1984
1985         init_waitqueue_head(&orsw.orsw_waitq);
1986         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1987         orsw.orsw_signaled = false;
1988         spin_unlock(&cli->cl_loi_list_lock);
1989
1990         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1991         rc = l_wait_event(orsw.orsw_waitq,
1992                           obd_request_slot_avail(cli, &orsw) ||
1993                           orsw.orsw_signaled,
1994                           &lwi);
1995
1996         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1997          * freed but other (such as obd_put_request_slot) is using it. */
1998         spin_lock(&cli->cl_loi_list_lock);
1999         if (rc != 0) {
2000                 if (!orsw.orsw_signaled) {
2001                         if (list_empty(&orsw.orsw_entry))
2002                                 cli->cl_r_in_flight--;
2003                         else
2004                                 list_del(&orsw.orsw_entry);
2005                 }
2006         }
2007
2008         if (orsw.orsw_signaled) {
2009                 LASSERT(list_empty(&orsw.orsw_entry));
2010
2011                 rc = -EINTR;
2012         }
2013         spin_unlock(&cli->cl_loi_list_lock);
2014
2015         return rc;
2016 }
2017 EXPORT_SYMBOL(obd_get_request_slot);
2018
2019 void obd_put_request_slot(struct client_obd *cli)
2020 {
2021         struct obd_request_slot_waiter *orsw;
2022
2023         spin_lock(&cli->cl_loi_list_lock);
2024         cli->cl_r_in_flight--;
2025
2026         /* If there is free slot, wakeup the first waiter. */
2027         if (!list_empty(&cli->cl_loi_read_list) &&
2028             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2029                 orsw = list_entry(cli->cl_loi_read_list.next,
2030                                   struct obd_request_slot_waiter, orsw_entry);
2031                 list_del_init(&orsw->orsw_entry);
2032                 cli->cl_r_in_flight++;
2033                 wake_up(&orsw->orsw_waitq);
2034         }
2035         spin_unlock(&cli->cl_loi_list_lock);
2036 }
2037 EXPORT_SYMBOL(obd_put_request_slot);
2038
2039 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2040 {
2041         return cli->cl_max_rpcs_in_flight;
2042 }
2043 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2044
2045 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2046 {
2047         struct obd_request_slot_waiter *orsw;
2048         __u32                           old;
2049         int                             diff;
2050         int                             i;
2051         char                            *typ_name;
2052         int                             rc;
2053
2054         if (max > OBD_MAX_RIF_MAX || max < 1)
2055                 return -ERANGE;
2056
2057         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2058         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2059                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2060                  * strictly lower that max_rpcs_in_flight */
2061                 if (max < 2) {
2062                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2063                                "because it must be higher than "
2064                                "max_mod_rpcs_in_flight value",
2065                                cli->cl_import->imp_obd->obd_name);
2066                         return -ERANGE;
2067                 }
2068                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2069                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2070                         if (rc != 0)
2071                                 return rc;
2072                 }
2073         }
2074
2075         spin_lock(&cli->cl_loi_list_lock);
2076         old = cli->cl_max_rpcs_in_flight;
2077         cli->cl_max_rpcs_in_flight = max;
2078         diff = max - old;
2079
2080         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2081         for (i = 0; i < diff; i++) {
2082                 if (list_empty(&cli->cl_loi_read_list))
2083                         break;
2084
2085                 orsw = list_entry(cli->cl_loi_read_list.next,
2086                                   struct obd_request_slot_waiter, orsw_entry);
2087                 list_del_init(&orsw->orsw_entry);
2088                 cli->cl_r_in_flight++;
2089                 wake_up(&orsw->orsw_waitq);
2090         }
2091         spin_unlock(&cli->cl_loi_list_lock);
2092
2093         return 0;
2094 }
2095 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2096
2097 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2098 {
2099         return cli->cl_max_mod_rpcs_in_flight;
2100 }
2101 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2102
2103 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2104 {
2105         struct obd_connect_data *ocd;
2106         __u16 maxmodrpcs;
2107         __u16 prev;
2108
2109         if (max > OBD_MAX_RIF_MAX || max < 1)
2110                 return -ERANGE;
2111
2112         /* cannot exceed or equal max_rpcs_in_flight */
2113         if (max >= cli->cl_max_rpcs_in_flight) {
2114                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2115                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2116                        cli->cl_import->imp_obd->obd_name,
2117                        max, cli->cl_max_rpcs_in_flight);
2118                 return -ERANGE;
2119         }
2120
2121         /* cannot exceed max modify RPCs in flight supported by the server */
2122         ocd = &cli->cl_import->imp_connect_data;
2123         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2124                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2125         else
2126                 maxmodrpcs = 1;
2127         if (max > maxmodrpcs) {
2128                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2129                        "higher than max_mod_rpcs_per_client value (%hu) "
2130                        "returned by the server at connection\n",
2131                        cli->cl_import->imp_obd->obd_name,
2132                        max, maxmodrpcs);
2133                 return -ERANGE;
2134         }
2135
2136         spin_lock(&cli->cl_mod_rpcs_lock);
2137
2138         prev = cli->cl_max_mod_rpcs_in_flight;
2139         cli->cl_max_mod_rpcs_in_flight = max;
2140
2141         /* wakeup waiters if limit has been increased */
2142         if (cli->cl_max_mod_rpcs_in_flight > prev)
2143                 wake_up(&cli->cl_mod_rpcs_waitq);
2144
2145         spin_unlock(&cli->cl_mod_rpcs_lock);
2146
2147         return 0;
2148 }
2149 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2150
2151
2152 #define pct(a, b) (b ? a * 100 / b : 0)
2153 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2154                                struct seq_file *seq)
2155 {
2156         struct timeval now;
2157         unsigned long mod_tot = 0, mod_cum;
2158         int i;
2159
2160         do_gettimeofday(&now);
2161
2162         spin_lock(&cli->cl_mod_rpcs_lock);
2163
2164         seq_printf(seq, "snapshot_time:         %lu.%lu (secs.usecs)\n",
2165                    now.tv_sec, now.tv_usec);
2166         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2167                    cli->cl_mod_rpcs_in_flight);
2168
2169         seq_printf(seq, "\n\t\t\tmodify\n");
2170         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2171
2172         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2173
2174         mod_cum = 0;
2175         for (i = 0; i < OBD_HIST_MAX; i++) {
2176                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2177                 mod_cum += mod;
2178                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2179                            i, mod, pct(mod, mod_tot),
2180                            pct(mod_cum, mod_tot));
2181                 if (mod_cum == mod_tot)
2182                         break;
2183         }
2184
2185         spin_unlock(&cli->cl_mod_rpcs_lock);
2186
2187         return 0;
2188 }
2189 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2190 #undef pct
2191
2192
2193 /* The number of modify RPCs sent in parallel is limited
2194  * because the server has a finite number of slots per client to
2195  * store request result and ensure reply reconstruction when needed.
2196  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2197  * that takes into account server limit and cl_max_rpcs_in_flight
2198  * value.
2199  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2200  * one close request is allowed above the maximum.
2201  */
2202 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2203                                                  bool close_req)
2204 {
2205         bool avail;
2206
2207         /* A slot is available if
2208          * - number of modify RPCs in flight is less than the max
2209          * - it's a close RPC and no other close request is in flight
2210          */
2211         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2212                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2213
2214         return avail;
2215 }
2216
2217 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2218                                          bool close_req)
2219 {
2220         bool avail;
2221
2222         spin_lock(&cli->cl_mod_rpcs_lock);
2223         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2224         spin_unlock(&cli->cl_mod_rpcs_lock);
2225         return avail;
2226 }
2227
2228 /* Get a modify RPC slot from the obd client @cli according
2229  * to the kind of operation @opc that is going to be sent
2230  * and the intent @it of the operation if it applies.
2231  * If the maximum number of modify RPCs in flight is reached
2232  * the thread is put to sleep.
2233  * Returns the tag to be set in the request message. Tag 0
2234  * is reserved for non-modifying requests.
2235  */
2236 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2237                            struct lookup_intent *it)
2238 {
2239         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2240         bool                    close_req = false;
2241         __u16                   i, max;
2242
2243         /* read-only metadata RPCs don't consume a slot on MDT
2244          * for reply reconstruction
2245          */
2246         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2247                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2248                 return 0;
2249
2250         if (opc == MDS_CLOSE)
2251                 close_req = true;
2252
2253         do {
2254                 spin_lock(&cli->cl_mod_rpcs_lock);
2255                 max = cli->cl_max_mod_rpcs_in_flight;
2256                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2257                         /* there is a slot available */
2258                         cli->cl_mod_rpcs_in_flight++;
2259                         if (close_req)
2260                                 cli->cl_close_rpcs_in_flight++;
2261                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2262                                          cli->cl_mod_rpcs_in_flight);
2263                         /* find a free tag */
2264                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2265                                                 max + 1);
2266                         LASSERT(i < OBD_MAX_RIF_MAX);
2267                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2268                         spin_unlock(&cli->cl_mod_rpcs_lock);
2269                         /* tag 0 is reserved for non-modify RPCs */
2270                         return i + 1;
2271                 }
2272                 spin_unlock(&cli->cl_mod_rpcs_lock);
2273
2274                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2275                        "opc %u, max %hu\n",
2276                        cli->cl_import->imp_obd->obd_name, opc, max);
2277
2278                 l_wait_event(cli->cl_mod_rpcs_waitq,
2279                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2280         } while (true);
2281 }
2282 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2283
2284 /* Put a modify RPC slot from the obd client @cli according
2285  * to the kind of operation @opc that has been sent and the
2286  * intent @it of the operation if it applies.
2287  */
2288 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2289                           struct lookup_intent *it, __u16 tag)
2290 {
2291         bool                    close_req = false;
2292
2293         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2294                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2295                 return;
2296
2297         if (opc == MDS_CLOSE)
2298                 close_req = true;
2299
2300         spin_lock(&cli->cl_mod_rpcs_lock);
2301         cli->cl_mod_rpcs_in_flight--;
2302         if (close_req)
2303                 cli->cl_close_rpcs_in_flight--;
2304         /* release the tag in the bitmap */
2305         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2306         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2307         spin_unlock(&cli->cl_mod_rpcs_lock);
2308         wake_up(&cli->cl_mod_rpcs_waitq);
2309 }
2310 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2311