Whamcloud - gitweb
LU-6840 target: update reply data after update replay
[fs/lustre-release.git] / lustre / obdclass / genops.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 1999, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/obdclass/genops.c
37  *
38  * These are the only exported functions, they provide some generic
39  * infrastructure for managing object devices
40  */
41
42 #define DEBUG_SUBSYSTEM S_CLASS
43
44 #include <linux/kthread.h>
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_disk.h>
48 #include <lustre_kernelcomm.h>
49
50 spinlock_t obd_types_lock;
51
52 static struct kmem_cache *obd_device_cachep;
53 struct kmem_cache *obdo_cachep;
54 EXPORT_SYMBOL(obdo_cachep);
55 static struct kmem_cache *import_cachep;
56
57 static struct list_head obd_zombie_imports;
58 static struct list_head obd_zombie_exports;
59 static spinlock_t  obd_zombie_impexp_lock;
60
61 static void obd_zombie_impexp_notify(void);
62 static void obd_zombie_export_add(struct obd_export *exp);
63 static void obd_zombie_import_add(struct obd_import *imp);
64 static void print_export_data(struct obd_export *exp,
65                               const char *status, int locks);
66
67 struct list_head obd_stale_exports;
68 spinlock_t       obd_stale_export_lock;
69 atomic_t         obd_stale_export_num;
70
71 int (*ptlrpc_put_connection_superhack)(struct ptlrpc_connection *c);
72 EXPORT_SYMBOL(ptlrpc_put_connection_superhack);
73
74 /*
75  * support functions: we could use inter-module communication, but this
76  * is more portable to other OS's
77  */
78 static struct obd_device *obd_device_alloc(void)
79 {
80         struct obd_device *obd;
81
82         OBD_SLAB_ALLOC_PTR_GFP(obd, obd_device_cachep, GFP_NOFS);
83         if (obd != NULL) {
84                 obd->obd_magic = OBD_DEVICE_MAGIC;
85         }
86         return obd;
87 }
88
89 static void obd_device_free(struct obd_device *obd)
90 {
91         LASSERT(obd != NULL);
92         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "obd %p obd_magic %08x != %08x\n",
93                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
94         if (obd->obd_namespace != NULL) {
95                 CERROR("obd %p: namespace %p was not properly cleaned up (obd_force=%d)!\n",
96                        obd, obd->obd_namespace, obd->obd_force);
97                 LBUG();
98         }
99         lu_ref_fini(&obd->obd_reference);
100         OBD_SLAB_FREE_PTR(obd, obd_device_cachep);
101 }
102
103 struct obd_type *class_search_type(const char *name)
104 {
105         struct list_head *tmp;
106         struct obd_type *type;
107
108         spin_lock(&obd_types_lock);
109         list_for_each(tmp, &obd_types) {
110                 type = list_entry(tmp, struct obd_type, typ_chain);
111                 if (strcmp(type->typ_name, name) == 0) {
112                         spin_unlock(&obd_types_lock);
113                         return type;
114                 }
115         }
116         spin_unlock(&obd_types_lock);
117         return NULL;
118 }
119 EXPORT_SYMBOL(class_search_type);
120
121 struct obd_type *class_get_type(const char *name)
122 {
123         struct obd_type *type = class_search_type(name);
124
125 #ifdef HAVE_MODULE_LOADING_SUPPORT
126         if (!type) {
127                 const char *modname = name;
128
129                 if (strcmp(modname, "obdfilter") == 0)
130                         modname = "ofd";
131
132                 if (strcmp(modname, LUSTRE_LWP_NAME) == 0)
133                         modname = LUSTRE_OSP_NAME;
134
135                 if (!strncmp(modname, LUSTRE_MDS_NAME, strlen(LUSTRE_MDS_NAME)))
136                         modname = LUSTRE_MDT_NAME;
137
138                 if (!request_module("%s", modname)) {
139                         CDEBUG(D_INFO, "Loaded module '%s'\n", modname);
140                         type = class_search_type(name);
141                 } else {
142                         LCONSOLE_ERROR_MSG(0x158, "Can't load module '%s'\n",
143                                            modname);
144                 }
145         }
146 #endif
147         if (type) {
148                 spin_lock(&type->obd_type_lock);
149                 type->typ_refcnt++;
150                 try_module_get(type->typ_dt_ops->o_owner);
151                 spin_unlock(&type->obd_type_lock);
152         }
153         return type;
154 }
155
156 void class_put_type(struct obd_type *type)
157 {
158         LASSERT(type);
159         spin_lock(&type->obd_type_lock);
160         type->typ_refcnt--;
161         module_put(type->typ_dt_ops->o_owner);
162         spin_unlock(&type->obd_type_lock);
163 }
164
165 #define CLASS_MAX_NAME 1024
166
167 int class_register_type(struct obd_ops *dt_ops, struct md_ops *md_ops,
168                         bool enable_proc, struct lprocfs_vars *vars,
169                         const char *name, struct lu_device_type *ldt)
170 {
171         struct obd_type *type;
172         int rc = 0;
173         ENTRY;
174
175         /* sanity check */
176         LASSERT(strnlen(name, CLASS_MAX_NAME) < CLASS_MAX_NAME);
177
178         if (class_search_type(name)) {
179                 CDEBUG(D_IOCTL, "Type %s already registered\n", name);
180                 RETURN(-EEXIST);
181         }
182
183         rc = -ENOMEM;
184         OBD_ALLOC(type, sizeof(*type));
185         if (type == NULL)
186                 RETURN(rc);
187
188         OBD_ALLOC_PTR(type->typ_dt_ops);
189         OBD_ALLOC_PTR(type->typ_md_ops);
190         OBD_ALLOC(type->typ_name, strlen(name) + 1);
191
192         if (type->typ_dt_ops == NULL ||
193             type->typ_md_ops == NULL ||
194             type->typ_name == NULL)
195                 GOTO (failed, rc);
196
197         *(type->typ_dt_ops) = *dt_ops;
198         /* md_ops is optional */
199         if (md_ops)
200                 *(type->typ_md_ops) = *md_ops;
201         strcpy(type->typ_name, name);
202         spin_lock_init(&type->obd_type_lock);
203
204 #ifdef CONFIG_PROC_FS
205         if (enable_proc) {
206                 type->typ_procroot = lprocfs_register(type->typ_name,
207                                                       proc_lustre_root,
208                                                       vars, type);
209                 if (IS_ERR(type->typ_procroot)) {
210                         rc = PTR_ERR(type->typ_procroot);
211                         type->typ_procroot = NULL;
212                         GOTO(failed, rc);
213                 }
214         }
215 #endif
216         if (ldt != NULL) {
217                 type->typ_lu = ldt;
218                 rc = lu_device_type_init(ldt);
219                 if (rc != 0)
220                         GOTO (failed, rc);
221         }
222
223         spin_lock(&obd_types_lock);
224         list_add(&type->typ_chain, &obd_types);
225         spin_unlock(&obd_types_lock);
226
227         RETURN (0);
228
229 failed:
230         if (type->typ_name != NULL) {
231 #ifdef CONFIG_PROC_FS
232                 if (type->typ_procroot != NULL)
233                         remove_proc_subtree(type->typ_name, proc_lustre_root);
234 #endif
235                 OBD_FREE(type->typ_name, strlen(name) + 1);
236         }
237         if (type->typ_md_ops != NULL)
238                 OBD_FREE_PTR(type->typ_md_ops);
239         if (type->typ_dt_ops != NULL)
240                 OBD_FREE_PTR(type->typ_dt_ops);
241         OBD_FREE(type, sizeof(*type));
242         RETURN(rc);
243 }
244 EXPORT_SYMBOL(class_register_type);
245
246 int class_unregister_type(const char *name)
247 {
248         struct obd_type *type = class_search_type(name);
249         ENTRY;
250
251         if (!type) {
252                 CERROR("unknown obd type\n");
253                 RETURN(-EINVAL);
254         }
255
256         if (type->typ_refcnt) {
257                 CERROR("type %s has refcount (%d)\n", name, type->typ_refcnt);
258                 /* This is a bad situation, let's make the best of it */
259                 /* Remove ops, but leave the name for debugging */
260                 OBD_FREE_PTR(type->typ_dt_ops);
261                 OBD_FREE_PTR(type->typ_md_ops);
262                 RETURN(-EBUSY);
263         }
264
265         /* we do not use type->typ_procroot as for compatibility purposes
266          * other modules can share names (i.e. lod can use lov entry). so
267          * we can't reference pointer as it can get invalided when another
268          * module removes the entry */
269 #ifdef CONFIG_PROC_FS
270         if (type->typ_procroot != NULL)
271                 remove_proc_subtree(type->typ_name, proc_lustre_root);
272         if (type->typ_procsym != NULL)
273                 lprocfs_remove(&type->typ_procsym);
274 #endif
275         if (type->typ_lu)
276                 lu_device_type_fini(type->typ_lu);
277
278         spin_lock(&obd_types_lock);
279         list_del(&type->typ_chain);
280         spin_unlock(&obd_types_lock);
281         OBD_FREE(type->typ_name, strlen(name) + 1);
282         if (type->typ_dt_ops != NULL)
283                 OBD_FREE_PTR(type->typ_dt_ops);
284         if (type->typ_md_ops != NULL)
285                 OBD_FREE_PTR(type->typ_md_ops);
286         OBD_FREE(type, sizeof(*type));
287         RETURN(0);
288 } /* class_unregister_type */
289 EXPORT_SYMBOL(class_unregister_type);
290
291 /**
292  * Create a new obd device.
293  *
294  * Find an empty slot in ::obd_devs[], create a new obd device in it.
295  *
296  * \param[in] type_name obd device type string.
297  * \param[in] name      obd device name.
298  *
299  * \retval NULL if create fails, otherwise return the obd device
300  *         pointer created.
301  */
302 struct obd_device *class_newdev(const char *type_name, const char *name)
303 {
304         struct obd_device *result = NULL;
305         struct obd_device *newdev;
306         struct obd_type *type = NULL;
307         int i;
308         int new_obd_minor = 0;
309         ENTRY;
310
311         if (strlen(name) >= MAX_OBD_NAME) {
312                 CERROR("name/uuid must be < %u bytes long\n", MAX_OBD_NAME);
313                 RETURN(ERR_PTR(-EINVAL));
314         }
315
316         type = class_get_type(type_name);
317         if (type == NULL){
318                 CERROR("OBD: unknown type: %s\n", type_name);
319                 RETURN(ERR_PTR(-ENODEV));
320         }
321
322         newdev = obd_device_alloc();
323         if (newdev == NULL)
324                 GOTO(out_type, result = ERR_PTR(-ENOMEM));
325
326         LASSERT(newdev->obd_magic == OBD_DEVICE_MAGIC);
327
328         write_lock(&obd_dev_lock);
329         for (i = 0; i < class_devno_max(); i++) {
330                 struct obd_device *obd = class_num2obd(i);
331
332                 if (obd && (strcmp(name, obd->obd_name) == 0)) {
333                         CERROR("Device %s already exists at %d, won't add\n",
334                                name, i);
335                         if (result) {
336                                 LASSERTF(result->obd_magic == OBD_DEVICE_MAGIC,
337                                          "%p obd_magic %08x != %08x\n", result,
338                                          result->obd_magic, OBD_DEVICE_MAGIC);
339                                 LASSERTF(result->obd_minor == new_obd_minor,
340                                          "%p obd_minor %d != %d\n", result,
341                                          result->obd_minor, new_obd_minor);
342
343                                 obd_devs[result->obd_minor] = NULL;
344                                 result->obd_name[0]='\0';
345                          }
346                         result = ERR_PTR(-EEXIST);
347                         break;
348                 }
349                 if (!result && !obd) {
350                         result = newdev;
351                         result->obd_minor = i;
352                         new_obd_minor = i;
353                         result->obd_type = type;
354                         strncpy(result->obd_name, name,
355                                 sizeof(result->obd_name) - 1);
356                         obd_devs[i] = result;
357                 }
358         }
359         write_unlock(&obd_dev_lock);
360
361         if (result == NULL && i >= class_devno_max()) {
362                 CERROR("all %u OBD devices used, increase MAX_OBD_DEVICES\n",
363                        class_devno_max());
364                 GOTO(out, result = ERR_PTR(-EOVERFLOW));
365         }
366
367         if (IS_ERR(result))
368                 GOTO(out, result);
369
370         CDEBUG(D_IOCTL, "Adding new device %s (%p)\n",
371                result->obd_name, result);
372
373         RETURN(result);
374 out:
375         obd_device_free(newdev);
376 out_type:
377         class_put_type(type);
378         return result;
379 }
380
381 void class_release_dev(struct obd_device *obd)
382 {
383         struct obd_type *obd_type = obd->obd_type;
384
385         LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC, "%p obd_magic %08x != %08x\n",
386                  obd, obd->obd_magic, OBD_DEVICE_MAGIC);
387         LASSERTF(obd == obd_devs[obd->obd_minor], "obd %p != obd_devs[%d] %p\n",
388                  obd, obd->obd_minor, obd_devs[obd->obd_minor]);
389         LASSERT(obd_type != NULL);
390
391         CDEBUG(D_INFO, "Release obd device %s at %d obd_type name =%s\n",
392                obd->obd_name, obd->obd_minor, obd->obd_type->typ_name);
393
394         write_lock(&obd_dev_lock);
395         obd_devs[obd->obd_minor] = NULL;
396         write_unlock(&obd_dev_lock);
397         obd_device_free(obd);
398
399         class_put_type(obd_type);
400 }
401
402 int class_name2dev(const char *name)
403 {
404         int i;
405
406         if (!name)
407                 return -1;
408
409         read_lock(&obd_dev_lock);
410         for (i = 0; i < class_devno_max(); i++) {
411                 struct obd_device *obd = class_num2obd(i);
412
413                 if (obd && strcmp(name, obd->obd_name) == 0) {
414                         /* Make sure we finished attaching before we give
415                            out any references */
416                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
417                         if (obd->obd_attached) {
418                                 read_unlock(&obd_dev_lock);
419                                 return i;
420                         }
421                         break;
422                 }
423         }
424         read_unlock(&obd_dev_lock);
425
426         return -1;
427 }
428
429 struct obd_device *class_name2obd(const char *name)
430 {
431         int dev = class_name2dev(name);
432
433         if (dev < 0 || dev > class_devno_max())
434                 return NULL;
435         return class_num2obd(dev);
436 }
437 EXPORT_SYMBOL(class_name2obd);
438
439 int class_uuid2dev(struct obd_uuid *uuid)
440 {
441         int i;
442
443         read_lock(&obd_dev_lock);
444         for (i = 0; i < class_devno_max(); i++) {
445                 struct obd_device *obd = class_num2obd(i);
446
447                 if (obd && obd_uuid_equals(uuid, &obd->obd_uuid)) {
448                         LASSERT(obd->obd_magic == OBD_DEVICE_MAGIC);
449                         read_unlock(&obd_dev_lock);
450                         return i;
451                 }
452         }
453         read_unlock(&obd_dev_lock);
454
455         return -1;
456 }
457
458 struct obd_device *class_uuid2obd(struct obd_uuid *uuid)
459 {
460         int dev = class_uuid2dev(uuid);
461         if (dev < 0)
462                 return NULL;
463         return class_num2obd(dev);
464 }
465 EXPORT_SYMBOL(class_uuid2obd);
466
467 /**
468  * Get obd device from ::obd_devs[]
469  *
470  * \param num [in] array index
471  *
472  * \retval NULL if ::obd_devs[\a num] does not contains an obd device
473  *         otherwise return the obd device there.
474  */
475 struct obd_device *class_num2obd(int num)
476 {
477         struct obd_device *obd = NULL;
478
479         if (num < class_devno_max()) {
480                 obd = obd_devs[num];
481                 if (obd == NULL)
482                         return NULL;
483
484                 LASSERTF(obd->obd_magic == OBD_DEVICE_MAGIC,
485                          "%p obd_magic %08x != %08x\n",
486                          obd, obd->obd_magic, OBD_DEVICE_MAGIC);
487                 LASSERTF(obd->obd_minor == num,
488                          "%p obd_minor %0d != %0d\n",
489                          obd, obd->obd_minor, num);
490         }
491
492         return obd;
493 }
494
495 /**
496  * Get obd devices count. Device in any
497  *    state are counted
498  * \retval obd device count
499  */
500 int get_devices_count(void)
501 {
502         int index, max_index = class_devno_max(), dev_count = 0;
503
504         read_lock(&obd_dev_lock);
505         for (index = 0; index <= max_index; index++) {
506                 struct obd_device *obd = class_num2obd(index);
507                 if (obd != NULL)
508                         dev_count++;
509         }
510         read_unlock(&obd_dev_lock);
511
512         return dev_count;
513 }
514 EXPORT_SYMBOL(get_devices_count);
515
516 void class_obd_list(void)
517 {
518         char *status;
519         int i;
520
521         read_lock(&obd_dev_lock);
522         for (i = 0; i < class_devno_max(); i++) {
523                 struct obd_device *obd = class_num2obd(i);
524
525                 if (obd == NULL)
526                         continue;
527                 if (obd->obd_stopping)
528                         status = "ST";
529                 else if (obd->obd_set_up)
530                         status = "UP";
531                 else if (obd->obd_attached)
532                         status = "AT";
533                 else
534                         status = "--";
535                 LCONSOLE(D_CONFIG, "%3d %s %s %s %s %d\n",
536                          i, status, obd->obd_type->typ_name,
537                          obd->obd_name, obd->obd_uuid.uuid,
538                          atomic_read(&obd->obd_refcount));
539         }
540         read_unlock(&obd_dev_lock);
541         return;
542 }
543
544 /* Search for a client OBD connected to tgt_uuid.  If grp_uuid is
545    specified, then only the client with that uuid is returned,
546    otherwise any client connected to the tgt is returned. */
547 struct obd_device * class_find_client_obd(struct obd_uuid *tgt_uuid,
548                                           const char * typ_name,
549                                           struct obd_uuid *grp_uuid)
550 {
551         int i;
552
553         read_lock(&obd_dev_lock);
554         for (i = 0; i < class_devno_max(); i++) {
555                 struct obd_device *obd = class_num2obd(i);
556
557                 if (obd == NULL)
558                         continue;
559                 if ((strncmp(obd->obd_type->typ_name, typ_name,
560                              strlen(typ_name)) == 0)) {
561                         if (obd_uuid_equals(tgt_uuid,
562                                             &obd->u.cli.cl_target_uuid) &&
563                             ((grp_uuid)? obd_uuid_equals(grp_uuid,
564                                                          &obd->obd_uuid) : 1)) {
565                                 read_unlock(&obd_dev_lock);
566                                 return obd;
567                         }
568                 }
569         }
570         read_unlock(&obd_dev_lock);
571
572         return NULL;
573 }
574 EXPORT_SYMBOL(class_find_client_obd);
575
576 /* Iterate the obd_device list looking devices have grp_uuid. Start
577    searching at *next, and if a device is found, the next index to look
578    at is saved in *next. If next is NULL, then the first matching device
579    will always be returned. */
580 struct obd_device * class_devices_in_group(struct obd_uuid *grp_uuid, int *next)
581 {
582         int i;
583
584         if (next == NULL)
585                 i = 0;
586         else if (*next >= 0 && *next < class_devno_max())
587                 i = *next;
588         else
589                 return NULL;
590
591         read_lock(&obd_dev_lock);
592         for (; i < class_devno_max(); i++) {
593                 struct obd_device *obd = class_num2obd(i);
594
595                 if (obd == NULL)
596                         continue;
597                 if (obd_uuid_equals(grp_uuid, &obd->obd_uuid)) {
598                         if (next != NULL)
599                                 *next = i+1;
600                         read_unlock(&obd_dev_lock);
601                         return obd;
602                 }
603         }
604         read_unlock(&obd_dev_lock);
605
606         return NULL;
607 }
608 EXPORT_SYMBOL(class_devices_in_group);
609
610 /**
611  * to notify sptlrpc log for \a fsname has changed, let every relevant OBD
612  * adjust sptlrpc settings accordingly.
613  */
614 int class_notify_sptlrpc_conf(const char *fsname, int namelen)
615 {
616         struct obd_device  *obd;
617         const char         *type;
618         int                 i, rc = 0, rc2;
619
620         LASSERT(namelen > 0);
621
622         read_lock(&obd_dev_lock);
623         for (i = 0; i < class_devno_max(); i++) {
624                 obd = class_num2obd(i);
625
626                 if (obd == NULL || obd->obd_set_up == 0 || obd->obd_stopping)
627                         continue;
628
629                 /* only notify mdc, osc, mdt, ost */
630                 type = obd->obd_type->typ_name;
631                 if (strcmp(type, LUSTRE_MDC_NAME) != 0 &&
632                     strcmp(type, LUSTRE_OSC_NAME) != 0 &&
633                     strcmp(type, LUSTRE_MDT_NAME) != 0 &&
634                     strcmp(type, LUSTRE_OST_NAME) != 0)
635                         continue;
636
637                 if (strncmp(obd->obd_name, fsname, namelen))
638                         continue;
639
640                 class_incref(obd, __FUNCTION__, obd);
641                 read_unlock(&obd_dev_lock);
642                 rc2 = obd_set_info_async(NULL, obd->obd_self_export,
643                                          sizeof(KEY_SPTLRPC_CONF),
644                                          KEY_SPTLRPC_CONF, 0, NULL, NULL);
645                 rc = rc ? rc : rc2;
646                 class_decref(obd, __FUNCTION__, obd);
647                 read_lock(&obd_dev_lock);
648         }
649         read_unlock(&obd_dev_lock);
650         return rc;
651 }
652 EXPORT_SYMBOL(class_notify_sptlrpc_conf);
653
654 void obd_cleanup_caches(void)
655 {
656         ENTRY;
657         if (obd_device_cachep) {
658                 kmem_cache_destroy(obd_device_cachep);
659                 obd_device_cachep = NULL;
660         }
661         if (obdo_cachep) {
662                 kmem_cache_destroy(obdo_cachep);
663                 obdo_cachep = NULL;
664         }
665         if (import_cachep) {
666                 kmem_cache_destroy(import_cachep);
667                 import_cachep = NULL;
668         }
669
670         EXIT;
671 }
672
673 int obd_init_caches(void)
674 {
675         int rc;
676         ENTRY;
677
678         LASSERT(obd_device_cachep == NULL);
679         obd_device_cachep = kmem_cache_create("ll_obd_dev_cache",
680                                               sizeof(struct obd_device),
681                                               0, 0, NULL);
682         if (!obd_device_cachep)
683                 GOTO(out, rc = -ENOMEM);
684
685         LASSERT(obdo_cachep == NULL);
686         obdo_cachep = kmem_cache_create("ll_obdo_cache", sizeof(struct obdo),
687                                         0, 0, NULL);
688         if (!obdo_cachep)
689                 GOTO(out, rc = -ENOMEM);
690
691         LASSERT(import_cachep == NULL);
692         import_cachep = kmem_cache_create("ll_import_cache",
693                                           sizeof(struct obd_import),
694                                           0, 0, NULL);
695         if (!import_cachep)
696                 GOTO(out, rc = -ENOMEM);
697
698         RETURN(0);
699 out:
700         obd_cleanup_caches();
701         RETURN(rc);
702 }
703
704 /* map connection to client */
705 struct obd_export *class_conn2export(struct lustre_handle *conn)
706 {
707         struct obd_export *export;
708         ENTRY;
709
710         if (!conn) {
711                 CDEBUG(D_CACHE, "looking for null handle\n");
712                 RETURN(NULL);
713         }
714
715         if (conn->cookie == -1) {  /* this means assign a new connection */
716                 CDEBUG(D_CACHE, "want a new connection\n");
717                 RETURN(NULL);
718         }
719
720         CDEBUG(D_INFO, "looking for export cookie "LPX64"\n", conn->cookie);
721         export = class_handle2object(conn->cookie, NULL);
722         RETURN(export);
723 }
724 EXPORT_SYMBOL(class_conn2export);
725
726 struct obd_device *class_exp2obd(struct obd_export *exp)
727 {
728         if (exp)
729                 return exp->exp_obd;
730         return NULL;
731 }
732 EXPORT_SYMBOL(class_exp2obd);
733
734 struct obd_device *class_conn2obd(struct lustre_handle *conn)
735 {
736         struct obd_export *export;
737         export = class_conn2export(conn);
738         if (export) {
739                 struct obd_device *obd = export->exp_obd;
740                 class_export_put(export);
741                 return obd;
742         }
743         return NULL;
744 }
745
746 struct obd_import *class_exp2cliimp(struct obd_export *exp)
747 {
748         struct obd_device *obd = exp->exp_obd;
749         if (obd == NULL)
750                 return NULL;
751         return obd->u.cli.cl_import;
752 }
753 EXPORT_SYMBOL(class_exp2cliimp);
754
755 struct obd_import *class_conn2cliimp(struct lustre_handle *conn)
756 {
757         struct obd_device *obd = class_conn2obd(conn);
758         if (obd == NULL)
759                 return NULL;
760         return obd->u.cli.cl_import;
761 }
762
763 /* Export management functions */
764 static void class_export_destroy(struct obd_export *exp)
765 {
766         struct obd_device *obd = exp->exp_obd;
767         ENTRY;
768
769         LASSERT_ATOMIC_ZERO(&exp->exp_refcount);
770         LASSERT(obd != NULL);
771
772         CDEBUG(D_IOCTL, "destroying export %p/%s for %s\n", exp,
773                exp->exp_client_uuid.uuid, obd->obd_name);
774
775         /* "Local" exports (lctl, LOV->{mdc,osc}) have no connection. */
776         if (exp->exp_connection)
777                 ptlrpc_put_connection_superhack(exp->exp_connection);
778
779         LASSERT(list_empty(&exp->exp_outstanding_replies));
780         LASSERT(list_empty(&exp->exp_uncommitted_replies));
781         LASSERT(list_empty(&exp->exp_req_replay_queue));
782         LASSERT(list_empty(&exp->exp_hp_rpcs));
783         obd_destroy_export(exp);
784         class_decref(obd, "export", exp);
785
786         OBD_FREE_RCU(exp, sizeof(*exp), &exp->exp_handle);
787         EXIT;
788 }
789
/* hop_addref callback for export handles: take an export reference. */
static void export_handle_addref(void *export)
{
        struct obd_export *exp = export;

        class_export_get(exp);
}
794
795 static struct portals_handle_ops export_handle_ops = {
796         .hop_addref = export_handle_addref,
797         .hop_free   = NULL,
798 };
799
800 struct obd_export *class_export_get(struct obd_export *exp)
801 {
802         atomic_inc(&exp->exp_refcount);
803         CDEBUG(D_INFO, "GETting export %p : new refcount %d\n", exp,
804                atomic_read(&exp->exp_refcount));
805         return exp;
806 }
807 EXPORT_SYMBOL(class_export_get);
808
809 void class_export_put(struct obd_export *exp)
810 {
811         LASSERT(exp != NULL);
812         LASSERT_ATOMIC_GT_LT(&exp->exp_refcount, 0, LI_POISON);
813         CDEBUG(D_INFO, "PUTting export %p : new refcount %d\n", exp,
814                atomic_read(&exp->exp_refcount) - 1);
815
816         if (atomic_dec_and_test(&exp->exp_refcount)) {
817                 LASSERT(!list_empty(&exp->exp_obd_chain));
818                 LASSERT(list_empty(&exp->exp_stale_list));
819                 CDEBUG(D_IOCTL, "final put %p/%s\n",
820                        exp, exp->exp_client_uuid.uuid);
821
822                 /* release nid stat refererence */
823                 lprocfs_exp_cleanup(exp);
824
825                 obd_zombie_export_add(exp);
826         }
827 }
828 EXPORT_SYMBOL(class_export_put);
829
830 /* Creates a new export, adds it to the hash table, and returns a
831  * pointer to it. The refcount is 2: one for the hash reference, and
832  * one for the pointer returned by this function. */
833 struct obd_export *class_new_export(struct obd_device *obd,
834                                     struct obd_uuid *cluuid)
835 {
836         struct obd_export *export;
837         struct cfs_hash *hash = NULL;
838         int rc = 0;
839         ENTRY;
840
841         OBD_ALLOC_PTR(export);
842         if (!export)
843                 return ERR_PTR(-ENOMEM);
844
845         export->exp_conn_cnt = 0;
846         export->exp_lock_hash = NULL;
847         export->exp_flock_hash = NULL;
848         atomic_set(&export->exp_refcount, 2);
849         atomic_set(&export->exp_rpc_count, 0);
850         atomic_set(&export->exp_cb_count, 0);
851         atomic_set(&export->exp_locks_count, 0);
852 #if LUSTRE_TRACKS_LOCK_EXP_REFS
853         INIT_LIST_HEAD(&export->exp_locks_list);
854         spin_lock_init(&export->exp_locks_list_guard);
855 #endif
856         atomic_set(&export->exp_replay_count, 0);
857         export->exp_obd = obd;
858         INIT_LIST_HEAD(&export->exp_outstanding_replies);
859         spin_lock_init(&export->exp_uncommitted_replies_lock);
860         INIT_LIST_HEAD(&export->exp_uncommitted_replies);
861         INIT_LIST_HEAD(&export->exp_req_replay_queue);
862         INIT_LIST_HEAD(&export->exp_handle.h_link);
863         INIT_LIST_HEAD(&export->exp_hp_rpcs);
864         INIT_LIST_HEAD(&export->exp_reg_rpcs);
865         class_handle_hash(&export->exp_handle, &export_handle_ops);
866         export->exp_last_request_time = cfs_time_current_sec();
867         spin_lock_init(&export->exp_lock);
868         spin_lock_init(&export->exp_rpc_lock);
869         INIT_HLIST_NODE(&export->exp_uuid_hash);
870         INIT_HLIST_NODE(&export->exp_nid_hash);
871         INIT_HLIST_NODE(&export->exp_gen_hash);
872         spin_lock_init(&export->exp_bl_list_lock);
873         INIT_LIST_HEAD(&export->exp_bl_list);
874         INIT_LIST_HEAD(&export->exp_stale_list);
875
876         export->exp_sp_peer = LUSTRE_SP_ANY;
877         export->exp_flvr.sf_rpc = SPTLRPC_FLVR_INVALID;
878         export->exp_client_uuid = *cluuid;
879         obd_init_export(export);
880
881         spin_lock(&obd->obd_dev_lock);
882         /* shouldn't happen, but might race */
883         if (obd->obd_stopping)
884                 GOTO(exit_unlock, rc = -ENODEV);
885
886         hash = cfs_hash_getref(obd->obd_uuid_hash);
887         if (hash == NULL)
888                 GOTO(exit_unlock, rc = -ENODEV);
889         spin_unlock(&obd->obd_dev_lock);
890
891         if (!obd_uuid_equals(cluuid, &obd->obd_uuid)) {
892                 rc = cfs_hash_add_unique(hash, cluuid, &export->exp_uuid_hash);
893                 if (rc != 0) {
894                         LCONSOLE_WARN("%s: denying duplicate export for %s, %d\n",
895                                       obd->obd_name, cluuid->uuid, rc);
896                         GOTO(exit_err, rc = -EALREADY);
897                 }
898         }
899
900         at_init(&export->exp_bl_lock_at, obd_timeout, 0);
901         spin_lock(&obd->obd_dev_lock);
902         if (obd->obd_stopping) {
903                 cfs_hash_del(hash, cluuid, &export->exp_uuid_hash);
904                 GOTO(exit_unlock, rc = -ENODEV);
905         }
906
907         class_incref(obd, "export", export);
908         list_add(&export->exp_obd_chain, &export->exp_obd->obd_exports);
909         list_add_tail(&export->exp_obd_chain_timed,
910                       &export->exp_obd->obd_exports_timed);
911         export->exp_obd->obd_num_exports++;
912         spin_unlock(&obd->obd_dev_lock);
913         cfs_hash_putref(hash);
914         RETURN(export);
915
916 exit_unlock:
917         spin_unlock(&obd->obd_dev_lock);
918 exit_err:
919         if (hash)
920                 cfs_hash_putref(hash);
921         class_handle_unhash(&export->exp_handle);
922         LASSERT(hlist_unhashed(&export->exp_uuid_hash));
923         obd_destroy_export(export);
924         OBD_FREE_PTR(export);
925         return ERR_PTR(rc);
926 }
927 EXPORT_SYMBOL(class_new_export);
928
929 void class_unlink_export(struct obd_export *exp)
930 {
931         class_handle_unhash(&exp->exp_handle);
932
933         spin_lock(&exp->exp_obd->obd_dev_lock);
934         /* delete an uuid-export hashitem from hashtables */
935         if (!hlist_unhashed(&exp->exp_uuid_hash))
936                 cfs_hash_del(exp->exp_obd->obd_uuid_hash,
937                              &exp->exp_client_uuid,
938                              &exp->exp_uuid_hash);
939
940         if (!hlist_unhashed(&exp->exp_gen_hash)) {
941                 struct tg_export_data   *ted = &exp->exp_target_data;
942                 struct cfs_hash         *hash;
943
944                 hash = cfs_hash_getref(exp->exp_obd->obd_gen_hash);
945                 cfs_hash_del(hash, &ted->ted_lcd->lcd_generation,
946                              &exp->exp_gen_hash);
947                 cfs_hash_putref(hash);
948         }
949
950         list_move(&exp->exp_obd_chain, &exp->exp_obd->obd_unlinked_exports);
951         list_del_init(&exp->exp_obd_chain_timed);
952         exp->exp_obd->obd_num_exports--;
953         spin_unlock(&exp->exp_obd->obd_dev_lock);
954         atomic_inc(&obd_stale_export_num);
955
956         /* A reference is kept by obd_stale_exports list */
957         obd_stale_export_put(exp);
958 }
959
960 /* Import management functions */
961 static void class_import_destroy(struct obd_import *imp)
962 {
963         ENTRY;
964
965         CDEBUG(D_IOCTL, "destroying import %p for %s\n", imp,
966                 imp->imp_obd->obd_name);
967
968         LASSERT_ATOMIC_ZERO(&imp->imp_refcount);
969
970         ptlrpc_put_connection_superhack(imp->imp_connection);
971
972         while (!list_empty(&imp->imp_conn_list)) {
973                 struct obd_import_conn *imp_conn;
974
975                 imp_conn = list_entry(imp->imp_conn_list.next,
976                                       struct obd_import_conn, oic_item);
977                 list_del_init(&imp_conn->oic_item);
978                 ptlrpc_put_connection_superhack(imp_conn->oic_conn);
979                 OBD_FREE(imp_conn, sizeof(*imp_conn));
980         }
981
982         LASSERT(imp->imp_sec == NULL);
983         class_decref(imp->imp_obd, "import", imp);
984         OBD_FREE_RCU(imp, sizeof(*imp), &imp->imp_handle);
985         EXIT;
986 }
987
988 static void import_handle_addref(void *import)
989 {
990         class_import_get(import);
991 }
992
993 static struct portals_handle_ops import_handle_ops = {
994         .hop_addref = import_handle_addref,
995         .hop_free   = NULL,
996 };
997
998 struct obd_import *class_import_get(struct obd_import *import)
999 {
1000         atomic_inc(&import->imp_refcount);
1001         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", import,
1002                atomic_read(&import->imp_refcount),
1003                import->imp_obd->obd_name);
1004         return import;
1005 }
1006 EXPORT_SYMBOL(class_import_get);
1007
1008 void class_import_put(struct obd_import *imp)
1009 {
1010         ENTRY;
1011
1012         LASSERT(list_empty(&imp->imp_zombie_chain));
1013         LASSERT_ATOMIC_GT_LT(&imp->imp_refcount, 0, LI_POISON);
1014
1015         CDEBUG(D_INFO, "import %p refcount=%d obd=%s\n", imp,
1016                atomic_read(&imp->imp_refcount) - 1,
1017                imp->imp_obd->obd_name);
1018
1019         if (atomic_dec_and_test(&imp->imp_refcount)) {
1020                 CDEBUG(D_INFO, "final put import %p\n", imp);
1021                 obd_zombie_import_add(imp);
1022         }
1023
1024         /* catch possible import put race */
1025         LASSERT_ATOMIC_GE_LT(&imp->imp_refcount, 0, LI_POISON);
1026         EXIT;
1027 }
1028 EXPORT_SYMBOL(class_import_put);
1029
1030 static void init_imp_at(struct imp_at *at) {
1031         int i;
1032         at_init(&at->iat_net_latency, 0, 0);
1033         for (i = 0; i < IMP_AT_MAX_PORTALS; i++) {
1034                 /* max service estimates are tracked on the server side, so
1035                    don't use the AT history here, just use the last reported
1036                    val. (But keep hist for proc histogram, worst_ever) */
1037                 at_init(&at->iat_service_estimate[i], INITIAL_CONNECT_TIMEOUT,
1038                         AT_FLG_NOHIST);
1039         }
1040 }
1041
1042 struct obd_import *class_new_import(struct obd_device *obd)
1043 {
1044         struct obd_import *imp;
1045
1046         OBD_ALLOC(imp, sizeof(*imp));
1047         if (imp == NULL)
1048                 return NULL;
1049
1050         INIT_LIST_HEAD(&imp->imp_pinger_chain);
1051         INIT_LIST_HEAD(&imp->imp_zombie_chain);
1052         INIT_LIST_HEAD(&imp->imp_replay_list);
1053         INIT_LIST_HEAD(&imp->imp_sending_list);
1054         INIT_LIST_HEAD(&imp->imp_delayed_list);
1055         INIT_LIST_HEAD(&imp->imp_committed_list);
1056         imp->imp_replay_cursor = &imp->imp_committed_list;
1057         spin_lock_init(&imp->imp_lock);
1058         imp->imp_last_success_conn = 0;
1059         imp->imp_state = LUSTRE_IMP_NEW;
1060         imp->imp_obd = class_incref(obd, "import", imp);
1061         mutex_init(&imp->imp_sec_mutex);
1062         init_waitqueue_head(&imp->imp_recovery_waitq);
1063
1064         atomic_set(&imp->imp_refcount, 2);
1065         atomic_set(&imp->imp_unregistering, 0);
1066         atomic_set(&imp->imp_inflight, 0);
1067         atomic_set(&imp->imp_replay_inflight, 0);
1068         atomic_set(&imp->imp_inval_count, 0);
1069         INIT_LIST_HEAD(&imp->imp_conn_list);
1070         INIT_LIST_HEAD(&imp->imp_handle.h_link);
1071         class_handle_hash(&imp->imp_handle, &import_handle_ops);
1072         init_imp_at(&imp->imp_at);
1073
1074         /* the default magic is V2, will be used in connect RPC, and
1075          * then adjusted according to the flags in request/reply. */
1076         imp->imp_msg_magic = LUSTRE_MSG_MAGIC_V2;
1077
1078         return imp;
1079 }
1080 EXPORT_SYMBOL(class_new_import);
1081
1082 void class_destroy_import(struct obd_import *import)
1083 {
1084         LASSERT(import != NULL);
1085         LASSERT(import != LP_POISON);
1086
1087         class_handle_unhash(&import->imp_handle);
1088
1089         spin_lock(&import->imp_lock);
1090         import->imp_generation++;
1091         spin_unlock(&import->imp_lock);
1092         class_import_put(import);
1093 }
1094 EXPORT_SYMBOL(class_destroy_import);
1095
1096 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1097
1098 void __class_export_add_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1099 {
1100         spin_lock(&exp->exp_locks_list_guard);
1101
1102         LASSERT(lock->l_exp_refs_nr >= 0);
1103
1104         if (lock->l_exp_refs_target != NULL &&
1105             lock->l_exp_refs_target != exp) {
1106                 LCONSOLE_WARN("setting export %p for lock %p which already has export %p\n",
1107                               exp, lock, lock->l_exp_refs_target);
1108         }
1109         if ((lock->l_exp_refs_nr ++) == 0) {
1110                 list_add(&lock->l_exp_refs_link, &exp->exp_locks_list);
1111                 lock->l_exp_refs_target = exp;
1112         }
1113         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1114                lock, exp, lock->l_exp_refs_nr);
1115         spin_unlock(&exp->exp_locks_list_guard);
1116 }
1117
1118 void __class_export_del_lock_ref(struct obd_export *exp, struct ldlm_lock *lock)
1119 {
1120         spin_lock(&exp->exp_locks_list_guard);
1121         LASSERT(lock->l_exp_refs_nr > 0);
1122         if (lock->l_exp_refs_target != exp) {
1123                 LCONSOLE_WARN("lock %p, "
1124                               "mismatching export pointers: %p, %p\n",
1125                               lock, lock->l_exp_refs_target, exp);
1126         }
1127         if (-- lock->l_exp_refs_nr == 0) {
1128                 list_del_init(&lock->l_exp_refs_link);
1129                 lock->l_exp_refs_target = NULL;
1130         }
1131         CDEBUG(D_INFO, "lock = %p, export = %p, refs = %u\n",
1132                lock, exp, lock->l_exp_refs_nr);
1133         spin_unlock(&exp->exp_locks_list_guard);
1134 }
1135 #endif
1136
1137 /* A connection defines an export context in which preallocation can
1138    be managed. This releases the export pointer reference, and returns
1139    the export handle, so the export refcount is 1 when this function
1140    returns. */
1141 int class_connect(struct lustre_handle *conn, struct obd_device *obd,
1142                   struct obd_uuid *cluuid)
1143 {
1144         struct obd_export *export;
1145         LASSERT(conn != NULL);
1146         LASSERT(obd != NULL);
1147         LASSERT(cluuid != NULL);
1148         ENTRY;
1149
1150         export = class_new_export(obd, cluuid);
1151         if (IS_ERR(export))
1152                 RETURN(PTR_ERR(export));
1153
1154         conn->cookie = export->exp_handle.h_cookie;
1155         class_export_put(export);
1156
1157         CDEBUG(D_IOCTL, "connect: client %s, cookie "LPX64"\n",
1158                cluuid->uuid, conn->cookie);
1159         RETURN(0);
1160 }
1161 EXPORT_SYMBOL(class_connect);
1162
1163 /* if export is involved in recovery then clean up related things */
1164 static void class_export_recovery_cleanup(struct obd_export *exp)
1165 {
1166         struct obd_device *obd = exp->exp_obd;
1167
1168         spin_lock(&obd->obd_recovery_task_lock);
1169         if (obd->obd_recovering) {
1170                 if (exp->exp_in_recovery) {
1171                         spin_lock(&exp->exp_lock);
1172                         exp->exp_in_recovery = 0;
1173                         spin_unlock(&exp->exp_lock);
1174                         LASSERT_ATOMIC_POS(&obd->obd_connected_clients);
1175                         atomic_dec(&obd->obd_connected_clients);
1176                 }
1177
1178                 /* if called during recovery then should update
1179                  * obd_stale_clients counter,
1180                  * lightweight exports are not counted */
1181                 if ((exp_connect_flags(exp) & OBD_CONNECT_LIGHTWEIGHT) == 0)
1182                         exp->exp_obd->obd_stale_clients++;
1183         }
1184         spin_unlock(&obd->obd_recovery_task_lock);
1185
1186         spin_lock(&exp->exp_lock);
1187         /** Cleanup req replay fields */
1188         if (exp->exp_req_replay_needed) {
1189                 exp->exp_req_replay_needed = 0;
1190
1191                 LASSERT(atomic_read(&obd->obd_req_replay_clients));
1192                 atomic_dec(&obd->obd_req_replay_clients);
1193         }
1194
1195         /** Cleanup lock replay data */
1196         if (exp->exp_lock_replay_needed) {
1197                 exp->exp_lock_replay_needed = 0;
1198
1199                 LASSERT(atomic_read(&obd->obd_lock_replay_clients));
1200                 atomic_dec(&obd->obd_lock_replay_clients);
1201         }
1202         spin_unlock(&exp->exp_lock);
1203 }
1204
1205 /* This function removes 1-3 references from the export:
1206  * 1 - for export pointer passed
1207  * and if disconnect really need
1208  * 2 - removing from hash
1209  * 3 - in client_unlink_export
1210  * The export pointer passed to this function can destroyed */
1211 int class_disconnect(struct obd_export *export)
1212 {
1213         int already_disconnected;
1214         ENTRY;
1215
1216         if (export == NULL) {
1217                 CWARN("attempting to free NULL export %p\n", export);
1218                 RETURN(-EINVAL);
1219         }
1220
1221         spin_lock(&export->exp_lock);
1222         already_disconnected = export->exp_disconnected;
1223         export->exp_disconnected = 1;
1224         spin_unlock(&export->exp_lock);
1225
1226         /* class_cleanup(), abort_recovery(), and class_fail_export()
1227          * all end up in here, and if any of them race we shouldn't
1228          * call extra class_export_puts(). */
1229         if (already_disconnected) {
1230                 LASSERT(hlist_unhashed(&export->exp_nid_hash));
1231                 GOTO(no_disconn, already_disconnected);
1232         }
1233
1234         CDEBUG(D_IOCTL, "disconnect: cookie "LPX64"\n",
1235                export->exp_handle.h_cookie);
1236
1237         if (!hlist_unhashed(&export->exp_nid_hash))
1238                 cfs_hash_del(export->exp_obd->obd_nid_hash,
1239                              &export->exp_connection->c_peer.nid,
1240                              &export->exp_nid_hash);
1241
1242         class_export_recovery_cleanup(export);
1243         class_unlink_export(export);
1244 no_disconn:
1245         class_export_put(export);
1246         RETURN(0);
1247 }
1248 EXPORT_SYMBOL(class_disconnect);
1249
1250 /* Return non-zero for a fully connected export */
1251 int class_connected_export(struct obd_export *exp)
1252 {
1253         int connected = 0;
1254
1255         if (exp) {
1256                 spin_lock(&exp->exp_lock);
1257                 connected = (exp->exp_conn_cnt > 0) && !exp->exp_failed;
1258                 spin_unlock(&exp->exp_lock);
1259         }
1260         return connected;
1261 }
1262 EXPORT_SYMBOL(class_connected_export);
1263
1264 static void class_disconnect_export_list(struct list_head *list,
1265                                          enum obd_option flags)
1266 {
1267         int rc;
1268         struct obd_export *exp;
1269         ENTRY;
1270
1271         /* It's possible that an export may disconnect itself, but
1272          * nothing else will be added to this list. */
1273         while (!list_empty(list)) {
1274                 exp = list_entry(list->next, struct obd_export,
1275                                  exp_obd_chain);
1276                 /* need for safe call CDEBUG after obd_disconnect */
1277                 class_export_get(exp);
1278
1279                 spin_lock(&exp->exp_lock);
1280                 exp->exp_flags = flags;
1281                 spin_unlock(&exp->exp_lock);
1282
1283                 if (obd_uuid_equals(&exp->exp_client_uuid,
1284                                     &exp->exp_obd->obd_uuid)) {
1285                         CDEBUG(D_HA,
1286                                "exp %p export uuid == obd uuid, don't discon\n",
1287                                exp);
1288                         /* Need to delete this now so we don't end up pointing
1289                          * to work_list later when this export is cleaned up. */
1290                         list_del_init(&exp->exp_obd_chain);
1291                         class_export_put(exp);
1292                         continue;
1293                 }
1294
1295                 class_export_get(exp);
1296                 CDEBUG(D_HA, "%s: disconnecting export at %s (%p), "
1297                        "last request at "CFS_TIME_T"\n",
1298                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
1299                        exp, exp->exp_last_request_time);
1300                 /* release one export reference anyway */
1301                 rc = obd_disconnect(exp);
1302
1303                 CDEBUG(D_HA, "disconnected export at %s (%p): rc %d\n",
1304                        obd_export_nid2str(exp), exp, rc);
1305                 class_export_put(exp);
1306         }
1307         EXIT;
1308 }
1309
1310 void class_disconnect_exports(struct obd_device *obd)
1311 {
1312         struct list_head work_list;
1313         ENTRY;
1314
1315         /* Move all of the exports from obd_exports to a work list, en masse. */
1316         INIT_LIST_HEAD(&work_list);
1317         spin_lock(&obd->obd_dev_lock);
1318         list_splice_init(&obd->obd_exports, &work_list);
1319         list_splice_init(&obd->obd_delayed_exports, &work_list);
1320         spin_unlock(&obd->obd_dev_lock);
1321
1322         if (!list_empty(&work_list)) {
1323                 CDEBUG(D_HA, "OBD device %d (%p) has exports, "
1324                        "disconnecting them\n", obd->obd_minor, obd);
1325                 class_disconnect_export_list(&work_list,
1326                                              exp_flags_from_obd(obd));
1327         } else
1328                 CDEBUG(D_HA, "OBD device %d (%p) has no exports\n",
1329                        obd->obd_minor, obd);
1330         EXIT;
1331 }
1332 EXPORT_SYMBOL(class_disconnect_exports);
1333
1334 /* Remove exports that have not completed recovery.
1335  */
1336 void class_disconnect_stale_exports(struct obd_device *obd,
1337                                     int (*test_export)(struct obd_export *))
1338 {
1339         struct list_head work_list;
1340         struct obd_export *exp, *n;
1341         int evicted = 0;
1342         ENTRY;
1343
1344         INIT_LIST_HEAD(&work_list);
1345         spin_lock(&obd->obd_dev_lock);
1346         list_for_each_entry_safe(exp, n, &obd->obd_exports,
1347                                  exp_obd_chain) {
1348                 /* don't count self-export as client */
1349                 if (obd_uuid_equals(&exp->exp_client_uuid,
1350                                     &exp->exp_obd->obd_uuid))
1351                         continue;
1352
1353                 /* don't evict clients which have no slot in last_rcvd
1354                  * (e.g. lightweight connection) */
1355                 if (exp->exp_target_data.ted_lr_idx == -1)
1356                         continue;
1357
1358                 spin_lock(&exp->exp_lock);
1359                 if (exp->exp_failed || test_export(exp)) {
1360                         spin_unlock(&exp->exp_lock);
1361                         continue;
1362                 }
1363                 exp->exp_failed = 1;
1364                 spin_unlock(&exp->exp_lock);
1365
1366                 list_move(&exp->exp_obd_chain, &work_list);
1367                 evicted++;
1368                 CDEBUG(D_HA, "%s: disconnect stale client %s@%s\n",
1369                        obd->obd_name, exp->exp_client_uuid.uuid,
1370                        exp->exp_connection == NULL ? "<unknown>" :
1371                        libcfs_nid2str(exp->exp_connection->c_peer.nid));
1372                 print_export_data(exp, "EVICTING", 0);
1373         }
1374         spin_unlock(&obd->obd_dev_lock);
1375
1376         if (evicted)
1377                 LCONSOLE_WARN("%s: disconnecting %d stale clients\n",
1378                               obd->obd_name, evicted);
1379
1380         class_disconnect_export_list(&work_list, exp_flags_from_obd(obd) |
1381                                                  OBD_OPT_ABORT_RECOV);
1382         EXIT;
1383 }
1384 EXPORT_SYMBOL(class_disconnect_stale_exports);
1385
1386 void class_fail_export(struct obd_export *exp)
1387 {
1388         int rc, already_failed;
1389
1390         spin_lock(&exp->exp_lock);
1391         already_failed = exp->exp_failed;
1392         exp->exp_failed = 1;
1393         spin_unlock(&exp->exp_lock);
1394
1395         if (already_failed) {
1396                 CDEBUG(D_HA, "disconnecting dead export %p/%s; skipping\n",
1397                        exp, exp->exp_client_uuid.uuid);
1398                 return;
1399         }
1400
1401         CDEBUG(D_HA, "disconnecting export %p/%s\n",
1402                exp, exp->exp_client_uuid.uuid);
1403
1404         if (obd_dump_on_timeout)
1405                 libcfs_debug_dumplog();
1406
1407         /* need for safe call CDEBUG after obd_disconnect */
1408         class_export_get(exp);
1409
1410         /* Most callers into obd_disconnect are removing their own reference
1411          * (request, for example) in addition to the one from the hash table.
1412          * We don't have such a reference here, so make one. */
1413         class_export_get(exp);
1414         rc = obd_disconnect(exp);
1415         if (rc)
1416                 CERROR("disconnecting export %p failed: %d\n", exp, rc);
1417         else
1418                 CDEBUG(D_HA, "disconnected export %p/%s\n",
1419                        exp, exp->exp_client_uuid.uuid);
1420         class_export_put(exp);
1421 }
1422 EXPORT_SYMBOL(class_fail_export);
1423
1424 char *obd_export_nid2str(struct obd_export *exp)
1425 {
1426         if (exp->exp_connection != NULL)
1427                 return libcfs_nid2str(exp->exp_connection->c_peer.nid);
1428
1429         return "(no nid)";
1430 }
1431 EXPORT_SYMBOL(obd_export_nid2str);
1432
1433 int obd_export_evict_by_nid(struct obd_device *obd, const char *nid)
1434 {
1435         struct cfs_hash *nid_hash;
1436         struct obd_export *doomed_exp = NULL;
1437         int exports_evicted = 0;
1438
1439         lnet_nid_t nid_key = libcfs_str2nid((char *)nid);
1440
1441         spin_lock(&obd->obd_dev_lock);
1442         /* umount has run already, so evict thread should leave
1443          * its task to umount thread now */
1444         if (obd->obd_stopping) {
1445                 spin_unlock(&obd->obd_dev_lock);
1446                 return exports_evicted;
1447         }
1448         nid_hash = obd->obd_nid_hash;
1449         cfs_hash_getref(nid_hash);
1450         spin_unlock(&obd->obd_dev_lock);
1451
1452         do {
1453                 doomed_exp = cfs_hash_lookup(nid_hash, &nid_key);
1454                 if (doomed_exp == NULL)
1455                         break;
1456
1457                 LASSERTF(doomed_exp->exp_connection->c_peer.nid == nid_key,
1458                          "nid %s found, wanted nid %s, requested nid %s\n",
1459                          obd_export_nid2str(doomed_exp),
1460                          libcfs_nid2str(nid_key), nid);
1461                 LASSERTF(doomed_exp != obd->obd_self_export,
1462                          "self-export is hashed by NID?\n");
1463                 exports_evicted++;
1464                 LCONSOLE_WARN("%s: evicting %s (at %s) by administrative "
1465                               "request\n", obd->obd_name,
1466                               obd_uuid2str(&doomed_exp->exp_client_uuid),
1467                               obd_export_nid2str(doomed_exp));
1468                 class_fail_export(doomed_exp);
1469                 class_export_put(doomed_exp);
1470         } while (1);
1471
1472         cfs_hash_putref(nid_hash);
1473
1474         if (!exports_evicted)
1475                 CDEBUG(D_HA,"%s: can't disconnect NID '%s': no exports found\n",
1476                        obd->obd_name, nid);
1477         return exports_evicted;
1478 }
1479 EXPORT_SYMBOL(obd_export_evict_by_nid);
1480
1481 int obd_export_evict_by_uuid(struct obd_device *obd, const char *uuid)
1482 {
1483         struct cfs_hash *uuid_hash;
1484         struct obd_export *doomed_exp = NULL;
1485         struct obd_uuid doomed_uuid;
1486         int exports_evicted = 0;
1487
1488         spin_lock(&obd->obd_dev_lock);
1489         if (obd->obd_stopping) {
1490                 spin_unlock(&obd->obd_dev_lock);
1491                 return exports_evicted;
1492         }
1493         uuid_hash = obd->obd_uuid_hash;
1494         cfs_hash_getref(uuid_hash);
1495         spin_unlock(&obd->obd_dev_lock);
1496
1497         obd_str2uuid(&doomed_uuid, uuid);
1498         if (obd_uuid_equals(&doomed_uuid, &obd->obd_uuid)) {
1499                 CERROR("%s: can't evict myself\n", obd->obd_name);
1500                 cfs_hash_putref(uuid_hash);
1501                 return exports_evicted;
1502         }
1503
1504         doomed_exp = cfs_hash_lookup(uuid_hash, &doomed_uuid);
1505
1506         if (doomed_exp == NULL) {
1507                 CERROR("%s: can't disconnect %s: no exports found\n",
1508                        obd->obd_name, uuid);
1509         } else {
1510                 CWARN("%s: evicting %s at adminstrative request\n",
1511                        obd->obd_name, doomed_exp->exp_client_uuid.uuid);
1512                 class_fail_export(doomed_exp);
1513                 class_export_put(doomed_exp);
1514                 exports_evicted++;
1515         }
1516         cfs_hash_putref(uuid_hash);
1517
1518         return exports_evicted;
1519 }
1520
#if LUSTRE_TRACKS_LOCK_EXP_REFS
/* Optional per-export dump callback; print_export_data() calls it when it
 * is set and lock dumping was requested.  Starts out NULL. */
