Whamcloud - gitweb
LU-11997 ptlrpc: Properly swab ll_fiemap_info_key
[fs/lustre-release.git] / lustre / obdecho / echo.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/obdecho/echo.c
33  *
34  * Author: Peter Braam <braam@clusterfs.com>
35  * Author: Andreas Dilger <adilger@clusterfs.com>
36  */
37
38 #define DEBUG_SUBSYSTEM S_ECHO
39
40 #include <obd_support.h>
41 #include <obd_class.h>
42 #include <lustre_debug.h>
43 #include <lustre_dlm.h>
44 #include <lprocfs_status.h>
45
46 #include "echo_internal.h"
47
48 /*
49  * The echo objid needs to be below 2^32, because regular FID numbers are
50  * limited to 2^32 objects in f_oid for the FID_SEQ_ECHO range. b=23335
51  */
52 #define ECHO_INIT_OID        0x10000000ULL
53 #define ECHO_HANDLE_MAGIC    0xabcd0123fedc9876ULL
54
55 #define ECHO_PERSISTENT_PAGES (ECHO_PERSISTENT_SIZE >> PAGE_SHIFT)
56 static struct page *echo_persistent_pages[ECHO_PERSISTENT_PAGES];
57
/* Indices of the byte counters registered in obd_stats by echo_srv_init0(). */
enum {
        LPROC_ECHO_READ_BYTES  = 1,
        LPROC_ECHO_WRITE_BYTES = 2,
        /* one past the last counter; passed to lprocfs_alloc_obd_stats() */
        LPROC_ECHO_LAST        = LPROC_ECHO_WRITE_BYTES + 1
};
63
64 struct echo_srv_device {
65         struct lu_device esd_dev;
66         struct lu_target esd_lut;
67 };
68
69 static inline struct echo_srv_device *echo_srv_dev(struct lu_device *d)
70 {
71         return container_of0(d, struct echo_srv_device, esd_dev);
72 }
73
74 static inline struct obd_device *echo_srv_obd(struct echo_srv_device *esd)
75 {
76         return esd->esd_dev.ld_obd;
77 }
78
79 static int echo_connect(const struct lu_env *env,
80                         struct obd_export **exp, struct obd_device *obd,
81                         struct obd_uuid *cluuid, struct obd_connect_data *data,
82                         void *localdata)
83 {
84         struct lustre_handle conn = { 0 };
85         int rc;
86
87         data->ocd_connect_flags &= ECHO_CONNECT_SUPPORTED;
88
89         if (data->ocd_connect_flags & OBD_CONNECT_FLAGS2)
90                 data->ocd_connect_flags2 &= ECHO_CONNECT_SUPPORTED2;
91
92         rc = class_connect(&conn, obd, cluuid);
93         if (rc) {
94                 CERROR("can't connect %d\n", rc);
95                 return rc;
96         }
97         *exp = class_conn2export(&conn);
98
99         return 0;
100 }
101
102 static int echo_disconnect(struct obd_export *exp)
103 {
104         LASSERT(exp != NULL);
105
106         return server_disconnect_export(exp);
107 }
108
/* Export initialisation: delegates to ldlm_init_export(). */
static int echo_init_export(struct obd_export *exp)
{
        int rc = ldlm_init_export(exp);

        return rc;
}
113
114 static int echo_destroy_export(struct obd_export *exp)
115 {
116         ENTRY;
117
118         target_destroy_export(exp);
119         ldlm_destroy_export(exp);
120
121         RETURN(0);
122 }
123
124 static u64 echo_next_id(struct obd_device *obddev)
125 {
126         u64 id;
127
128         spin_lock(&obddev->u.echo.eo_lock);
129         id = ++obddev->u.echo.eo_lastino;
130         spin_unlock(&obddev->u.echo.eo_lock);
131
132         return id;
133 }
134
135 static void
136 echo_page_debug_setup(struct page *page, int rw, u64 id,
137                       __u64 offset, int len)
138 {
139         int   page_offset = offset & ~PAGE_MASK;
140         char *addr        = ((char *)kmap(page)) + page_offset;
141
142         if (len % OBD_ECHO_BLOCK_SIZE != 0)
143                 CERROR("Unexpected block size %d\n", len);
144
145         while (len > 0) {
146                 if (rw & OBD_BRW_READ)
147                         block_debug_setup(addr, OBD_ECHO_BLOCK_SIZE,
148                                           offset, id);
149                 else
150                         block_debug_setup(addr, OBD_ECHO_BLOCK_SIZE,
151                                           0xecc0ecc0ecc0ecc0ULL,
152                                           0xecc0ecc0ecc0ecc0ULL);
153
154                 addr   += OBD_ECHO_BLOCK_SIZE;
155                 offset += OBD_ECHO_BLOCK_SIZE;
156                 len    -= OBD_ECHO_BLOCK_SIZE;
157         }
158
159         kunmap(page);
160 }
161
162 static int
163 echo_page_debug_check(struct page *page, u64 id,
164                       __u64 offset, int len)
165 {
166         int   page_offset = offset & ~PAGE_MASK;
167         char *addr        = ((char *)kmap(page)) + page_offset;
168         int   rc          = 0;
169         int   rc2;
170
171         if (len % OBD_ECHO_BLOCK_SIZE != 0)
172                 CERROR("Unexpected block size %d\n", len);
173
174         while (len > 0) {
175                 rc2 = block_debug_check("echo", addr, OBD_ECHO_BLOCK_SIZE,
176                                         offset, id);
177
178                 if (rc2 != 0 && rc == 0)
179                         rc = rc2;
180
181                 addr   += OBD_ECHO_BLOCK_SIZE;
182                 offset += OBD_ECHO_BLOCK_SIZE;
183                 len    -= OBD_ECHO_BLOCK_SIZE;
184         }
185
186         kunmap(page);
187
188         return rc;
189 }
190
191 static int echo_map_nb_to_lb(struct obdo *oa, struct obd_ioobj *obj,
192                              struct niobuf_remote *nb, int *pages,
193                              struct niobuf_local *lb, int cmd, int *left)
194 {
195         gfp_t gfp_mask = (ostid_id(&obj->ioo_oid) & 1) ?
196                         GFP_HIGHUSER : GFP_KERNEL;
197         int ispersistent = ostid_id(&obj->ioo_oid) == ECHO_PERSISTENT_OBJID;
198         int debug_setup = (!ispersistent &&
199                            (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
200                            (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
201         struct niobuf_local *res = lb;
202         u64 offset = nb->rnb_offset;
203         int len = nb->rnb_len;
204
205         while (len > 0) {
206                 int plen = PAGE_SIZE - (offset & (PAGE_SIZE - 1));
207
208                 if (len < plen)
209                         plen = len;
210
211                 /* check for local buf overflow */
212                 if (*left == 0)
213                         return -EINVAL;
214
215                 res->lnb_file_offset = offset;
216                 res->lnb_len = plen;
217                 LASSERT((res->lnb_file_offset & ~PAGE_MASK) +
218                         res->lnb_len <= PAGE_SIZE);
219
220                 if (ispersistent &&
221                     ((res->lnb_file_offset >> PAGE_SHIFT) <
222                       ECHO_PERSISTENT_PAGES)) {
223                         res->lnb_page =
224                                 echo_persistent_pages[res->lnb_file_offset >>
225                                                       PAGE_SHIFT];
226                         /* Take extra ref so __free_pages() can be called OK */
227                         get_page(res->lnb_page);
228                 } else {
229                         res->lnb_page = alloc_page(gfp_mask);
230                         if (!res->lnb_page) {
231                                 CERROR("can't get page for id " DOSTID"\n",
232                                        POSTID(&obj->ioo_oid));
233                                 return -ENOMEM;
234                         }
235                 }
236
237                 CDEBUG(D_PAGE, "$$$$ get page %p @ %llu for %d\n",
238                        res->lnb_page, res->lnb_file_offset, res->lnb_len);
239
240                 if (cmd & OBD_BRW_READ)
241                         res->lnb_rc = res->lnb_len;
242
243                 if (debug_setup)
244                         echo_page_debug_setup(res->lnb_page, cmd,
245                                               ostid_id(&obj->ioo_oid),
246                                               res->lnb_file_offset,
247                                               res->lnb_len);
248
249                 offset += plen;
250                 len -= plen;
251                 res++;
252
253                 (*left)--;
254                 (*pages)++;
255         }
256
257         return 0;
258 }
259
260 static int echo_finalize_lb(struct obdo *oa, struct obd_ioobj *obj,
261                             struct niobuf_remote *rb, int *pgs,
262                             struct niobuf_local *lb, int verify)
263 {
264         struct niobuf_local *res = lb;
265         u64 start = rb->rnb_offset >> PAGE_SHIFT;
266         u64 end   = (rb->rnb_offset + rb->rnb_len + PAGE_SIZE - 1) >>
267                     PAGE_SHIFT;
268         int     count  = (int)(end - start);
269         int     rc     = 0;
270         int     i;
271
272         for (i = 0; i < count; i++, (*pgs) ++, res++) {
273                 struct page *page = res->lnb_page;
274                 void       *addr;
275
276                 if (!page) {
277                         CERROR("null page objid %llu:%p, buf %d/%d\n",
278                                ostid_id(&obj->ioo_oid), page, i,
279                                obj->ioo_bufcnt);
280                         return -EFAULT;
281                 }
282
283                 addr = kmap(page);
284
285                 CDEBUG(D_PAGE, "$$$$ use page %p, addr %p@%llu\n",
286                        res->lnb_page, addr, res->lnb_file_offset);
287
288                 if (verify) {
289                         int vrc = echo_page_debug_check(page,
290                                                         ostid_id(&obj->ioo_oid),
291                                                         res->lnb_file_offset,
292                                                         res->lnb_len);
293                         /* check all the pages always */
294                         if (vrc != 0 && rc == 0)
295                                 rc = vrc;
296                 }
297
298                 kunmap(page);
299                 /* NB see comment above regarding persistent pages */
300                 __free_page(page);
301         }
302
303         return rc;
304 }
305
306 static int echo_preprw(const struct lu_env *env, int cmd,
307                        struct obd_export *export, struct obdo *oa,
308                        int objcount, struct obd_ioobj *obj,
309                        struct niobuf_remote *nb, int *pages,
310                        struct niobuf_local *res)
311 {
312         struct obd_device *obd;
313         int tot_bytes = 0;
314         int rc = 0;
315         int i, left;
316
317         ENTRY;
318
319         obd = export->exp_obd;
320         if (!obd)
321                 RETURN(-EINVAL);
322
323         /* Temp fix to stop falling foul of osc_announce_cached() */
324         oa->o_valid &= ~(OBD_MD_FLBLOCKS | OBD_MD_FLGRANT);
325
326         memset(res, 0, sizeof(*res) * *pages);
327
328         CDEBUG(D_PAGE, "%s %d obdos with %d IOs\n",
329                cmd == OBD_BRW_READ ? "reading" : "writing", objcount, *pages);
330
331         left = *pages;
332         *pages = 0;
333
334         for (i = 0; i < objcount; i++, obj++) {
335                 int j;
336
337                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, nb++) {
338                         rc = echo_map_nb_to_lb(oa, obj, nb, pages,
339                                                res + *pages, cmd, &left);
340                         if (rc)
341                                 GOTO(preprw_cleanup, rc);
342
343                         tot_bytes += nb->rnb_len;
344                 }
345         }
346
347         atomic_add(*pages, &obd->u.echo.eo_prep);
348
349         if (cmd & OBD_BRW_READ)
350                 lprocfs_counter_add(obd->obd_stats, LPROC_ECHO_READ_BYTES,
351                                     tot_bytes);
352         else
353                 lprocfs_counter_add(obd->obd_stats, LPROC_ECHO_WRITE_BYTES,
354                                     tot_bytes);
355
356         CDEBUG(D_PAGE, "%d pages allocated after prep\n",
357                atomic_read(&obd->u.echo.eo_prep));
358
359         RETURN(0);
360
361 preprw_cleanup:
362         /*
363          * It is possible that we would rather handle errors by  allow
364          * any already-set-up pages to complete, rather than tearing them
365          * all down again.  I believe that this is what the in-kernel
366          * prep/commit operations do.
367          */
368         CERROR("cleaning up %u pages (%d obdos)\n", *pages, objcount);
369         for (i = 0; i < *pages; i++) {
370                 kunmap(res[i].lnb_page);
371                 /*
372                  * NB if this is a persistent page, __free_page() will just
373                  * lose the extra ref gained above
374                  */
375                 __free_page(res[i].lnb_page);
376                 res[i].lnb_page = NULL;
377                 atomic_dec(&obd->u.echo.eo_prep);
378         }
379
380         return rc;
381 }
382
383 static int echo_commitrw(const struct lu_env *env, int cmd,
384                          struct obd_export *export, struct obdo *oa,
385                          int objcount, struct obd_ioobj *obj,
386                          struct niobuf_remote *rb, int niocount,
387                          struct niobuf_local *res, int rc)
388 {
389         struct obd_device *obd;
390         int pgs = 0;
391         int i;
392
393         ENTRY;
394
395         obd = export->exp_obd;
396         if (!obd)
397                 RETURN(-EINVAL);
398
399         if (rc)
400                 GOTO(commitrw_cleanup, rc);
401
402         if ((cmd & OBD_BRW_RWMASK) == OBD_BRW_READ) {
403                 CDEBUG(D_PAGE, "reading %d obdos with %d IOs\n",
404                        objcount, niocount);
405         } else {
406                 CDEBUG(D_PAGE, "writing %d obdos with %d IOs\n",
407                        objcount, niocount);
408         }
409
410         if (niocount && !res) {
411                 CERROR("NULL res niobuf with niocount %d\n", niocount);
412                 RETURN(-EINVAL);
413         }
414
415         for (i = 0; i < objcount; i++, obj++) {
416                 int verify = (rc == 0 &&
417                              ostid_id(&obj->ioo_oid) != ECHO_PERSISTENT_OBJID &&
418                               (oa->o_valid & OBD_MD_FLFLAGS) != 0 &&
419                               (oa->o_flags & OBD_FL_DEBUG_CHECK) != 0);
420                 int j;
421
422                 for (j = 0 ; j < obj->ioo_bufcnt ; j++, rb++) {
423                         int vrc = echo_finalize_lb(oa, obj, rb, &pgs, &res[pgs],
424                                                    verify);
425                         if (vrc == 0)
426                                 continue;
427
428                         if (vrc == -EFAULT)
429                                 GOTO(commitrw_cleanup, rc = vrc);
430
431                         if (rc == 0)
432                                 rc = vrc;
433                 }
434         }
435
436         atomic_sub(pgs, &obd->u.echo.eo_prep);
437
438         CDEBUG(D_PAGE, "%d pages remain after commit\n",
439                atomic_read(&obd->u.echo.eo_prep));
440         RETURN(rc);
441
442 commitrw_cleanup:
443         atomic_sub(pgs, &obd->u.echo.eo_prep);
444
445         CERROR("cleaning up %d pages (%d obdos)\n",
446                niocount - pgs - 1, objcount);
447
448         while (pgs < niocount) {
449                 struct page *page = res[pgs++].lnb_page;
450
451                 if (!page)
452                         continue;
453
454                 /* NB see comment above regarding persistent pages */
455                 __free_page(page);
456                 atomic_dec(&obd->u.echo.eo_prep);
457         }
458         return rc;
459 }
460
461 LPROC_SEQ_FOPS_RO_TYPE(echo, uuid);
462 static struct lprocfs_vars lprocfs_echo_obd_vars[] = {
463         { .name =       "uuid",
464           .fops =       &echo_uuid_fops         },
465         { NULL }
466 };
467
468 struct obd_ops echo_obd_ops = {
469         .o_owner           = THIS_MODULE,
470         .o_connect         = echo_connect,
471         .o_disconnect      = echo_disconnect,
472         .o_init_export     = echo_init_export,
473         .o_destroy_export  = echo_destroy_export,
474         .o_preprw          = echo_preprw,
475         .o_commitrw        = echo_commitrw,
476 };
477
478 /**
479  * Echo Server request handler for OST_CREATE RPC.
480  *
481  * This is part of request processing. Its simulates the object
482  * creation on OST.
483  *
484  * \param[in] tsi       target session environment for this request
485  *
486  * \retval              0 if successful
487  * \retval              negative value on error
488  */
489 static int esd_create_hdl(struct tgt_session_info *tsi)
490 {
491         const struct obdo *oa = &tsi->tsi_ost_body->oa;
492         struct obd_device *obd = tsi->tsi_exp->exp_obd;
493         struct ost_body *repbody;
494         struct obdo *rep_oa;
495
496         ENTRY;
497
498         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
499         if (!repbody)
500                 RETURN(-ENOMEM);
501
502         if (!(oa->o_mode & S_IFMT)) {
503                 CERROR("%s: no type is set in obdo!\n",
504                        tsi->tsi_exp->exp_obd->obd_name);
505                 RETURN(-ENOENT);
506         }
507
508         if (!(oa->o_valid & OBD_MD_FLTYPE)) {
509                 CERROR("%s: invalid o_valid in obdo: %#llx\n",
510                        tsi->tsi_exp->exp_obd->obd_name, oa->o_valid);
511                 RETURN(-EINVAL);
512         }
513
514         rep_oa = &repbody->oa;
515
516         if (!fid_seq_is_echo(ostid_seq(&oa->o_oi))) {
517                 CERROR("%s: invalid seq %#llx\n",
518                        tsi->tsi_exp->exp_obd->obd_name, ostid_seq(&oa->o_oi));
519                 return -EINVAL;
520         }
521
522         ostid_set_seq_echo(&rep_oa->o_oi);
523         ostid_set_id(&rep_oa->o_oi, echo_next_id(obd));
524
525         CDEBUG(D_INFO, "%s: Create object "DOSTID"\n",
526                tsi->tsi_exp->exp_obd->obd_name, POSTID(&rep_oa->o_oi));
527
528         rep_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
529
530         RETURN(0);
531 }
532
533 /**
534  * Echo Server request handler for OST_DESTROY RPC.
535  *
536  * This is Echo Server part of request handling. It simulates the objects
537  * destroy on OST.
538  *
539  * \param[in] tsi       target session environment for this request
540  *
541  * \retval              0 if successful
542  * \retval              negative value on error
543  */
544 static int esd_destroy_hdl(struct tgt_session_info *tsi)
545 {
546         const struct obdo *oa = &tsi->tsi_ost_body->oa;
547         struct obd_device *obd = tsi->tsi_exp->exp_obd;
548         struct ost_body *repbody;
549         u64 oid;
550
551         ENTRY;
552
553         oid = ostid_id(&oa->o_oi);
554         LASSERT(oid != 0);
555
556         if (!(oa->o_valid & OBD_MD_FLID)) {
557                 CERROR("%s: obdo missing FLID valid flag: %#llx\n",
558                        tsi->tsi_exp->exp_obd->obd_name, oa->o_valid);
559                 RETURN(-EINVAL);
560         }
561
562         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
563
564         if (ostid_id(&oa->o_oi) > obd->u.echo.eo_lastino ||
565             ostid_id(&oa->o_oi) < ECHO_INIT_OID) {
566                 CERROR("%s: bad objid to destroy: "DOSTID"\n",
567                        tsi->tsi_exp->exp_obd->obd_name, POSTID(&oa->o_oi));
568                 RETURN(-EINVAL);
569         }
570
571         CDEBUG(D_INFO, "%s: Destroy object "DOSTID"\n",
572                tsi->tsi_exp->exp_obd->obd_name, POSTID(&oa->o_oi));
573
574         repbody->oa.o_oi = oa->o_oi;
575         RETURN(0);
576 }
577
578 /**
579  * Echo Server request handler for OST_GETATTR RPC.
580  *
581  * This is Echo Server part of request handling. It returns an object
582  * attributes to the client. All objects have the same attributes in
583  * Echo Server.
584  *
585  * \param[in] tsi       target session environment for this request
586  *
587  * \retval              0 if successful
588  * \retval              negative value on error
589  */
590 static int esd_getattr_hdl(struct tgt_session_info *tsi)
591 {
592         const struct obdo *oa = &tsi->tsi_ost_body->oa;
593         struct obd_device *obd = tsi->tsi_exp->exp_obd;
594         struct ost_body *repbody;
595
596         ENTRY;
597
598         if (!(oa->o_valid & OBD_MD_FLID)) {
599                 CERROR("%s: obdo missing FLID valid flag: %#llx\n",
600                        tsi->tsi_exp->exp_obd->obd_name, oa->o_valid);
601                 RETURN(-EINVAL);
602         }
603
604         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
605         if (!repbody)
606                 RETURN(-ENOMEM);
607
608         repbody->oa.o_oi = oa->o_oi;
609         repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
610
611         obdo_cpy_md(&repbody->oa, &obd->u.echo.eo_oa, oa->o_valid);
612
613         repbody->oa.o_valid |= OBD_MD_FLFLAGS;
614         repbody->oa.o_flags = OBD_FL_FLUSH;
615
616         RETURN(0);
617 }
618
619 /**
620  * Echo Server request handler for OST_SETATTR RPC.
621  *
622  * This is Echo Server part of request handling. It sets common
623  * attributes from request to the Echo Server objects.
624  *
625  * \param[in] tsi       target session environment for this request
626  *
627  * \retval              0 if successful
628  * \retval              negative value on error
629  */
630 static int esd_setattr_hdl(struct tgt_session_info *tsi)
631 {
632         struct ost_body *body = tsi->tsi_ost_body;
633         struct obd_device *obd = tsi->tsi_exp->exp_obd;
634         struct ost_body *repbody;
635
636         ENTRY;
637
638         if (!(body->oa.o_valid & OBD_MD_FLID)) {
639                 CERROR("%s: obdo missing FLID valid flag: %#llx\n",
640                        tsi->tsi_exp->exp_obd->obd_name,
641                        body->oa.o_valid);
642                 RETURN(-EINVAL);
643         }
644
645         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
646         if (!repbody)
647                 RETURN(-ENOMEM);
648
649         repbody->oa.o_oi = body->oa.o_oi;
650         repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
651
652         obd->u.echo.eo_oa = body->oa;
653
654         RETURN(0);
655 }
656
657 #define OBD_FAIL_OST_READ_NET   OBD_FAIL_OST_BRW_NET
658 #define OBD_FAIL_OST_WRITE_NET  OBD_FAIL_OST_BRW_NET
659 #define OST_BRW_READ    OST_READ
660 #define OST_BRW_WRITE   OST_WRITE
661
662 /**
663  * Table of Echo Server specific request handlers
664  *
665  * This table contains all opcodes accepted by Echo Server and
666  * specifies handlers for them. The tgt_request_handler()
667  * uses such table from each target to process incoming
668  * requests.
669  */
670 static struct tgt_handler esd_tgt_handlers[] = {
671 TGT_RPC_HANDLER(OST_FIRST_OPC, 0, OST_CONNECT, tgt_connect,
672                 &RQF_CONNECT, LUSTRE_OBD_VERSION),
673 TGT_RPC_HANDLER(OST_FIRST_OPC, 0, OST_DISCONNECT, tgt_disconnect,
674                 &RQF_OST_DISCONNECT, LUSTRE_OBD_VERSION),
675 TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_GETATTR, esd_getattr_hdl),
676 TGT_OST_HDL(HAS_BODY | HAS_REPLY | IS_MUTABLE, OST_SETATTR,
677             esd_setattr_hdl),
678 TGT_OST_HDL(HAS_REPLY | IS_MUTABLE, OST_CREATE, esd_create_hdl),
679 TGT_OST_HDL(HAS_REPLY | IS_MUTABLE, OST_DESTROY, esd_destroy_hdl),
680 TGT_OST_HDL(HAS_BODY | HAS_REPLY, OST_BRW_READ, tgt_brw_read),
681 TGT_OST_HDL(HAS_BODY | IS_MUTABLE, OST_BRW_WRITE, tgt_brw_write),
682 };
683
684 static struct tgt_opc_slice esd_common_slice[] = {
685         {
686                 .tos_opc_start  = OST_FIRST_OPC,
687                 .tos_opc_end    = OST_LAST_OPC,
688                 .tos_hs         = esd_tgt_handlers
689         },
690         {
691                 .tos_opc_start  = OBD_FIRST_OPC,
692                 .tos_opc_end    = OBD_LAST_OPC,
693                 .tos_hs         = tgt_obd_handlers
694         },
695         {
696                 .tos_opc_start  = LDLM_FIRST_OPC,
697                 .tos_opc_end    = LDLM_LAST_OPC,
698                 .tos_hs         = tgt_dlm_handlers
699         },
700         {
701                 .tos_opc_start  = SEC_FIRST_OPC,
702                 .tos_opc_end    = SEC_LAST_OPC,
703                 .tos_hs         = tgt_sec_ctx_handlers
704         },
705         {
706                 .tos_hs         = NULL
707         }
708 };
709
710 /**
711  * lu_device_operations matrix for ECHO SRV device is NULL,
712  * this device is just serving incoming requests immediately
713  * without building a stack of lu_devices.
714  */
715 static struct lu_device_operations echo_srv_lu_ops = { 0 };
716
717 /**
718  * Initialize Echo Server device with parameters in the config log \a cfg.
719  *
720  * This is the main starting point of Echo Server initialization. It fills all
721  * parameters with their initial values and starts Echo Server.
722  *
723  * \param[in] env       execution environment
724  * \param[in] m         Echo Server device
725  * \param[in] ldt       LU device type of Echo Server
726  * \param[in] cfg       configuration log
727  *
728  * \retval              0 if successful
729  * \retval              negative value on error
730  */
731 static int echo_srv_init0(const struct lu_env *env,
732                           struct echo_srv_device *esd,
733                           struct lu_device_type *ldt, struct lustre_cfg *cfg)
734 {
735         const char *dev = lustre_cfg_string(cfg, 0);
736         struct obd_device *obd;
737         char ns_name[48];
738         int rc;
739
740         ENTRY;
741
742         obd = class_name2obd(dev);
743         if (!obd) {
744                 CERROR("Cannot find obd with name %s\n", dev);
745                 RETURN(-ENODEV);
746         }
747
748         spin_lock_init(&obd->u.echo.eo_lock);
749         obd->u.echo.eo_lastino = ECHO_INIT_OID;
750
751         esd->esd_dev.ld_ops = &echo_srv_lu_ops;
752         esd->esd_dev.ld_obd = obd;
753         /* set this lu_device to obd, because error handling need it */
754         obd->obd_lu_dev = &esd->esd_dev;
755
756         /* No connection accepted until configurations will finish */
757         spin_lock(&obd->obd_dev_lock);
758         obd->obd_no_conn = 1;
759         spin_unlock(&obd->obd_dev_lock);
760
761         /* non-replayable target */
762         obd->obd_replayable = 0;
763
764         snprintf(ns_name, sizeof(ns_name), "echotgt-%s", obd->obd_uuid.uuid);
765         obd->obd_namespace = ldlm_namespace_new(obd, ns_name,
766                                                 LDLM_NAMESPACE_SERVER,
767                                                 LDLM_NAMESPACE_MODEST,
768                                                 LDLM_NS_TYPE_OST);
769         if (!obd->obd_namespace)
770                 RETURN(-ENOMEM);
771
772         obd->obd_vars = lprocfs_echo_obd_vars;
773         if (!lprocfs_obd_setup(obd, true) &&
774             lprocfs_alloc_obd_stats(obd, LPROC_ECHO_LAST) == 0) {
775                 lprocfs_counter_init(obd->obd_stats, LPROC_ECHO_READ_BYTES,
776                                      LPROCFS_CNTR_AVGMINMAX,
777                                      "read_bytes", "bytes");
778                 lprocfs_counter_init(obd->obd_stats, LPROC_ECHO_WRITE_BYTES,
779                                      LPROCFS_CNTR_AVGMINMAX,
780                                      "write_bytes", "bytes");
781         }
782
783         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
784                            "echo_ldlm_cb_client", &obd->obd_ldlm_client);
785
786         rc = tgt_init(env, &esd->esd_lut, obd, NULL, esd_common_slice,
787                       OBD_FAIL_OST_ALL_REQUEST_NET,
788                       OBD_FAIL_OST_ALL_REPLY_NET);
789         if (rc)
790                 GOTO(err_out, rc);
791
792         spin_lock(&obd->obd_dev_lock);
793         obd->obd_no_conn = 0;
794         spin_unlock(&obd->obd_dev_lock);
795
796         RETURN(0);
797
798 err_out:
799         ldlm_namespace_free(obd->obd_namespace, NULL, obd->obd_force);
800         obd->obd_namespace = NULL;
801
802         lprocfs_obd_cleanup(obd);
803         lprocfs_free_obd_stats(obd);
804         RETURN(rc);
805 }
806
807 /**
808  * Stop the Echo Server device.
809  *
810  * This function stops the Echo Server device and all its subsystems.
811  * This is the end of Echo Server lifecycle.
812  *
813  * \param[in] env       execution environment
814  * \param[in] esd               ESD device
815  */
816 static void echo_srv_fini(const struct lu_env *env,
817                           struct echo_srv_device *esd)
818 {
819         struct obd_device *obd = echo_srv_obd(esd);
820         struct lu_device *d = &esd->esd_dev;
821         int leaked;
822
823         ENTRY;
824
825         class_disconnect_exports(obd);
826         if (obd->obd_namespace)
827                 ldlm_namespace_free_prior(obd->obd_namespace, NULL,
828                                           obd->obd_force);
829
830         obd_exports_barrier(obd);
831         obd_zombie_barrier();
832
833         tgt_fini(env, &esd->esd_lut);
834
835         if (obd->obd_namespace) {
836                 ldlm_namespace_free_post(obd->obd_namespace);
837                 obd->obd_namespace = NULL;
838         }
839
840         lprocfs_obd_cleanup(obd);
841         lprocfs_free_obd_stats(obd);
842
843         leaked = atomic_read(&obd->u.echo.eo_prep);
844         if (leaked != 0)
845                 CERROR("%d prep/commitrw pages leaked\n", leaked);
846
847         LASSERT(atomic_read(&d->ld_ref) == 0);
848         EXIT;
849 }
850
851 /**
852  * Implementation of lu_device_type_operations::ldto_device_fini.
853  *
854  * Finalize device. Dual to echo_srv_device_init(). It is called from
855  * obd_precleanup() and stops the current device.
856  *
857  * \param[in] env       execution environment
858  * \param[in] d         LU device of ESD
859  *
860  * \retval              NULL
861  */
862 static struct lu_device *echo_srv_device_fini(const struct lu_env *env,
863                                               struct lu_device *d)
864 {
865         ENTRY;
866         echo_srv_fini(env, echo_srv_dev(d));
867         RETURN(NULL);
868 }
869
870 /**
871  * Implementation of lu_device_type_operations::ldto_device_free.
872  *
873  * Free Echo Server device. Dual to echo_srv_device_alloc().
874  *
875  * \param[in] env       execution environment
876  * \param[in] d         LU device of ESD
877  *
878  * \retval              NULL
879  */
880 static struct lu_device *echo_srv_device_free(const struct lu_env *env,
881                                               struct lu_device *d)
882 {
883         struct echo_srv_device *esd = echo_srv_dev(d);
884
885         lu_device_fini(&esd->esd_dev);
886         OBD_FREE_PTR(esd);
887         RETURN(NULL);
888 }
889
890 /**
891  * Implementation of lu_device_type_operations::ldto_device_alloc.
892  *
893  * This function allocates the new Echo Server device. It is called from
894  * obd_setup() if OBD device had lu_device_type defined.
895  *
896  * \param[in] env       execution environment
897  * \param[in] t         lu_device_type of ESD device
898  * \param[in] cfg       configuration log
899  *
900  * \retval              pointer to the lu_device of just allocated OFD
901  * \retval              ERR_PTR of return value on error
902  */
903 static struct lu_device *echo_srv_device_alloc(const struct lu_env *env,
904                                                struct lu_device_type *t,
905                                                struct lustre_cfg *cfg)
906 {
907         struct echo_srv_device *esd;
908         struct lu_device *l;
909         int rc;
910
911         OBD_ALLOC_PTR(esd);
912         if (!esd)
913                 return ERR_PTR(-ENOMEM);
914
915         l = &esd->esd_dev;
916         lu_device_init(l, t);
917         rc = echo_srv_init0(env, esd, t, cfg);
918         if (rc != 0) {
919                 echo_srv_device_free(env, l);
920                 l = ERR_PTR(rc);
921         }
922
923         return l;
924 }
925
926 static const struct lu_device_type_operations echo_srv_type_ops = {
927         .ldto_device_alloc = echo_srv_device_alloc,
928         .ldto_device_free = echo_srv_device_free,
929         .ldto_device_fini = echo_srv_device_fini
930 };
931
932 struct lu_device_type echo_srv_type = {
933         .ldt_tags = LU_DEVICE_DT,
934         .ldt_name = LUSTRE_ECHO_NAME,
935         .ldt_ops = &echo_srv_type_ops,
936         .ldt_ctx_tags = LCT_DT_THREAD,
937 };
938
939 void echo_persistent_pages_fini(void)
940 {
941         int i;
942
943         for (i = 0; i < ECHO_PERSISTENT_PAGES; i++)
944                 if (echo_persistent_pages[i]) {
945                         __free_page(echo_persistent_pages[i]);
946                         echo_persistent_pages[i] = NULL;
947                 }
948 }
949
950 int echo_persistent_pages_init(void)
951 {
952         struct page *pg;
953         int          i;
954
955         for (i = 0; i < ECHO_PERSISTENT_PAGES; i++) {
956                 gfp_t gfp_mask = (i < ECHO_PERSISTENT_PAGES / 2) ?
957                         GFP_KERNEL : GFP_HIGHUSER;
958
959                 pg = alloc_page(gfp_mask);
960                 if (!pg) {
961                         echo_persistent_pages_fini();
962                         return -ENOMEM;
963                 }
964
965                 memset(kmap(pg), 0, PAGE_SIZE);
966                 kunmap(pg);
967
968                 echo_persistent_pages[i] = pg;
969         }
970
971         return 0;
972 }