void (*class_export_dump_hook)(struct obd_export *) = NULL;
#endif
1524
1525 static void print_export_data(struct obd_export *exp, const char *status,
1526                               int locks)
1527 {
1528         struct ptlrpc_reply_state *rs;
1529         struct ptlrpc_reply_state *first_reply = NULL;
1530         int nreplies = 0;
1531
1532         spin_lock(&exp->exp_lock);
1533         list_for_each_entry(rs, &exp->exp_outstanding_replies,
1534                             rs_exp_list) {
1535                 if (nreplies == 0)
1536                         first_reply = rs;
1537                 nreplies++;
1538         }
1539         spin_unlock(&exp->exp_lock);
1540
1541         CDEBUG(D_HA, "%s: %s %p %s %s %d (%d %d %d) %d %d %d %d: %p %s "LPU64"\n",
1542                exp->exp_obd->obd_name, status, exp, exp->exp_client_uuid.uuid,
1543                obd_export_nid2str(exp), atomic_read(&exp->exp_refcount),
1544                atomic_read(&exp->exp_rpc_count),
1545                atomic_read(&exp->exp_cb_count),
1546                atomic_read(&exp->exp_locks_count),
1547                exp->exp_disconnected, exp->exp_delayed, exp->exp_failed,
1548                nreplies, first_reply, nreplies > 3 ? "..." : "",
1549                exp->exp_last_committed);
1550 #if LUSTRE_TRACKS_LOCK_EXP_REFS
1551         if (locks && class_export_dump_hook != NULL)
1552                 class_export_dump_hook(exp);
1553 #endif
1554 }
1555
1556 void dump_exports(struct obd_device *obd, int locks)
1557 {
1558         struct obd_export *exp;
1559
1560         spin_lock(&obd->obd_dev_lock);
1561         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain)
1562                 print_export_data(exp, "ACTIVE", locks);
1563         list_for_each_entry(exp, &obd->obd_unlinked_exports, exp_obd_chain)
1564                 print_export_data(exp, "UNLINKED", locks);
1565         list_for_each_entry(exp, &obd->obd_delayed_exports, exp_obd_chain)
1566                 print_export_data(exp, "DELAYED", locks);
1567         spin_unlock(&obd->obd_dev_lock);
1568         spin_lock(&obd_zombie_impexp_lock);
1569         list_for_each_entry(exp, &obd_zombie_exports, exp_obd_chain)
1570                 print_export_data(exp, "ZOMBIE", locks);
1571         spin_unlock(&obd_zombie_impexp_lock);
1572 }
1573
1574 void obd_exports_barrier(struct obd_device *obd)
1575 {
1576         int waited = 2;
1577         LASSERT(list_empty(&obd->obd_exports));
1578         spin_lock(&obd->obd_dev_lock);
1579         while (!list_empty(&obd->obd_unlinked_exports)) {
1580                 spin_unlock(&obd->obd_dev_lock);
1581                 set_current_state(TASK_UNINTERRUPTIBLE);
1582                 schedule_timeout(cfs_time_seconds(waited));
1583                 if (waited > 5 && IS_PO2(waited)) {
1584                         LCONSOLE_WARN("%s is waiting for obd_unlinked_exports "
1585                                       "more than %d seconds. "
1586                                       "The obd refcount = %d. Is it stuck?\n",
1587                                       obd->obd_name, waited,
1588                                       atomic_read(&obd->obd_refcount));
1589                         dump_exports(obd, 1);
1590                 }
1591                 waited *= 2;
1592                 spin_lock(&obd->obd_dev_lock);
1593         }
1594         spin_unlock(&obd->obd_dev_lock);
1595 }
1596 EXPORT_SYMBOL(obd_exports_barrier);
1597
/* Number of zombie imports and exports queued for destruction; updated
 * under obd_zombie_impexp_lock. */
static int zombies_count = 0;
1600
1601 /**
1602  * kill zombie imports and exports
1603  */
1604 void obd_zombie_impexp_cull(void)
1605 {
1606         struct obd_import *import;
1607         struct obd_export *export;
1608         ENTRY;
1609
1610         do {
1611                 spin_lock(&obd_zombie_impexp_lock);
1612
1613                 import = NULL;
1614                 if (!list_empty(&obd_zombie_imports)) {
1615                         import = list_entry(obd_zombie_imports.next,
1616                                             struct obd_import,
1617                                             imp_zombie_chain);
1618                         list_del_init(&import->imp_zombie_chain);
1619                 }
1620
1621                 export = NULL;
1622                 if (!list_empty(&obd_zombie_exports)) {
1623                         export = list_entry(obd_zombie_exports.next,
1624                                             struct obd_export,
1625                                             exp_obd_chain);
1626                         list_del_init(&export->exp_obd_chain);
1627                 }
1628
1629                 spin_unlock(&obd_zombie_impexp_lock);
1630
1631                 if (import != NULL) {
1632                         class_import_destroy(import);
1633                         spin_lock(&obd_zombie_impexp_lock);
1634                         zombies_count--;
1635                         spin_unlock(&obd_zombie_impexp_lock);
1636                 }
1637
1638                 if (export != NULL) {
1639                         class_export_destroy(export);
1640                         spin_lock(&obd_zombie_impexp_lock);
1641                         zombies_count--;
1642                         spin_unlock(&obd_zombie_impexp_lock);
1643                 }
1644
1645                 cond_resched();
1646         } while (import != NULL || export != NULL);
1647         EXIT;
1648 }
1649
1650 static struct completion        obd_zombie_start;
1651 static struct completion        obd_zombie_stop;
1652 static unsigned long            obd_zombie_flags;
1653 static wait_queue_head_t        obd_zombie_waitq;
1654 static pid_t                    obd_zombie_pid;
1655
1656 enum {
1657         OBD_ZOMBIE_STOP         = 0x0001,
1658 };
1659
1660 /**
1661  * check for work for kill zombie import/export thread.
1662  */
1663 static int obd_zombie_impexp_check(void *arg)
1664 {
1665         int rc;
1666
1667         spin_lock(&obd_zombie_impexp_lock);
1668         rc = (zombies_count == 0) &&
1669              !test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1670         spin_unlock(&obd_zombie_impexp_lock);
1671
1672         RETURN(rc);
1673 }
1674
1675 /**
1676  * Add export to the obd_zombe thread and notify it.
1677  */
1678 static void obd_zombie_export_add(struct obd_export *exp) {
1679         atomic_dec(&obd_stale_export_num);
1680         spin_lock(&exp->exp_obd->obd_dev_lock);
1681         LASSERT(!list_empty(&exp->exp_obd_chain));
1682         list_del_init(&exp->exp_obd_chain);
1683         spin_unlock(&exp->exp_obd->obd_dev_lock);
1684         spin_lock(&obd_zombie_impexp_lock);
1685         zombies_count++;
1686         list_add(&exp->exp_obd_chain, &obd_zombie_exports);
1687         spin_unlock(&obd_zombie_impexp_lock);
1688
1689         obd_zombie_impexp_notify();
1690 }
1691
1692 /**
1693  * Add import to the obd_zombe thread and notify it.
1694  */
1695 static void obd_zombie_import_add(struct obd_import *imp) {
1696         LASSERT(imp->imp_sec == NULL);
1697         spin_lock(&obd_zombie_impexp_lock);
1698         LASSERT(list_empty(&imp->imp_zombie_chain));
1699         zombies_count++;
1700         list_add(&imp->imp_zombie_chain, &obd_zombie_imports);
1701         spin_unlock(&obd_zombie_impexp_lock);
1702
1703         obd_zombie_impexp_notify();
1704 }
1705
1706 /**
1707  * notify import/export destroy thread about new zombie.
1708  */
1709 static void obd_zombie_impexp_notify(void)
1710 {
1711         /*
1712          * Make sure obd_zomebie_impexp_thread get this notification.
1713          * It is possible this signal only get by obd_zombie_barrier, and
1714          * barrier gulps this notification and sleeps away and hangs ensues
1715          */
1716         wake_up_all(&obd_zombie_waitq);
1717 }
1718
1719 /**
1720  * check whether obd_zombie is idle
1721  */
1722 static int obd_zombie_is_idle(void)
1723 {
1724         int rc;
1725
1726         LASSERT(!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags));
1727         spin_lock(&obd_zombie_impexp_lock);
1728         rc = (zombies_count == 0);
1729         spin_unlock(&obd_zombie_impexp_lock);
1730         return rc;
1731 }
1732
1733 /**
1734  * wait when obd_zombie import/export queues become empty
1735  */
1736 void obd_zombie_barrier(void)
1737 {
1738         struct l_wait_info lwi = { 0 };
1739
1740         if (obd_zombie_pid == current_pid())
1741                 /* don't wait for myself */
1742                 return;
1743         l_wait_event(obd_zombie_waitq, obd_zombie_is_idle(), &lwi);
1744 }
1745 EXPORT_SYMBOL(obd_zombie_barrier);
1746
1747
1748 struct obd_export *obd_stale_export_get(void)
1749 {
1750         struct obd_export *exp = NULL;
1751         ENTRY;
1752
1753         spin_lock(&obd_stale_export_lock);
1754         if (!list_empty(&obd_stale_exports)) {
1755                 exp = list_entry(obd_stale_exports.next,
1756                                  struct obd_export, exp_stale_list);
1757                 list_del_init(&exp->exp_stale_list);
1758         }
1759         spin_unlock(&obd_stale_export_lock);
1760
1761         if (exp) {
1762                 CDEBUG(D_DLMTRACE, "Get export %p: total %d\n", exp,
1763                        atomic_read(&obd_stale_export_num));
1764         }
1765         RETURN(exp);
1766 }
1767 EXPORT_SYMBOL(obd_stale_export_get);
1768
1769 void obd_stale_export_put(struct obd_export *exp)
1770 {
1771         ENTRY;
1772
1773         LASSERT(list_empty(&exp->exp_stale_list));
1774         if (exp->exp_lock_hash &&
1775             atomic_read(&exp->exp_lock_hash->hs_count)) {
1776                 CDEBUG(D_DLMTRACE, "Put export %p: total %d\n", exp,
1777                        atomic_read(&obd_stale_export_num));
1778
1779                 spin_lock_bh(&exp->exp_bl_list_lock);
1780                 spin_lock(&obd_stale_export_lock);
1781                 /* Add to the tail if there is no blocked locks,
1782                  * to the head otherwise. */
1783                 if (list_empty(&exp->exp_bl_list))
1784                         list_add_tail(&exp->exp_stale_list,
1785                                       &obd_stale_exports);
1786                 else
1787                         list_add(&exp->exp_stale_list,
1788                                  &obd_stale_exports);
1789
1790                 spin_unlock(&obd_stale_export_lock);
1791                 spin_unlock_bh(&exp->exp_bl_list_lock);
1792         } else {
1793                 class_export_put(exp);
1794         }
1795         EXIT;
1796 }
1797 EXPORT_SYMBOL(obd_stale_export_put);
1798
1799 /**
1800  * Adjust the position of the export in the stale list,
1801  * i.e. move to the head of the list if is needed.
1802  **/
1803 void obd_stale_export_adjust(struct obd_export *exp)
1804 {
1805         LASSERT(exp != NULL);
1806         spin_lock_bh(&exp->exp_bl_list_lock);
1807         spin_lock(&obd_stale_export_lock);
1808
1809         if (!list_empty(&exp->exp_stale_list) &&
1810             !list_empty(&exp->exp_bl_list))
1811                 list_move(&exp->exp_stale_list, &obd_stale_exports);
1812
1813         spin_unlock(&obd_stale_export_lock);
1814         spin_unlock_bh(&exp->exp_bl_list_lock);
1815 }
1816 EXPORT_SYMBOL(obd_stale_export_adjust);
1817
1818 /**
1819  * destroy zombie export/import thread.
1820  */
1821 static int obd_zombie_impexp_thread(void *unused)
1822 {
1823         unshare_fs_struct();
1824         complete(&obd_zombie_start);
1825
1826         obd_zombie_pid = current_pid();
1827
1828         while (!test_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags)) {
1829                 struct l_wait_info lwi = { 0 };
1830
1831                 l_wait_event(obd_zombie_waitq,
1832                              !obd_zombie_impexp_check(NULL), &lwi);
1833                 obd_zombie_impexp_cull();
1834
1835                 /*
1836                  * Notify obd_zombie_barrier callers that queues
1837                  * may be empty.
1838                  */
1839                 wake_up(&obd_zombie_waitq);
1840         }
1841
1842         complete(&obd_zombie_stop);
1843
1844         RETURN(0);
1845 }
1846
1847
1848 /**
1849  * start destroy zombie import/export thread
1850  */
1851 int obd_zombie_impexp_init(void)
1852 {
1853         struct task_struct *task;
1854
1855         INIT_LIST_HEAD(&obd_zombie_imports);
1856
1857         INIT_LIST_HEAD(&obd_zombie_exports);
1858         spin_lock_init(&obd_zombie_impexp_lock);
1859         init_completion(&obd_zombie_start);
1860         init_completion(&obd_zombie_stop);
1861         init_waitqueue_head(&obd_zombie_waitq);
1862         obd_zombie_pid = 0;
1863
1864         task = kthread_run(obd_zombie_impexp_thread, NULL, "obd_zombid");
1865         if (IS_ERR(task))
1866                 RETURN(PTR_ERR(task));
1867
1868         wait_for_completion(&obd_zombie_start);
1869         RETURN(0);
1870 }
1871 /**
1872  * stop destroy zombie import/export thread
1873  */
1874 void obd_zombie_impexp_stop(void)
1875 {
1876         set_bit(OBD_ZOMBIE_STOP, &obd_zombie_flags);
1877         obd_zombie_impexp_notify();
1878         wait_for_completion(&obd_zombie_stop);
1879 }
1880
1881 /***** Kernel-userspace comm helpers *******/
1882
1883 /* Get length of entire message, including header */
1884 int kuc_len(int payload_len)
1885 {
1886         return sizeof(struct kuc_hdr) + payload_len;
1887 }
1888 EXPORT_SYMBOL(kuc_len);
1889
1890 /* Get a pointer to kuc header, given a ptr to the payload
1891  * @param p Pointer to payload area
1892  * @returns Pointer to kuc header
1893  */
1894 struct kuc_hdr * kuc_ptr(void *p)
1895 {
1896         struct kuc_hdr *lh = ((struct kuc_hdr *)p) - 1;
1897         LASSERT(lh->kuc_magic == KUC_MAGIC);
1898         return lh;
1899 }
1900 EXPORT_SYMBOL(kuc_ptr);
1901
1902 /* Test if payload is part of kuc message
1903  * @param p Pointer to payload area
1904  * @returns boolean
1905  */
1906 int kuc_ispayload(void *p)
1907 {
1908         struct kuc_hdr *kh = ((struct kuc_hdr *)p) - 1;
1909
1910         if (kh->kuc_magic == KUC_MAGIC)
1911                 return 1;
1912         else
1913                 return 0;
1914 }
1915 EXPORT_SYMBOL(kuc_ispayload);
1916
1917 /* Alloc space for a message, and fill in header
1918  * @return Pointer to payload area
1919  */
1920 void *kuc_alloc(int payload_len, int transport, int type)
1921 {
1922         struct kuc_hdr *lh;
1923         int len = kuc_len(payload_len);
1924
1925         OBD_ALLOC(lh, len);
1926         if (lh == NULL)
1927                 return ERR_PTR(-ENOMEM);
1928
1929         lh->kuc_magic = KUC_MAGIC;
1930         lh->kuc_transport = transport;
1931         lh->kuc_msgtype = type;
1932         lh->kuc_msglen = len;
1933
1934         return (void *)(lh + 1);
1935 }
1936 EXPORT_SYMBOL(kuc_alloc);
1937
/* Free a message allocated with kuc_alloc()
 * @param p Pointer to payload area (as returned by kuc_alloc())
 * @param payload_len Payload length that was given to kuc_alloc()
 *
 * The function is exported and has external linkage, hence it is not
 * declared "inline": with C99 inline semantics a plain "inline" definition
 * does not necessarily provide the external definition callers link to.
 */
void kuc_free(void *p, int payload_len)
{
	struct kuc_hdr *lh = kuc_ptr(p);

	OBD_FREE(lh, kuc_len(payload_len));
}
EXPORT_SYMBOL(kuc_free);
1945
1946 struct obd_request_slot_waiter {
1947         struct list_head        orsw_entry;
1948         wait_queue_head_t       orsw_waitq;
1949         bool                    orsw_signaled;
1950 };
1951
1952 static bool obd_request_slot_avail(struct client_obd *cli,
1953                                    struct obd_request_slot_waiter *orsw)
1954 {
1955         bool avail;
1956
1957         spin_lock(&cli->cl_loi_list_lock);
1958         avail = !!list_empty(&orsw->orsw_entry);
1959         spin_unlock(&cli->cl_loi_list_lock);
1960
1961         return avail;
1962 };
1963
1964 /*
1965  * For network flow control, the RPC sponsor needs to acquire a credit
1966  * before sending the RPC. The credits count for a connection is defined
1967  * by the "cl_max_rpcs_in_flight". If all the credits are occpuied, then
1968  * the subsequent RPC sponsors need to wait until others released their
1969  * credits, or the administrator increased the "cl_max_rpcs_in_flight".
1970  */
1971 int obd_get_request_slot(struct client_obd *cli)
1972 {
1973         struct obd_request_slot_waiter   orsw;
1974         struct l_wait_info               lwi;
1975         int                              rc;
1976
1977         spin_lock(&cli->cl_loi_list_lock);
1978         if (cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight) {
1979                 cli->cl_r_in_flight++;
1980                 spin_unlock(&cli->cl_loi_list_lock);
1981                 return 0;
1982         }
1983
1984         init_waitqueue_head(&orsw.orsw_waitq);
1985         list_add_tail(&orsw.orsw_entry, &cli->cl_loi_read_list);
1986         orsw.orsw_signaled = false;
1987         spin_unlock(&cli->cl_loi_list_lock);
1988
1989         lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP, NULL);
1990         rc = l_wait_event(orsw.orsw_waitq,
1991                           obd_request_slot_avail(cli, &orsw) ||
1992                           orsw.orsw_signaled,
1993                           &lwi);
1994
1995         /* Here, we must take the lock to avoid the on-stack 'orsw' to be
1996          * freed but other (such as obd_put_request_slot) is using it. */
1997         spin_lock(&cli->cl_loi_list_lock);
1998         if (rc != 0) {
1999                 if (!orsw.orsw_signaled) {
2000                         if (list_empty(&orsw.orsw_entry))
2001                                 cli->cl_r_in_flight--;
2002                         else
2003                                 list_del(&orsw.orsw_entry);
2004                 }
2005         }
2006
2007         if (orsw.orsw_signaled) {
2008                 LASSERT(list_empty(&orsw.orsw_entry));
2009
2010                 rc = -EINTR;
2011         }
2012         spin_unlock(&cli->cl_loi_list_lock);
2013
2014         return rc;
2015 }
2016 EXPORT_SYMBOL(obd_get_request_slot);
2017
2018 void obd_put_request_slot(struct client_obd *cli)
2019 {
2020         struct obd_request_slot_waiter *orsw;
2021
2022         spin_lock(&cli->cl_loi_list_lock);
2023         cli->cl_r_in_flight--;
2024
2025         /* If there is free slot, wakeup the first waiter. */
2026         if (!list_empty(&cli->cl_loi_read_list) &&
2027             likely(cli->cl_r_in_flight < cli->cl_max_rpcs_in_flight)) {
2028                 orsw = list_entry(cli->cl_loi_read_list.next,
2029                                   struct obd_request_slot_waiter, orsw_entry);
2030                 list_del_init(&orsw->orsw_entry);
2031                 cli->cl_r_in_flight++;
2032                 wake_up(&orsw->orsw_waitq);
2033         }
2034         spin_unlock(&cli->cl_loi_list_lock);
2035 }
2036 EXPORT_SYMBOL(obd_put_request_slot);
2037
2038 __u32 obd_get_max_rpcs_in_flight(struct client_obd *cli)
2039 {
2040         return cli->cl_max_rpcs_in_flight;
2041 }
2042 EXPORT_SYMBOL(obd_get_max_rpcs_in_flight);
2043
2044 int obd_set_max_rpcs_in_flight(struct client_obd *cli, __u32 max)
2045 {
2046         struct obd_request_slot_waiter *orsw;
2047         __u32                           old;
2048         int                             diff;
2049         int                             i;
2050         char                            *typ_name;
2051         int                             rc;
2052
2053         if (max > OBD_MAX_RIF_MAX || max < 1)
2054                 return -ERANGE;
2055
2056         typ_name = cli->cl_import->imp_obd->obd_type->typ_name;
2057         if (strcmp(typ_name, LUSTRE_MDC_NAME) == 0) {
2058                 /* adjust max_mod_rpcs_in_flight to ensure it is always
2059                  * strictly lower that max_rpcs_in_flight */
2060                 if (max < 2) {
2061                         CERROR("%s: cannot set max_rpcs_in_flight to 1 "
2062                                "because it must be higher than "
2063                                "max_mod_rpcs_in_flight value",
2064                                cli->cl_import->imp_obd->obd_name);
2065                         return -ERANGE;
2066                 }
2067                 if (max <= cli->cl_max_mod_rpcs_in_flight) {
2068                         rc = obd_set_max_mod_rpcs_in_flight(cli, max - 1);
2069                         if (rc != 0)
2070                                 return rc;
2071                 }
2072         }
2073
2074         spin_lock(&cli->cl_loi_list_lock);
2075         old = cli->cl_max_rpcs_in_flight;
2076         cli->cl_max_rpcs_in_flight = max;
2077         diff = max - old;
2078
2079         /* We increase the max_rpcs_in_flight, then wakeup some waiters. */
2080         for (i = 0; i < diff; i++) {
2081                 if (list_empty(&cli->cl_loi_read_list))
2082                         break;
2083
2084                 orsw = list_entry(cli->cl_loi_read_list.next,
2085                                   struct obd_request_slot_waiter, orsw_entry);
2086                 list_del_init(&orsw->orsw_entry);
2087                 cli->cl_r_in_flight++;
2088                 wake_up(&orsw->orsw_waitq);
2089         }
2090         spin_unlock(&cli->cl_loi_list_lock);
2091
2092         return 0;
2093 }
2094 EXPORT_SYMBOL(obd_set_max_rpcs_in_flight);
2095
2096 __u16 obd_get_max_mod_rpcs_in_flight(struct client_obd *cli)
2097 {
2098         return cli->cl_max_mod_rpcs_in_flight;
2099 }
2100 EXPORT_SYMBOL(obd_get_max_mod_rpcs_in_flight);
2101
2102 int obd_set_max_mod_rpcs_in_flight(struct client_obd *cli, __u16 max)
2103 {
2104         struct obd_connect_data *ocd;
2105         __u16 maxmodrpcs;
2106         __u16 prev;
2107
2108         if (max > OBD_MAX_RIF_MAX || max < 1)
2109                 return -ERANGE;
2110
2111         /* cannot exceed or equal max_rpcs_in_flight */
2112         if (max >= cli->cl_max_rpcs_in_flight) {
2113                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2114                        "higher or equal to max_rpcs_in_flight value (%u)\n",
2115                        cli->cl_import->imp_obd->obd_name,
2116                        max, cli->cl_max_rpcs_in_flight);
2117                 return -ERANGE;
2118         }
2119
2120         /* cannot exceed max modify RPCs in flight supported by the server */
2121         ocd = &cli->cl_import->imp_connect_data;
2122         if (ocd->ocd_connect_flags & OBD_CONNECT_MULTIMODRPCS)
2123                 maxmodrpcs = ocd->ocd_maxmodrpcs;
2124         else
2125                 maxmodrpcs = 1;
2126         if (max > maxmodrpcs) {
2127                 CERROR("%s: can't set max_mod_rpcs_in_flight to a value (%hu) "
2128                        "higher than max_mod_rpcs_per_client value (%hu) "
2129                        "returned by the server at connection\n",
2130                        cli->cl_import->imp_obd->obd_name,
2131                        max, maxmodrpcs);
2132                 return -ERANGE;
2133         }
2134
2135         spin_lock(&cli->cl_mod_rpcs_lock);
2136
2137         prev = cli->cl_max_mod_rpcs_in_flight;
2138         cli->cl_max_mod_rpcs_in_flight = max;
2139
2140         /* wakeup waiters if limit has been increased */
2141         if (cli->cl_max_mod_rpcs_in_flight > prev)
2142                 wake_up(&cli->cl_mod_rpcs_waitq);
2143
2144         spin_unlock(&cli->cl_mod_rpcs_lock);
2145
2146         return 0;
2147 }
2148 EXPORT_SYMBOL(obd_set_max_mod_rpcs_in_flight);
2149
2150
2151 #define pct(a, b) (b ? a * 100 / b : 0)
2152 int obd_mod_rpc_stats_seq_show(struct client_obd *cli,
2153                                struct seq_file *seq)
2154 {
2155         struct timeval now;
2156         unsigned long mod_tot = 0, mod_cum;
2157         int i;
2158
2159         do_gettimeofday(&now);
2160
2161         spin_lock(&cli->cl_mod_rpcs_lock);
2162
2163         seq_printf(seq, "snapshot_time:         %lu.%lu (secs.usecs)\n",
2164                    now.tv_sec, now.tv_usec);
2165         seq_printf(seq, "modify_RPCs_in_flight:  %hu\n",
2166                    cli->cl_mod_rpcs_in_flight);
2167
2168         seq_printf(seq, "\n\t\t\tmodify\n");
2169         seq_printf(seq, "rpcs in flight        rpcs   %% cum %%\n");
2170
2171         mod_tot = lprocfs_oh_sum(&cli->cl_mod_rpcs_hist);
2172
2173         mod_cum = 0;
2174         for (i = 0; i < OBD_HIST_MAX; i++) {
2175                 unsigned long mod = cli->cl_mod_rpcs_hist.oh_buckets[i];
2176                 mod_cum += mod;
2177                 seq_printf(seq, "%d:\t\t%10lu %3lu %3lu\n",
2178                                  i, mod, pct(mod, mod_tot),
2179                                  pct(mod_cum, mod_tot));
2180                 if (mod_cum == mod_tot)
2181                         break;
2182         }
2183
2184         spin_unlock(&cli->cl_mod_rpcs_lock);
2185
2186         return 0;
2187 }
2188 EXPORT_SYMBOL(obd_mod_rpc_stats_seq_show);
2189 #undef pct
2190
2191
2192 /* The number of modify RPCs sent in parallel is limited
2193  * because the server has a finite number of slots per client to
2194  * store request result and ensure reply reconstruction when needed.
2195  * On the client, this limit is stored in cl_max_mod_rpcs_in_flight
2196  * that takes into account server limit and cl_max_rpcs_in_flight
2197  * value.
2198  * On the MDC client, to avoid a potential deadlock (see Bugzilla 3462),
2199  * one close request is allowed above the maximum.
2200  */
2201 static inline bool obd_mod_rpc_slot_avail_locked(struct client_obd *cli,
2202                                                  bool close_req)
2203 {
2204         bool avail;
2205
2206         /* A slot is available if
2207          * - number of modify RPCs in flight is less than the max
2208          * - it's a close RPC and no other close request is in flight
2209          */
2210         avail = cli->cl_mod_rpcs_in_flight < cli->cl_max_mod_rpcs_in_flight ||
2211                 (close_req && cli->cl_close_rpcs_in_flight == 0);
2212
2213         return avail;
2214 }
2215
2216 static inline bool obd_mod_rpc_slot_avail(struct client_obd *cli,
2217                                          bool close_req)
2218 {
2219         bool avail;
2220
2221         spin_lock(&cli->cl_mod_rpcs_lock);
2222         avail = obd_mod_rpc_slot_avail_locked(cli, close_req);
2223         spin_unlock(&cli->cl_mod_rpcs_lock);
2224         return avail;
2225 }
2226
2227 /* Get a modify RPC slot from the obd client @cli according
2228  * to the kind of operation @opc that is going to be sent
2229  * and the intent @it of the operation if it applies.
2230  * If the maximum number of modify RPCs in flight is reached
2231  * the thread is put to sleep.
2232  * Returns the tag to be set in the request message. Tag 0
2233  * is reserved for non-modifying requests.
2234  */
2235 __u16 obd_get_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2236                            struct lookup_intent *it)
2237 {
2238         struct l_wait_info      lwi = LWI_INTR(NULL, NULL);
2239         bool                    close_req = false;
2240         __u16                   i, max;
2241
2242         /* read-only metadata RPCs don't consume a slot on MDT
2243          * for reply reconstruction
2244          */
2245         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2246                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2247                 return 0;
2248
2249         if (opc == MDS_CLOSE)
2250                 close_req = true;
2251
2252         do {
2253                 spin_lock(&cli->cl_mod_rpcs_lock);
2254                 max = cli->cl_max_mod_rpcs_in_flight;
2255                 if (obd_mod_rpc_slot_avail_locked(cli, close_req)) {
2256                         /* there is a slot available */
2257                         cli->cl_mod_rpcs_in_flight++;
2258                         if (close_req)
2259                                 cli->cl_close_rpcs_in_flight++;
2260                         lprocfs_oh_tally(&cli->cl_mod_rpcs_hist,
2261                                          cli->cl_mod_rpcs_in_flight);
2262                         /* find a free tag */
2263                         i = find_first_zero_bit(cli->cl_mod_tag_bitmap,
2264                                                 max + 1);
2265                         LASSERT(i < OBD_MAX_RIF_MAX);
2266                         LASSERT(!test_and_set_bit(i, cli->cl_mod_tag_bitmap));
2267                         spin_unlock(&cli->cl_mod_rpcs_lock);
2268                         /* tag 0 is reserved for non-modify RPCs */
2269                         return i + 1;
2270                 }
2271                 spin_unlock(&cli->cl_mod_rpcs_lock);
2272
2273                 CDEBUG(D_RPCTRACE, "%s: sleeping for a modify RPC slot "
2274                        "opc %u, max %hu\n",
2275                        cli->cl_import->imp_obd->obd_name, opc, max);
2276
2277                 l_wait_event(cli->cl_mod_rpcs_waitq,
2278                              obd_mod_rpc_slot_avail(cli, close_req), &lwi);
2279         } while (true);
2280 }
2281 EXPORT_SYMBOL(obd_get_mod_rpc_slot);
2282
2283 /* Put a modify RPC slot from the obd client @cli according
2284  * to the kind of operation @opc that has been sent and the
2285  * intent @it of the operation if it applies.
2286  */
2287 void obd_put_mod_rpc_slot(struct client_obd *cli, __u32 opc,
2288                           struct lookup_intent *it, __u16 tag)
2289 {
2290         bool                    close_req = false;
2291
2292         if (it != NULL && (it->it_op == IT_GETATTR || it->it_op == IT_LOOKUP ||
2293                            it->it_op == IT_LAYOUT || it->it_op == IT_READDIR))
2294                 return;
2295
2296         if (opc == MDS_CLOSE)
2297                 close_req = true;
2298
2299         spin_lock(&cli->cl_mod_rpcs_lock);
2300         cli->cl_mod_rpcs_in_flight--;
2301         if (close_req)
2302                 cli->cl_close_rpcs_in_flight--;
2303         /* release the tag in the bitmap */
2304         LASSERT(tag - 1 < OBD_MAX_RIF_MAX);
2305         LASSERT(test_and_clear_bit(tag - 1, cli->cl_mod_tag_bitmap) != 0);
2306         spin_unlock(&cli->cl_mod_rpcs_lock);
2307         wake_up(&cli->cl_mod_rpcs_waitq);
2308 }
2309 EXPORT_SYMBOL(obd_put_mod_rpc_slot);
2310