Whamcloud - gitweb
c6cd621e5ecfc1f612b7e429289fcabf5585a9eb
[fs/lustre-release.git] / lustre / osc / osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include <libcfs/libcfs.h>
40
41 #ifndef __KERNEL__
42 # include <liblustre.h>
43 #endif
44
45 #include <lustre_dlm.h>
46 #include <lustre_net.h>
47 #include <lustre/lustre_user.h>
48 #include <obd_cksum.h>
49 #include <obd_ost.h>
50 #include <obd_lov.h>
51
52 #ifdef  __CYGWIN__
53 # include <ctype.h>
54 #endif
55
56 #include <lustre_ha.h>
57 #include <lprocfs_status.h>
58 #include <lustre_log.h>
59 #include <lustre_debug.h>
60 #include <lustre_param.h>
61 #include <lustre_fid.h>
62 #include "osc_internal.h"
63 #include "osc_cl_internal.h"
64
65 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
66 static int brw_interpret(const struct lu_env *env,
67                          struct ptlrpc_request *req, void *data, int rc);
68 int osc_cleanup(struct obd_device *obd);
69
70 /* Pack OSC object metadata for disk storage (LE byte order). */
71 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
72                       struct lov_stripe_md *lsm)
73 {
74         int lmm_size;
75         ENTRY;
76
77         lmm_size = sizeof(**lmmp);
78         if (!lmmp)
79                 RETURN(lmm_size);
80
81         if (*lmmp && !lsm) {
82                 OBD_FREE(*lmmp, lmm_size);
83                 *lmmp = NULL;
84                 RETURN(0);
85         }
86
87         if (!*lmmp) {
88                 OBD_ALLOC(*lmmp, lmm_size);
89                 if (!*lmmp)
90                         RETURN(-ENOMEM);
91         }
92
93         if (lsm) {
94                 LASSERT(lsm->lsm_object_id);
95                 LASSERT_SEQ_IS_MDT(lsm->lsm_object_seq);
96                 (*lmmp)->lmm_object_id = cpu_to_le64(lsm->lsm_object_id);
97                 (*lmmp)->lmm_object_seq = cpu_to_le64(lsm->lsm_object_seq);
98         }
99
100         RETURN(lmm_size);
101 }
102
103 /* Unpack OSC object metadata from disk storage (LE byte order). */
104 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
105                         struct lov_mds_md *lmm, int lmm_bytes)
106 {
107         int lsm_size;
108         struct obd_import *imp = class_exp2cliimp(exp);
109         ENTRY;
110
111         if (lmm != NULL) {
112                 if (lmm_bytes < sizeof (*lmm)) {
113                         CERROR("lov_mds_md too small: %d, need %d\n",
114                                lmm_bytes, (int)sizeof(*lmm));
115                         RETURN(-EINVAL);
116                 }
117                 /* XXX LOV_MAGIC etc check? */
118
119                 if (lmm->lmm_object_id == 0) {
120                         CERROR("lov_mds_md: zero lmm_object_id\n");
121                         RETURN(-EINVAL);
122                 }
123         }
124
125         lsm_size = lov_stripe_md_size(1);
126         if (lsmp == NULL)
127                 RETURN(lsm_size);
128
129         if (*lsmp != NULL && lmm == NULL) {
130                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
131                 OBD_FREE(*lsmp, lsm_size);
132                 *lsmp = NULL;
133                 RETURN(0);
134         }
135
136         if (*lsmp == NULL) {
137                 OBD_ALLOC(*lsmp, lsm_size);
138                 if (*lsmp == NULL)
139                         RETURN(-ENOMEM);
140                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
141                 if ((*lsmp)->lsm_oinfo[0] == NULL) {
142                         OBD_FREE(*lsmp, lsm_size);
143                         RETURN(-ENOMEM);
144                 }
145                 loi_init((*lsmp)->lsm_oinfo[0]);
146         }
147
148         if (lmm != NULL) {
149                 /* XXX zero *lsmp? */
150                 (*lsmp)->lsm_object_id = le64_to_cpu (lmm->lmm_object_id);
151                 (*lsmp)->lsm_object_seq = le64_to_cpu (lmm->lmm_object_seq);
152                 LASSERT((*lsmp)->lsm_object_id);
153                 LASSERT_SEQ_IS_MDT((*lsmp)->lsm_object_seq);
154         }
155
156         if (imp != NULL &&
157             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
158                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
159         else
160                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
161
162         RETURN(lsm_size);
163 }
164
165 static inline void osc_pack_capa(struct ptlrpc_request *req,
166                                  struct ost_body *body, void *capa)
167 {
168         struct obd_capa *oc = (struct obd_capa *)capa;
169         struct lustre_capa *c;
170
171         if (!capa)
172                 return;
173
174         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
175         LASSERT(c);
176         capa_cpy(c, oc);
177         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
178         DEBUG_CAPA(D_SEC, c, "pack");
179 }
180
181 static inline void osc_pack_req_body(struct ptlrpc_request *req,
182                                      struct obd_info *oinfo)
183 {
184         struct ost_body *body;
185
186         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
187         LASSERT(body);
188
189         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
190         osc_pack_capa(req, body, oinfo->oi_capa);
191 }
192
193 static inline void osc_set_capa_size(struct ptlrpc_request *req,
194                                      const struct req_msg_field *field,
195                                      struct obd_capa *oc)
196 {
197         if (oc == NULL)
198                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
199         else
200                 /* it is already calculated as sizeof struct obd_capa */
201                 ;
202 }
203
204 static int osc_getattr_interpret(const struct lu_env *env,
205                                  struct ptlrpc_request *req,
206                                  struct osc_async_args *aa, int rc)
207 {
208         struct ost_body *body;
209         ENTRY;
210
211         if (rc != 0)
212                 GOTO(out, rc);
213
214         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
215         if (body) {
216                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
217                 lustre_get_wire_obdo(aa->aa_oi->oi_oa, &body->oa);
218
219                 /* This should really be sent by the OST */
220                 aa->aa_oi->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
221                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
222         } else {
223                 CDEBUG(D_INFO, "can't unpack ost_body\n");
224                 rc = -EPROTO;
225                 aa->aa_oi->oi_oa->o_valid = 0;
226         }
227 out:
228         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
229         RETURN(rc);
230 }
231
232 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
233                              struct ptlrpc_request_set *set)
234 {
235         struct ptlrpc_request *req;
236         struct osc_async_args *aa;
237         int                    rc;
238         ENTRY;
239
240         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
241         if (req == NULL)
242                 RETURN(-ENOMEM);
243
244         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
245         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
246         if (rc) {
247                 ptlrpc_request_free(req);
248                 RETURN(rc);
249         }
250
251         osc_pack_req_body(req, oinfo);
252
253         ptlrpc_request_set_replen(req);
254         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
255
256         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
257         aa = ptlrpc_req_async_args(req);
258         aa->aa_oi = oinfo;
259
260         ptlrpc_set_add_req(set, req);
261         RETURN(0);
262 }
263
264 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
265                        struct obd_info *oinfo)
266 {
267         struct ptlrpc_request *req;
268         struct ost_body       *body;
269         int                    rc;
270         ENTRY;
271
272         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
273         if (req == NULL)
274                 RETURN(-ENOMEM);
275
276         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
277         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
278         if (rc) {
279                 ptlrpc_request_free(req);
280                 RETURN(rc);
281         }
282
283         osc_pack_req_body(req, oinfo);
284
285         ptlrpc_request_set_replen(req);
286
287         rc = ptlrpc_queue_wait(req);
288         if (rc)
289                 GOTO(out, rc);
290
291         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
292         if (body == NULL)
293                 GOTO(out, rc = -EPROTO);
294
295         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
296         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
297
298         /* This should really be sent by the OST */
299         oinfo->oi_oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
300         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
301
302         EXIT;
303  out:
304         ptlrpc_req_finished(req);
305         return rc;
306 }
307
308 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
309                        struct obd_info *oinfo, struct obd_trans_info *oti)
310 {
311         struct ptlrpc_request *req;
312         struct ost_body       *body;
313         int                    rc;
314         ENTRY;
315
316         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
317
318         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
319         if (req == NULL)
320                 RETURN(-ENOMEM);
321
322         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
323         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
324         if (rc) {
325                 ptlrpc_request_free(req);
326                 RETURN(rc);
327         }
328
329         osc_pack_req_body(req, oinfo);
330
331         ptlrpc_request_set_replen(req);
332
333         rc = ptlrpc_queue_wait(req);
334         if (rc)
335                 GOTO(out, rc);
336
337         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
338         if (body == NULL)
339                 GOTO(out, rc = -EPROTO);
340
341         lustre_get_wire_obdo(oinfo->oi_oa, &body->oa);
342
343         EXIT;
344 out:
345         ptlrpc_req_finished(req);
346         RETURN(rc);
347 }
348
349 static int osc_setattr_interpret(const struct lu_env *env,
350                                  struct ptlrpc_request *req,
351                                  struct osc_setattr_args *sa, int rc)
352 {
353         struct ost_body *body;
354         ENTRY;
355
356         if (rc != 0)
357                 GOTO(out, rc);
358
359         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
360         if (body == NULL)
361                 GOTO(out, rc = -EPROTO);
362
363         lustre_get_wire_obdo(sa->sa_oa, &body->oa);
364 out:
365         rc = sa->sa_upcall(sa->sa_cookie, rc);
366         RETURN(rc);
367 }
368
369 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
370                            struct obd_trans_info *oti,
371                            obd_enqueue_update_f upcall, void *cookie,
372                            struct ptlrpc_request_set *rqset)
373 {
374         struct ptlrpc_request   *req;
375         struct osc_setattr_args *sa;
376         int                      rc;
377         ENTRY;
378
379         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
380         if (req == NULL)
381                 RETURN(-ENOMEM);
382
383         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
384         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
385         if (rc) {
386                 ptlrpc_request_free(req);
387                 RETURN(rc);
388         }
389
390         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
391                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
392
393         osc_pack_req_body(req, oinfo);
394
395         ptlrpc_request_set_replen(req);
396
397         /* do mds to ost setattr asynchronously */
398         if (!rqset) {
399                 /* Do not wait for response. */
400                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
401         } else {
402                 req->rq_interpret_reply =
403                         (ptlrpc_interpterer_t)osc_setattr_interpret;
404
405                 CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
406                 sa = ptlrpc_req_async_args(req);
407                 sa->sa_oa = oinfo->oi_oa;
408                 sa->sa_upcall = upcall;
409                 sa->sa_cookie = cookie;
410
411                 if (rqset == PTLRPCD_SET)
412                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
413                 else
414                         ptlrpc_set_add_req(rqset, req);
415         }
416
417         RETURN(0);
418 }
419
420 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
421                              struct obd_trans_info *oti,
422                              struct ptlrpc_request_set *rqset)
423 {
424         return osc_setattr_async_base(exp, oinfo, oti,
425                                       oinfo->oi_cb_up, oinfo, rqset);
426 }
427
428 int osc_real_create(struct obd_export *exp, struct obdo *oa,
429                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
430 {
431         struct ptlrpc_request *req;
432         struct ost_body       *body;
433         struct lov_stripe_md  *lsm;
434         int                    rc;
435         ENTRY;
436
437         LASSERT(oa);
438         LASSERT(ea);
439
440         lsm = *ea;
441         if (!lsm) {
442                 rc = obd_alloc_memmd(exp, &lsm);
443                 if (rc < 0)
444                         RETURN(rc);
445         }
446
447         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
448         if (req == NULL)
449                 GOTO(out, rc = -ENOMEM);
450
451         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
452         if (rc) {
453                 ptlrpc_request_free(req);
454                 GOTO(out, rc);
455         }
456
457         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
458         LASSERT(body);
459         lustre_set_wire_obdo(&body->oa, oa);
460
461         ptlrpc_request_set_replen(req);
462
463         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
464             oa->o_flags == OBD_FL_DELORPHAN) {
465                 DEBUG_REQ(D_HA, req,
466                           "delorphan from OST integration");
467                 /* Don't resend the delorphan req */
468                 req->rq_no_resend = req->rq_no_delay = 1;
469         }
470
471         rc = ptlrpc_queue_wait(req);
472         if (rc)
473                 GOTO(out_req, rc);
474
475         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
476         if (body == NULL)
477                 GOTO(out_req, rc = -EPROTO);
478
479         lustre_get_wire_obdo(oa, &body->oa);
480
481         /* This should really be sent by the OST */
482         oa->o_blksize = PTLRPC_MAX_BRW_SIZE;
483         oa->o_valid |= OBD_MD_FLBLKSZ;
484
485         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
486          * have valid lsm_oinfo data structs, so don't go touching that.
487          * This needs to be fixed in a big way.
488          */
489         lsm->lsm_object_id = oa->o_id;
490         lsm->lsm_object_seq = oa->o_seq;
491         *ea = lsm;
492
493         if (oti != NULL) {
494                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
495
496                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
497                         if (!oti->oti_logcookies)
498                                 oti_alloc_cookies(oti, 1);
499                         *oti->oti_logcookies = oa->o_lcookie;
500                 }
501         }
502
503         CDEBUG(D_HA, "transno: "LPD64"\n",
504                lustre_msg_get_transno(req->rq_repmsg));
505 out_req:
506         ptlrpc_req_finished(req);
507 out:
508         if (rc && !*ea)
509                 obd_free_memmd(exp, &lsm);
510         RETURN(rc);
511 }
512
513 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
514                    obd_enqueue_update_f upcall, void *cookie,
515                    struct ptlrpc_request_set *rqset)
516 {
517         struct ptlrpc_request   *req;
518         struct osc_setattr_args *sa;
519         struct ost_body         *body;
520         int                      rc;
521         ENTRY;
522
523         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
524         if (req == NULL)
525                 RETURN(-ENOMEM);
526
527         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
528         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
529         if (rc) {
530                 ptlrpc_request_free(req);
531                 RETURN(rc);
532         }
533         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
534         ptlrpc_at_set_req_timeout(req);
535
536         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
537         LASSERT(body);
538         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
539         osc_pack_capa(req, body, oinfo->oi_capa);
540
541         ptlrpc_request_set_replen(req);
542
543         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
544         CLASSERT (sizeof(*sa) <= sizeof(req->rq_async_args));
545         sa = ptlrpc_req_async_args(req);
546         sa->sa_oa     = oinfo->oi_oa;
547         sa->sa_upcall = upcall;
548         sa->sa_cookie = cookie;
549         if (rqset == PTLRPCD_SET)
550                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
551         else
552                 ptlrpc_set_add_req(rqset, req);
553
554         RETURN(0);
555 }
556
557 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
558                      struct obd_info *oinfo, struct obd_trans_info *oti,
559                      struct ptlrpc_request_set *rqset)
560 {
561         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
562         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
563         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
564         return osc_punch_base(exp, oinfo,
565                               oinfo->oi_cb_up, oinfo, rqset);
566 }
567
568 static int osc_sync_interpret(const struct lu_env *env,
569                               struct ptlrpc_request *req,
570                               void *arg, int rc)
571 {
572         struct osc_fsync_args *fa = arg;
573         struct ost_body *body;
574         ENTRY;
575
576         if (rc)
577                 GOTO(out, rc);
578
579         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
580         if (body == NULL) {
581                 CERROR ("can't unpack ost_body\n");
582                 GOTO(out, rc = -EPROTO);
583         }
584
585         *fa->fa_oi->oi_oa = body->oa;
586 out:
587         rc = fa->fa_upcall(fa->fa_cookie, rc);
588         RETURN(rc);
589 }
590
591 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
592                   obd_enqueue_update_f upcall, void *cookie,
593                   struct ptlrpc_request_set *rqset)
594 {
595         struct ptlrpc_request *req;
596         struct ost_body       *body;
597         struct osc_fsync_args *fa;
598         int                    rc;
599         ENTRY;
600
601         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
602         if (req == NULL)
603                 RETURN(-ENOMEM);
604
605         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
606         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
607         if (rc) {
608                 ptlrpc_request_free(req);
609                 RETURN(rc);
610         }
611
612         /* overload the size and blocks fields in the oa with start/end */
613         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
614         LASSERT(body);
615         lustre_set_wire_obdo(&body->oa, oinfo->oi_oa);
616         osc_pack_capa(req, body, oinfo->oi_capa);
617
618         ptlrpc_request_set_replen(req);
619         req->rq_interpret_reply = osc_sync_interpret;
620
621         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
622         fa = ptlrpc_req_async_args(req);
623         fa->fa_oi = oinfo;
624         fa->fa_upcall = upcall;
625         fa->fa_cookie = cookie;
626
627         if (rqset == PTLRPCD_SET)
628                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
629         else
630                 ptlrpc_set_add_req(rqset, req);
631
632         RETURN (0);
633 }
634
635 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
636                     struct obd_info *oinfo, obd_size start, obd_size end,
637                     struct ptlrpc_request_set *set)
638 {
639         ENTRY;
640
641         if (!oinfo->oi_oa) {
642                 CDEBUG(D_INFO, "oa NULL\n");
643                 RETURN(-EINVAL);
644         }
645
646         oinfo->oi_oa->o_size = start;
647         oinfo->oi_oa->o_blocks = end;
648         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
649
650         RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
651 }
652
653 /* Find and cancel locally locks matched by @mode in the resource found by
654  * @objid. Found locks are added into @cancel list. Returns the amount of
655  * locks added to @cancels list. */
656 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
657                                    cfs_list_t *cancels,
658                                    ldlm_mode_t mode, int lock_flags)
659 {
660         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
661         struct ldlm_res_id res_id;
662         struct ldlm_resource *res;
663         int count;
664         ENTRY;
665
666         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
667          * export) but disabled through procfs (flag in NS).
668          *
669          * This distinguishes from a case when ELC is not supported originally,
670          * when we still want to cancel locks in advance and just cancel them
671          * locally, without sending any RPC. */
672         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
673                 RETURN(0);
674
675         ostid_build_res_name(&oa->o_oi, &res_id);
676         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
677         if (res == NULL)
678                 RETURN(0);
679
680         LDLM_RESOURCE_ADDREF(res);
681         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
682                                            lock_flags, 0, NULL);
683         LDLM_RESOURCE_DELREF(res);
684         ldlm_resource_putref(res);
685         RETURN(count);
686 }
687
688 static int osc_destroy_interpret(const struct lu_env *env,
689                                  struct ptlrpc_request *req, void *data,
690                                  int rc)
691 {
692         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
693
694         cfs_atomic_dec(&cli->cl_destroy_in_flight);
695         cfs_waitq_signal(&cli->cl_destroy_waitq);
696         return 0;
697 }
698
699 static int osc_can_send_destroy(struct client_obd *cli)
700 {
701         if (cfs_atomic_inc_return(&cli->cl_destroy_in_flight) <=
702             cli->cl_max_rpcs_in_flight) {
703                 /* The destroy request can be sent */
704                 return 1;
705         }
706         if (cfs_atomic_dec_return(&cli->cl_destroy_in_flight) <
707             cli->cl_max_rpcs_in_flight) {
708                 /*
709                  * The counter has been modified between the two atomic
710                  * operations.
711                  */
712                 cfs_waitq_signal(&cli->cl_destroy_waitq);
713         }
714         return 0;
715 }
716
717 int osc_create(const struct lu_env *env, struct obd_export *exp,
718                struct obdo *oa, struct lov_stripe_md **ea,
719                struct obd_trans_info *oti)
720 {
721         int rc = 0;
722         ENTRY;
723
724         LASSERT(oa);
725         LASSERT(ea);
726         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
727
728         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
729             oa->o_flags == OBD_FL_RECREATE_OBJS) {
730                 RETURN(osc_real_create(exp, oa, ea, oti));
731         }
732
733         if (!fid_seq_is_mdt(oa->o_seq))
734                 RETURN(osc_real_create(exp, oa, ea, oti));
735
736         /* we should not get here anymore */
737         LBUG();
738
739         RETURN(rc);
740 }
741
742 /* Destroy requests can be async always on the client, and we don't even really
743  * care about the return code since the client cannot do anything at all about
744  * a destroy failure.
745  * When the MDS is unlinking a filename, it saves the file objects into a
746  * recovery llog, and these object records are cancelled when the OST reports
747  * they were destroyed and sync'd to disk (i.e. transaction committed).
748  * If the client dies, or the OST is down when the object should be destroyed,
749  * the records are not cancelled, and when the OST reconnects to the MDS next,
750  * it will retrieve the llog unlink logs and then sends the log cancellation
751  * cookies to the MDS after committing destroy transactions. */
752 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
753                        struct obdo *oa, struct lov_stripe_md *ea,
754                        struct obd_trans_info *oti, struct obd_export *md_export,
755                        void *capa)
756 {
757         struct client_obd     *cli = &exp->exp_obd->u.cli;
758         struct ptlrpc_request *req;
759         struct ost_body       *body;
760         CFS_LIST_HEAD(cancels);
761         int rc, count;
762         ENTRY;
763
764         if (!oa) {
765                 CDEBUG(D_INFO, "oa NULL\n");
766                 RETURN(-EINVAL);
767         }
768
769         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
770                                         LDLM_FL_DISCARD_DATA);
771
772         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
773         if (req == NULL) {
774                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
775                 RETURN(-ENOMEM);
776         }
777
778         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
779         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
780                                0, &cancels, count);
781         if (rc) {
782                 ptlrpc_request_free(req);
783                 RETURN(rc);
784         }
785
786         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
787         ptlrpc_at_set_req_timeout(req);
788
789         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
790                 oa->o_lcookie = *oti->oti_logcookies;
791         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
792         LASSERT(body);
793         lustre_set_wire_obdo(&body->oa, oa);
794
795         osc_pack_capa(req, body, (struct obd_capa *)capa);
796         ptlrpc_request_set_replen(req);
797
798         /* If osc_destory is for destroying the unlink orphan,
799          * sent from MDT to OST, which should not be blocked here,
800          * because the process might be triggered by ptlrpcd, and
801          * it is not good to block ptlrpcd thread (b=16006)*/
802         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
803                 req->rq_interpret_reply = osc_destroy_interpret;
804                 if (!osc_can_send_destroy(cli)) {
805                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
806                                                           NULL);
807
808                         /*
809                          * Wait until the number of on-going destroy RPCs drops
810                          * under max_rpc_in_flight
811                          */
812                         l_wait_event_exclusive(cli->cl_destroy_waitq,
813                                                osc_can_send_destroy(cli), &lwi);
814                 }
815         }
816
817         /* Do not wait for response */
818         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
819         RETURN(0);
820 }
821
822 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
823                                 long writing_bytes)
824 {
825         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
826
827         LASSERT(!(oa->o_valid & bits));
828
829         oa->o_valid |= bits;
830         client_obd_list_lock(&cli->cl_loi_list_lock);
831         oa->o_dirty = cli->cl_dirty;
832         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
833                      cli->cl_dirty_max)) {
834                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
835                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
836                 oa->o_undirty = 0;
837         } else if (unlikely(cfs_atomic_read(&obd_dirty_pages) -
838                             cfs_atomic_read(&obd_dirty_transit_pages) >
839                             (long)(obd_max_dirty_pages + 1))) {
840                 /* The cfs_atomic_read() allowing the cfs_atomic_inc() are
841                  * not covered by a lock thus they may safely race and trip
842                  * this CERROR() unless we add in a small fudge factor (+1). */
843                 CERROR("dirty %d - %d > system dirty_max %d\n",
844                        cfs_atomic_read(&obd_dirty_pages),
845                        cfs_atomic_read(&obd_dirty_transit_pages),
846                        obd_max_dirty_pages);
847                 oa->o_undirty = 0;
848         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
849                 CERROR("dirty %lu - dirty_max %lu too big???\n",
850                        cli->cl_dirty, cli->cl_dirty_max);
851                 oa->o_undirty = 0;
852         } else {
853                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
854                                       CFS_PAGE_SHIFT)*
855                                      (cli->cl_max_rpcs_in_flight + 1);
856                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
857         }
858         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
859         oa->o_dropped = cli->cl_lost_grant;
860         cli->cl_lost_grant = 0;
861         client_obd_list_unlock(&cli->cl_loi_list_lock);
862         CDEBUG(D_CACHE,"dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
863                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
864
865 }
866
867 void osc_update_next_shrink(struct client_obd *cli)
868 {
869         cli->cl_next_shrink_grant =
870                 cfs_time_shift(cli->cl_grant_shrink_interval);
871         CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
872                cli->cl_next_shrink_grant);
873 }
874
875 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
876 {
877         client_obd_list_lock(&cli->cl_loi_list_lock);
878         cli->cl_avail_grant += grant;
879         client_obd_list_unlock(&cli->cl_loi_list_lock);
880 }
881
882 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
883 {
884         if (body->oa.o_valid & OBD_MD_FLGRANT) {
885                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
886                 __osc_update_grant(cli, body->oa.o_grant);
887         }
888 }
889
890 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
891                               obd_count keylen, void *key, obd_count vallen,
892                               void *val, struct ptlrpc_request_set *set);
893
894 static int osc_shrink_grant_interpret(const struct lu_env *env,
895                                       struct ptlrpc_request *req,
896                                       void *aa, int rc)
897 {
898         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
899         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
900         struct ost_body *body;
901
902         if (rc != 0) {
903                 __osc_update_grant(cli, oa->o_grant);
904                 GOTO(out, rc);
905         }
906
907         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
908         LASSERT(body);
909         osc_update_grant(cli, body);
910 out:
911         OBDO_FREE(oa);
912         return rc;
913 }
914
915 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
916 {
917         client_obd_list_lock(&cli->cl_loi_list_lock);
918         oa->o_grant = cli->cl_avail_grant / 4;
919         cli->cl_avail_grant -= oa->o_grant;
920         client_obd_list_unlock(&cli->cl_loi_list_lock);
921         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
922                 oa->o_valid |= OBD_MD_FLFLAGS;
923                 oa->o_flags = 0;
924         }
925         oa->o_flags |= OBD_FL_SHRINK_GRANT;
926         osc_update_next_shrink(cli);
927 }
928
929 /* Shrink the current grant, either from some large amount to enough for a
930  * full set of in-flight RPCs, or if we have already shrunk to that limit
931  * then to enough for a single RPC.  This avoids keeping more grant than
932  * needed, and avoids shrinking the grant piecemeal. */
933 static int osc_shrink_grant(struct client_obd *cli)
934 {
935         long target = (cli->cl_max_rpcs_in_flight + 1) *
936                       cli->cl_max_pages_per_rpc;
937
938         client_obd_list_lock(&cli->cl_loi_list_lock);
939         if (cli->cl_avail_grant <= target)
940                 target = cli->cl_max_pages_per_rpc;
941         client_obd_list_unlock(&cli->cl_loi_list_lock);
942
943         return osc_shrink_grant_to_target(cli, target);
944 }
945
946 int osc_shrink_grant_to_target(struct client_obd *cli, long target)
947 {
948         int    rc = 0;
949         struct ost_body     *body;
950         ENTRY;
951
952         client_obd_list_lock(&cli->cl_loi_list_lock);
953         /* Don't shrink if we are already above or below the desired limit
954          * We don't want to shrink below a single RPC, as that will negatively
955          * impact block allocation and long-term performance. */
956         if (target < cli->cl_max_pages_per_rpc)
957                 target = cli->cl_max_pages_per_rpc;
958
959         if (target >= cli->cl_avail_grant) {
960                 client_obd_list_unlock(&cli->cl_loi_list_lock);
961                 RETURN(0);
962         }
963         client_obd_list_unlock(&cli->cl_loi_list_lock);
964
965         OBD_ALLOC_PTR(body);
966         if (!body)
967                 RETURN(-ENOMEM);
968
969         osc_announce_cached(cli, &body->oa, 0);
970
971         client_obd_list_lock(&cli->cl_loi_list_lock);
972         body->oa.o_grant = cli->cl_avail_grant - target;
973         cli->cl_avail_grant = target;
974         client_obd_list_unlock(&cli->cl_loi_list_lock);
975         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
976                 body->oa.o_valid |= OBD_MD_FLFLAGS;
977                 body->oa.o_flags = 0;
978         }
979         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
980         osc_update_next_shrink(cli);
981
982         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
983                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
984                                 sizeof(*body), body, NULL);
985         if (rc != 0)
986                 __osc_update_grant(cli, body->oa.o_grant);
987         OBD_FREE_PTR(body);
988         RETURN(rc);
989 }
990
991 #define GRANT_SHRINK_LIMIT PTLRPC_MAX_BRW_SIZE
992 static int osc_should_shrink_grant(struct client_obd *client)
993 {
994         cfs_time_t time = cfs_time_current();
995         cfs_time_t next_shrink = client->cl_next_shrink_grant;
996
997         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
998              OBD_CONNECT_GRANT_SHRINK) == 0)
999                 return 0;
1000
1001         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
1002                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
1003                     client->cl_avail_grant > GRANT_SHRINK_LIMIT)
1004                         return 1;
1005                 else
1006                         osc_update_next_shrink(client);
1007         }
1008         return 0;
1009 }
1010
1011 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
1012 {
1013         struct client_obd *client;
1014
1015         cfs_list_for_each_entry(client, &item->ti_obd_list,
1016                                 cl_grant_shrink_list) {
1017                 if (osc_should_shrink_grant(client))
1018                         osc_shrink_grant(client);
1019         }
1020         return 0;
1021 }
1022
1023 static int osc_add_shrink_grant(struct client_obd *client)
1024 {
1025         int rc;
1026
1027         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1028                                        TIMEOUT_GRANT,
1029                                        osc_grant_shrink_grant_cb, NULL,
1030                                        &client->cl_grant_shrink_list);
1031         if (rc) {
1032                 CERROR("add grant client %s error %d\n",
1033                         client->cl_import->imp_obd->obd_name, rc);
1034                 return rc;
1035         }
1036         CDEBUG(D_CACHE, "add grant client %s \n",
1037                client->cl_import->imp_obd->obd_name);
1038         osc_update_next_shrink(client);
1039         return 0;
1040 }
1041
1042 static int osc_del_shrink_grant(struct client_obd *client)
1043 {
1044         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1045                                          TIMEOUT_GRANT);
1046 }
1047
1048 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1049 {
1050         /*
1051          * ocd_grant is the total grant amount we're expect to hold: if we've
1052          * been evicted, it's the new avail_grant amount, cl_dirty will drop
1053          * to 0 as inflight RPCs fail out; otherwise, it's avail_grant + dirty.
1054          *
1055          * race is tolerable here: if we're evicted, but imp_state already
1056          * left EVICTED state, then cl_dirty must be 0 already.
1057          */
1058         client_obd_list_lock(&cli->cl_loi_list_lock);
1059         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1060                 cli->cl_avail_grant = ocd->ocd_grant;
1061         else
1062                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1063
1064         if (cli->cl_avail_grant < 0) {
1065                 CWARN("%s: available grant < 0, the OSS is probably not running"
1066                       " with patch from bug20278 (%ld) \n",
1067                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant);
1068                 /* workaround for 1.6 servers which do not have
1069                  * the patch from bug20278 */
1070                 cli->cl_avail_grant = ocd->ocd_grant;
1071         }
1072
1073         /* determine the appropriate chunk size used by osc_extent. */
1074         cli->cl_chunkbits = max_t(int, CFS_PAGE_SHIFT, ocd->ocd_blocksize);
1075         client_obd_list_unlock(&cli->cl_loi_list_lock);
1076
1077         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
1078                 "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
1079                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1080
1081         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1082             cfs_list_empty(&cli->cl_grant_shrink_list))
1083                 osc_add_shrink_grant(cli);
1084 }
1085
1086 /* We assume that the reason this OSC got a short read is because it read
1087  * beyond the end of a stripe file; i.e. lustre is reading a sparse file
1088  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1089  * this stripe never got written at or beyond this stripe offset yet. */
1090 static void handle_short_read(int nob_read, obd_count page_count,
1091                               struct brw_page **pga)
1092 {
1093         char *ptr;
1094         int i = 0;
1095
1096         /* skip bytes read OK */
1097         while (nob_read > 0) {
1098                 LASSERT (page_count > 0);
1099
1100                 if (pga[i]->count > nob_read) {
1101                         /* EOF inside this page */
1102                         ptr = cfs_kmap(pga[i]->pg) +
1103                                 (pga[i]->off & ~CFS_PAGE_MASK);
1104                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1105                         cfs_kunmap(pga[i]->pg);
1106                         page_count--;
1107                         i++;
1108                         break;
1109                 }
1110
1111                 nob_read -= pga[i]->count;
1112                 page_count--;
1113                 i++;
1114         }
1115
1116         /* zero remaining pages */
1117         while (page_count-- > 0) {
1118                 ptr = cfs_kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1119                 memset(ptr, 0, pga[i]->count);
1120                 cfs_kunmap(pga[i]->pg);
1121                 i++;
1122         }
1123 }
1124
1125 static int check_write_rcs(struct ptlrpc_request *req,
1126                            int requested_nob, int niocount,
1127                            obd_count page_count, struct brw_page **pga)
1128 {
1129         int     i;
1130         __u32   *remote_rcs;
1131
1132         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1133                                                   sizeof(*remote_rcs) *
1134                                                   niocount);
1135         if (remote_rcs == NULL) {
1136                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1137                 return(-EPROTO);
1138         }
1139
1140         /* return error if any niobuf was in error */
1141         for (i = 0; i < niocount; i++) {
1142                 if ((int)remote_rcs[i] < 0)
1143                         return(remote_rcs[i]);
1144
1145                 if (remote_rcs[i] != 0) {
1146                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1147                                 i, remote_rcs[i], req);
1148                         return(-EPROTO);
1149                 }
1150         }
1151
1152         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1153                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1154                        req->rq_bulk->bd_nob_transferred, requested_nob);
1155                 return(-EPROTO);
1156         }
1157
1158         return (0);
1159 }
1160
1161 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1162 {
1163         if (p1->flag != p2->flag) {
1164                 unsigned mask = ~(OBD_BRW_FROM_GRANT| OBD_BRW_NOCACHE|
1165                                   OBD_BRW_SYNC|OBD_BRW_ASYNC|OBD_BRW_NOQUOTA);
1166
1167                 /* warn if we try to combine flags that we don't know to be
1168                  * safe to combine */
1169                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1170                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1171                               "report this at http://bugs.whamcloud.com/\n",
1172                               p1->flag, p2->flag);
1173                 }
1174                 return 0;
1175         }
1176
1177         return (p1->off + p1->count == p2->off);
1178 }
1179
1180 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1181                                    struct brw_page **pga, int opc,
1182                                    cksum_type_t cksum_type)
1183 {
1184         __u32                           cksum;
1185         int                             i = 0;
1186         struct cfs_crypto_hash_desc     *hdesc;
1187         unsigned int                    bufsize;
1188         int                             err;
1189         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1190
1191         LASSERT(pg_count > 0);
1192
1193         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1194         if (IS_ERR(hdesc)) {
1195                 CERROR("Unable to initialize checksum hash %s\n",
1196                        cfs_crypto_hash_name(cfs_alg));
1197                 return PTR_ERR(hdesc);
1198         }
1199
1200         while (nob > 0 && pg_count > 0) {
1201                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1202
1203                 /* corrupt the data before we compute the checksum, to
1204                  * simulate an OST->client data error */
1205                 if (i == 0 && opc == OST_READ &&
1206                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1207                         unsigned char *ptr = cfs_kmap(pga[i]->pg);
1208                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1209                         memcpy(ptr + off, "bad1", min(4, nob));
1210                         cfs_kunmap(pga[i]->pg);
1211                 }
1212                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1213                                   pga[i]->off & ~CFS_PAGE_MASK,
1214                                   count);
1215                 LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d checksum %x\n",
1216                                (int)(pga[i]->off & ~CFS_PAGE_MASK), cksum);
1217
1218                 nob -= pga[i]->count;
1219                 pg_count--;
1220                 i++;
1221         }
1222
1223         bufsize = 4;
1224         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1225
1226         if (err)
1227                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1228
1229         /* For sending we only compute the wrong checksum instead
1230          * of corrupting the data so it is still correct on a redo */
1231         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1232                 cksum++;
1233
1234         return cksum;
1235 }
1236
1237 static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
1238                                 struct lov_stripe_md *lsm, obd_count page_count,
1239                                 struct brw_page **pga,
1240                                 struct ptlrpc_request **reqp,
1241                                 struct obd_capa *ocapa, int reserve,
1242                                 int resend)
1243 {
1244         struct ptlrpc_request   *req;
1245         struct ptlrpc_bulk_desc *desc;
1246         struct ost_body         *body;
1247         struct obd_ioobj        *ioobj;
1248         struct niobuf_remote    *niobuf;
1249         int niocount, i, requested_nob, opc, rc;
1250         struct osc_brw_async_args *aa;
1251         struct req_capsule      *pill;
1252         struct brw_page *pg_prev;
1253
1254         ENTRY;
1255         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1256                 RETURN(-ENOMEM); /* Recoverable */
1257         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1258                 RETURN(-EINVAL); /* Fatal */
1259
1260         if ((cmd & OBD_BRW_WRITE) != 0) {
1261                 opc = OST_WRITE;
1262                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1263                                                 cli->cl_import->imp_rq_pool,
1264                                                 &RQF_OST_BRW_WRITE);
1265         } else {
1266                 opc = OST_READ;
1267                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1268         }
1269         if (req == NULL)
1270                 RETURN(-ENOMEM);
1271
1272         for (niocount = i = 1; i < page_count; i++) {
1273                 if (!can_merge_pages(pga[i - 1], pga[i]))
1274                         niocount++;
1275         }
1276
1277         pill = &req->rq_pill;
1278         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1279                              sizeof(*ioobj));
1280         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1281                              niocount * sizeof(*niobuf));
1282         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1283
1284         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1285         if (rc) {
1286                 ptlrpc_request_free(req);
1287                 RETURN(rc);
1288         }
1289         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1290         ptlrpc_at_set_req_timeout(req);
1291         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1292          * retry logic */
1293         req->rq_no_retry_einprogress = 1;
1294
1295         if (opc == OST_WRITE)
1296                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1297                                             BULK_GET_SOURCE, OST_BULK_PORTAL);
1298         else
1299                 desc = ptlrpc_prep_bulk_imp(req, page_count,
1300                                             BULK_PUT_SINK, OST_BULK_PORTAL);
1301
1302         if (desc == NULL)
1303                 GOTO(out, rc = -ENOMEM);
1304         /* NB request now owns desc and will free it when it gets freed */
1305
1306         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1307         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1308         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1309         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1310
1311         lustre_set_wire_obdo(&body->oa, oa);
1312
1313         obdo_to_ioobj(oa, ioobj);
1314         ioobj->ioo_bufcnt = niocount;
1315         osc_pack_capa(req, body, ocapa);
1316         LASSERT (page_count > 0);
1317         pg_prev = pga[0];
1318         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1319                 struct brw_page *pg = pga[i];
1320                 int poff = pg->off & ~CFS_PAGE_MASK;
1321
1322                 LASSERT(pg->count > 0);
1323                 /* make sure there is no gap in the middle of page array */
1324                 LASSERTF(page_count == 1 ||
1325                          (ergo(i == 0, poff + pg->count == CFS_PAGE_SIZE) &&
1326                           ergo(i > 0 && i < page_count - 1,
1327                                poff == 0 && pg->count == CFS_PAGE_SIZE)   &&
1328                           ergo(i == page_count - 1, poff == 0)),
1329                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1330                          i, page_count, pg, pg->off, pg->count);
1331 #ifdef __linux__
1332                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1333                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1334                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1335                          i, page_count,
1336                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1337                          pg_prev->pg, page_private(pg_prev->pg),
1338                          pg_prev->pg->index, pg_prev->off);
1339 #else
1340                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1341                          "i %d p_c %u\n", i, page_count);
1342 #endif
1343                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1344                         (pg->flag & OBD_BRW_SRVLOCK));
1345
1346                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1347                 requested_nob += pg->count;
1348
1349                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1350                         niobuf--;
1351                         niobuf->len += pg->count;
1352                 } else {
1353                         niobuf->offset = pg->off;
1354                         niobuf->len    = pg->count;
1355                         niobuf->flags  = pg->flag;
1356                 }
1357                 pg_prev = pg;
1358         }
1359
1360         LASSERTF((void *)(niobuf - niocount) ==
1361                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1362                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1363                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1364
1365         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob:0);
1366         if (resend) {
1367                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1368                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1369                         body->oa.o_flags = 0;
1370                 }
1371                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1372         }
1373
1374         if (osc_should_shrink_grant(cli))
1375                 osc_shrink_grant_local(cli, &body->oa);
1376
1377         /* size[REQ_REC_OFF] still sizeof (*body) */
1378         if (opc == OST_WRITE) {
1379                 if (cli->cl_checksum &&
1380                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1381                         /* store cl_cksum_type in a local variable since
1382                          * it can be changed via lprocfs */
1383                         cksum_type_t cksum_type = cli->cl_cksum_type;
1384
1385                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1386                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1387                                 body->oa.o_flags = 0;
1388                         }
1389                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1390                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1391                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1392                                                              page_count, pga,
1393                                                              OST_WRITE,
1394                                                              cksum_type);
1395                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1396                                body->oa.o_cksum);
1397                         /* save this in 'oa', too, for later checking */
1398                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1399                         oa->o_flags |= cksum_type_pack(cksum_type);
1400                 } else {
1401                         /* clear out the checksum flag, in case this is a
1402                          * resend but cl_checksum is no longer set. b=11238 */
1403                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1404                 }
1405                 oa->o_cksum = body->oa.o_cksum;
1406                 /* 1 RC per niobuf */
1407                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1408                                      sizeof(__u32) * niocount);
1409         } else {
1410                 if (cli->cl_checksum &&
1411                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1412                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1413                                 body->oa.o_flags = 0;
1414                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1415                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1416                 }
1417         }
1418         ptlrpc_request_set_replen(req);
1419
1420         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1421         aa = ptlrpc_req_async_args(req);
1422         aa->aa_oa = oa;
1423         aa->aa_requested_nob = requested_nob;
1424         aa->aa_nio_count = niocount;
1425         aa->aa_page_count = page_count;
1426         aa->aa_resends = 0;
1427         aa->aa_ppga = pga;
1428         aa->aa_cli = cli;
1429         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
1430         if (ocapa && reserve)
1431                 aa->aa_ocapa = capa_get(ocapa);
1432
1433         *reqp = req;
1434         RETURN(0);
1435
1436  out:
1437         ptlrpc_req_finished(req);
1438         RETURN(rc);
1439 }
1440
1441 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1442                                 __u32 client_cksum, __u32 server_cksum, int nob,
1443                                 obd_count page_count, struct brw_page **pga,
1444                                 cksum_type_t client_cksum_type)
1445 {
1446         __u32 new_cksum;
1447         char *msg;
1448         cksum_type_t cksum_type;
1449
1450         if (server_cksum == client_cksum) {
1451                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1452                 return 0;
1453         }
1454
1455         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1456                                        oa->o_flags : 0);
1457         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1458                                       cksum_type);
1459
1460         if (cksum_type != client_cksum_type)
1461                 msg = "the server did not use the checksum type specified in "
1462                       "the original request - likely a protocol problem";
1463         else if (new_cksum == server_cksum)
1464                 msg = "changed on the client after we checksummed it - "
1465                       "likely false positive due to mmap IO (bug 11742)";
1466         else if (new_cksum == client_cksum)
1467                 msg = "changed in transit before arrival at OST";
1468         else
1469                 msg = "changed in transit AND doesn't match the original - "
1470                       "likely false positive due to mmap IO (bug 11742)";
1471
1472         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1473                            " object "LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1474                            msg, libcfs_nid2str(peer->nid),
1475                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1476                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1477                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1478                            oa->o_id,
1479                            oa->o_valid & OBD_MD_FLGROUP ? oa->o_seq : (__u64)0,
1480                            pga[0]->off,
1481                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1482         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1483                "client csum now %x\n", client_cksum, client_cksum_type,
1484                server_cksum, cksum_type, new_cksum);
1485         return 1;
1486 }
1487
1488 /* Note rc enters this function as number of bytes transferred */
1489 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1490 {
1491         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1492         const lnet_process_id_t *peer =
1493                         &req->rq_import->imp_connection->c_peer;
1494         struct client_obd *cli = aa->aa_cli;
1495         struct ost_body *body;
1496         __u32 client_cksum = 0;
1497         ENTRY;
1498
1499         if (rc < 0 && rc != -EDQUOT) {
1500                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1501                 RETURN(rc);
1502         }
1503
1504         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1505         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1506         if (body == NULL) {
1507                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1508                 RETURN(-EPROTO);
1509         }
1510
1511         /* set/clear over quota flag for a uid/gid */
1512         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1513             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1514                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1515
1516                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1517                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1518                        body->oa.o_flags);
1519                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1520         }
1521
1522         osc_update_grant(cli, body);
1523
1524         if (rc < 0)
1525                 RETURN(rc);
1526
1527         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1528                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1529
1530         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1531                 if (rc > 0) {
1532                         CERROR("Unexpected +ve rc %d\n", rc);
1533                         RETURN(-EPROTO);
1534                 }
1535                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1536
1537                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1538                         RETURN(-EAGAIN);
1539
1540                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1541                     check_write_checksum(&body->oa, peer, client_cksum,
1542                                          body->oa.o_cksum, aa->aa_requested_nob,
1543                                          aa->aa_page_count, aa->aa_ppga,
1544                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1545                         RETURN(-EAGAIN);
1546
1547                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
1548                                      aa->aa_page_count, aa->aa_ppga);
1549                 GOTO(out, rc);
1550         }
1551
1552         /* The rest of this function executes only for OST_READs */
1553
1554         /* if unwrap_bulk failed, return -EAGAIN to retry */
1555         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1556         if (rc < 0)
1557                 GOTO(out, rc = -EAGAIN);
1558
1559         if (rc > aa->aa_requested_nob) {
1560                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1561                        aa->aa_requested_nob);
1562                 RETURN(-EPROTO);
1563         }
1564
1565         if (rc != req->rq_bulk->bd_nob_transferred) {
1566                 CERROR ("Unexpected rc %d (%d transferred)\n",
1567                         rc, req->rq_bulk->bd_nob_transferred);
1568                 return (-EPROTO);
1569         }
1570
1571         if (rc < aa->aa_requested_nob)
1572                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1573
1574         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1575                 static int cksum_counter;
1576                 __u32      server_cksum = body->oa.o_cksum;
1577                 char      *via;
1578                 char      *router;
1579                 cksum_type_t cksum_type;
1580
1581                 cksum_type = cksum_type_unpack(body->oa.o_valid &OBD_MD_FLFLAGS?
1582                                                body->oa.o_flags : 0);
1583                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1584                                                  aa->aa_ppga, OST_READ,
1585                                                  cksum_type);
1586
1587                 if (peer->nid == req->rq_bulk->bd_sender) {
1588                         via = router = "";
1589                 } else {
1590                         via = " via ";
1591                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1592                 }
1593
1594                 if (server_cksum == ~0 && rc > 0) {
1595                         CERROR("Protocol error: server %s set the 'checksum' "
1596                                "bit, but didn't send a checksum.  Not fatal, "
1597                                "but please notify on http://bugs.whamcloud.com/\n",
1598                                libcfs_nid2str(peer->nid));
1599                 } else if (server_cksum != client_cksum) {
1600                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1601                                            "%s%s%s inode "DFID" object "
1602                                            LPU64"/"LPU64" extent "
1603                                            "["LPU64"-"LPU64"]\n",
1604                                            req->rq_import->imp_obd->obd_name,
1605                                            libcfs_nid2str(peer->nid),
1606                                            via, router,
1607                                            body->oa.o_valid & OBD_MD_FLFID ?
1608                                                 body->oa.o_parent_seq : (__u64)0,
1609                                            body->oa.o_valid & OBD_MD_FLFID ?
1610                                                 body->oa.o_parent_oid : 0,
1611                                            body->oa.o_valid & OBD_MD_FLFID ?
1612                                                 body->oa.o_parent_ver : 0,
1613                                            body->oa.o_id,
1614                                            body->oa.o_valid & OBD_MD_FLGROUP ?
1615                                                 body->oa.o_seq : (__u64)0,
1616                                            aa->aa_ppga[0]->off,
1617                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1618                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1619                                                                         1);
1620                         CERROR("client %x, server %x, cksum_type %x\n",
1621                                client_cksum, server_cksum, cksum_type);
1622                         cksum_counter = 0;
1623                         aa->aa_oa->o_cksum = client_cksum;
1624                         rc = -EAGAIN;
1625                 } else {
1626                         cksum_counter++;
1627                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1628                         rc = 0;
1629                 }
1630         } else if (unlikely(client_cksum)) {
1631                 static int cksum_missed;
1632
1633                 cksum_missed++;
1634                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1635                         CERROR("Checksum %u requested from %s but not sent\n",
1636                                cksum_missed, libcfs_nid2str(peer->nid));
1637         } else {
1638                 rc = 0;
1639         }
1640 out:
1641         if (rc >= 0)
1642                 lustre_get_wire_obdo(aa->aa_oa, &body->oa);
1643
1644         RETURN(rc);
1645 }
1646
1647 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1648                             struct lov_stripe_md *lsm,
1649                             obd_count page_count, struct brw_page **pga,
1650                             struct obd_capa *ocapa)
1651 {
1652         struct ptlrpc_request *req;
1653         int                    rc;
1654         cfs_waitq_t            waitq;
1655         int                    generation, resends = 0;
1656         struct l_wait_info     lwi;
1657
1658         ENTRY;
1659
1660         cfs_waitq_init(&waitq);
1661         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1662
1663 restart_bulk:
1664         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1665                                   page_count, pga, &req, ocapa, 0, resends);
1666         if (rc != 0)
1667                 return (rc);
1668
1669         if (resends) {
1670                 req->rq_generation_set = 1;
1671                 req->rq_import_generation = generation;
1672                 req->rq_sent = cfs_time_current_sec() + resends;
1673         }
1674
1675         rc = ptlrpc_queue_wait(req);
1676
1677         if (rc == -ETIMEDOUT && req->rq_resend) {
1678                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1679                 ptlrpc_req_finished(req);
1680                 goto restart_bulk;
1681         }
1682
1683         rc = osc_brw_fini_request(req, rc);
1684
1685         ptlrpc_req_finished(req);
1686         /* When server return -EINPROGRESS, client should always retry
1687          * regardless of the number of times the bulk was resent already.*/
1688         if (osc_recoverable_error(rc)) {
1689                 resends++;
1690                 if (rc != -EINPROGRESS &&
1691                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1692                         CERROR("%s: too many resend retries for object: "
1693                                ""LPU64":"LPU64", rc = %d.\n",
1694                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1695                         goto out;
1696                 }
1697                 if (generation !=
1698                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1699                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1700                                ""LPU64":"LPU64", rc = %d.\n",
1701                                exp->exp_obd->obd_name, oa->o_id, oa->o_seq, rc);
1702                         goto out;
1703                 }
1704
1705                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1706                                        NULL);
1707                 l_wait_event(waitq, 0, &lwi);
1708
1709                 goto restart_bulk;
1710         }
1711 out:
1712         if (rc == -EAGAIN || rc == -EINPROGRESS)
1713                 rc = -EIO;
1714         RETURN (rc);
1715 }
1716
1717 static int osc_brw_redo_request(struct ptlrpc_request *request,
1718                                 struct osc_brw_async_args *aa, int rc)
1719 {
1720         struct ptlrpc_request *new_req;
1721         struct osc_brw_async_args *new_aa;
1722         struct osc_async_page *oap;
1723         ENTRY;
1724
1725         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1726                   "redo for recoverable error %d", rc);
1727
1728         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1729                                         OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1730                                   aa->aa_cli, aa->aa_oa,
1731                                   NULL /* lsm unused by osc currently */,
1732                                   aa->aa_page_count, aa->aa_ppga,
1733                                   &new_req, aa->aa_ocapa, 0, 1);
1734         if (rc)
1735                 RETURN(rc);
1736
1737         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1738                 if (oap->oap_request != NULL) {
1739                         LASSERTF(request == oap->oap_request,
1740                                  "request %p != oap_request %p\n",
1741                                  request, oap->oap_request);
1742                         if (oap->oap_interrupted) {
1743                                 ptlrpc_req_finished(new_req);
1744                                 RETURN(-EINTR);
1745                         }
1746                 }
1747         }
1748         /* New request takes over pga and oaps from old request.
1749          * Note that copying a list_head doesn't work, need to move it... */
1750         aa->aa_resends++;
1751         new_req->rq_interpret_reply = request->rq_interpret_reply;
1752         new_req->rq_async_args = request->rq_async_args;
1753         /* cap resend delay to the current request timeout, this is similar to
1754          * what ptlrpc does (see after_reply()) */
1755         if (aa->aa_resends > new_req->rq_timeout)
1756                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1757         else
1758                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1759         new_req->rq_generation_set = 1;
1760         new_req->rq_import_generation = request->rq_import_generation;
1761
1762         new_aa = ptlrpc_req_async_args(new_req);
1763
1764         CFS_INIT_LIST_HEAD(&new_aa->aa_oaps);
1765         cfs_list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1766         CFS_INIT_LIST_HEAD(&new_aa->aa_exts);
1767         cfs_list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1768         new_aa->aa_resends = aa->aa_resends;
1769
1770         cfs_list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1771                 if (oap->oap_request) {
1772                         ptlrpc_req_finished(oap->oap_request);
1773                         oap->oap_request = ptlrpc_request_addref(new_req);
1774                 }
1775         }
1776
1777         new_aa->aa_ocapa = aa->aa_ocapa;
1778         aa->aa_ocapa = NULL;
1779
1780         /* XXX: This code will run into problem if we're going to support
1781          * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1782          * and wait for all of them to be finished. We should inherit request
1783          * set from old request. */
1784         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1785
1786         DEBUG_REQ(D_INFO, new_req, "new request");
1787         RETURN(0);
1788 }
1789
1790 /*
1791  * ugh, we want disk allocation on the target to happen in offset order.  we'll
1792  * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1793  * fine for our small page arrays and doesn't require allocation.  its an
1794  * insertion sort that swaps elements that are strides apart, shrinking the
1795  * stride down until its '1' and the array is sorted.
1796  */
1797 static void sort_brw_pages(struct brw_page **array, int num)
1798 {
1799         int stride, i, j;
1800         struct brw_page *tmp;
1801
1802         if (num == 1)
1803                 return;
1804         for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1805                 ;
1806
1807         do {
1808                 stride /= 3;
1809                 for (i = stride ; i < num ; i++) {
1810                         tmp = array[i];
1811                         j = i;
1812                         while (j >= stride && array[j - stride]->off > tmp->off) {
1813                                 array[j] = array[j - stride];
1814                                 j -= stride;
1815                         }
1816                         array[j] = tmp;
1817                 }
1818         } while (stride > 1);
1819 }
1820
1821 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1822 {
1823         int count = 1;
1824         int offset;
1825         int i = 0;
1826
1827         LASSERT (pages > 0);
1828         offset = pg[i]->off & ~CFS_PAGE_MASK;
1829
1830         for (;;) {
1831                 pages--;
1832                 if (pages == 0)         /* that's all */
1833                         return count;
1834
1835                 if (offset + pg[i]->count < CFS_PAGE_SIZE)
1836                         return count;   /* doesn't end on page boundary */
1837
1838                 i++;
1839                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1840                 if (offset != 0)        /* doesn't start on page boundary */
1841                         return count;
1842
1843                 count++;
1844         }
1845 }
1846
1847 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1848 {
1849         struct brw_page **ppga;
1850         int i;
1851
1852         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1853         if (ppga == NULL)
1854                 return NULL;
1855
1856         for (i = 0; i < count; i++)
1857                 ppga[i] = pga + i;
1858         return ppga;
1859 }
1860
1861 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1862 {
1863         LASSERT(ppga != NULL);
1864         OBD_FREE(ppga, sizeof(*ppga) * count);
1865 }
1866
1867 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1868                    obd_count page_count, struct brw_page *pga,
1869                    struct obd_trans_info *oti)
1870 {
1871         struct obdo *saved_oa = NULL;
1872         struct brw_page **ppga, **orig;
1873         struct obd_import *imp = class_exp2cliimp(exp);
1874         struct client_obd *cli;
1875         int rc, page_count_orig;
1876         ENTRY;
1877
1878         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1879         cli = &imp->imp_obd->u.cli;
1880
1881         if (cmd & OBD_BRW_CHECK) {
1882                 /* The caller just wants to know if there's a chance that this
1883                  * I/O can succeed */
1884
1885                 if (imp->imp_invalid)
1886                         RETURN(-EIO);
1887                 RETURN(0);
1888         }
1889
1890         /* test_brw with a failed create can trip this, maybe others. */
1891         LASSERT(cli->cl_max_pages_per_rpc);
1892
1893         rc = 0;
1894
1895         orig = ppga = osc_build_ppga(pga, page_count);
1896         if (ppga == NULL)
1897                 RETURN(-ENOMEM);
1898         page_count_orig = page_count;
1899
1900         sort_brw_pages(ppga, page_count);
1901         while (page_count) {
1902                 obd_count pages_per_brw;
1903
1904                 if (page_count > cli->cl_max_pages_per_rpc)
1905                         pages_per_brw = cli->cl_max_pages_per_rpc;
1906                 else
1907                         pages_per_brw = page_count;
1908
1909                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1910
1911                 if (saved_oa != NULL) {
1912                         /* restore previously saved oa */
1913                         *oinfo->oi_oa = *saved_oa;
1914                 } else if (page_count > pages_per_brw) {
1915                         /* save a copy of oa (brw will clobber it) */
1916                         OBDO_ALLOC(saved_oa);
1917                         if (saved_oa == NULL)
1918                                 GOTO(out, rc = -ENOMEM);
1919                         *saved_oa = *oinfo->oi_oa;
1920                 }
1921
1922                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1923                                       pages_per_brw, ppga, oinfo->oi_capa);
1924
1925                 if (rc != 0)
1926                         break;
1927
1928                 page_count -= pages_per_brw;
1929                 ppga += pages_per_brw;
1930         }
1931
1932 out:
1933         osc_release_ppga(orig, page_count_orig);
1934
1935         if (saved_oa != NULL)
1936                 OBDO_FREE(saved_oa);
1937
1938         RETURN(rc);
1939 }
1940
1941 static int brw_interpret(const struct lu_env *env,
1942                          struct ptlrpc_request *req, void *data, int rc)
1943 {
1944         struct osc_brw_async_args *aa = data;
1945         struct osc_extent *ext;
1946         struct osc_extent *tmp;
1947         struct cl_object  *obj = NULL;
1948         struct client_obd *cli = aa->aa_cli;
1949         ENTRY;
1950
1951         rc = osc_brw_fini_request(req, rc);
1952         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1953         /* When server return -EINPROGRESS, client should always retry
1954          * regardless of the number of times the bulk was resent already. */
1955         if (osc_recoverable_error(rc)) {
1956                 if (req->rq_import_generation !=
1957                     req->rq_import->imp_generation) {
1958                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1959                                ""LPU64":"LPU64", rc = %d.\n",
1960                                req->rq_import->imp_obd->obd_name,
1961                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1962                 } else if (rc == -EINPROGRESS ||
1963                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1964                         rc = osc_brw_redo_request(req, aa, rc);
1965                 } else {
1966                         CERROR("%s: too many resent retries for object: "
1967                                ""LPU64":"LPU64", rc = %d.\n",
1968                                req->rq_import->imp_obd->obd_name,
1969                                aa->aa_oa->o_id, aa->aa_oa->o_seq, rc);
1970                 }
1971
1972                 if (rc == 0)
1973                         RETURN(0);
1974                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1975                         rc = -EIO;
1976         }
1977
1978         if (aa->aa_ocapa) {
1979                 capa_put(aa->aa_ocapa);
1980                 aa->aa_ocapa = NULL;
1981         }
1982
1983         cfs_list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1984                 if (obj == NULL && rc == 0) {
1985                         obj = osc2cl(ext->oe_obj);
1986                         cl_object_get(obj);
1987                 }
1988
1989                 cfs_list_del_init(&ext->oe_link);
1990                 osc_extent_finish(env, ext, 1, rc);
1991         }
1992         LASSERT(cfs_list_empty(&aa->aa_exts));
1993         LASSERT(cfs_list_empty(&aa->aa_oaps));
1994
1995         if (obj != NULL) {
1996                 struct obdo *oa = aa->aa_oa;
1997                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1998                 unsigned long valid = 0;
1999
2000                 LASSERT(rc == 0);
2001                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2002                         attr->cat_blocks = oa->o_blocks;
2003                         valid |= CAT_BLOCKS;
2004                 }
2005                 if (oa->o_valid & OBD_MD_FLMTIME) {
2006                         attr->cat_mtime = oa->o_mtime;
2007                         valid |= CAT_MTIME;
2008                 }
2009                 if (oa->o_valid & OBD_MD_FLATIME) {
2010                         attr->cat_atime = oa->o_atime;
2011                         valid |= CAT_ATIME;
2012                 }
2013                 if (oa->o_valid & OBD_MD_FLCTIME) {
2014                         attr->cat_ctime = oa->o_ctime;
2015                         valid |= CAT_CTIME;
2016                 }
2017                 if (valid != 0) {
2018                         cl_object_attr_lock(obj);
2019                         cl_object_attr_set(env, obj, attr, valid);
2020                         cl_object_attr_unlock(obj);
2021                 }
2022                 cl_object_put(env, obj);
2023         }
2024         OBDO_FREE(aa->aa_oa);
2025
2026         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2027                           req->rq_bulk->bd_nob_transferred);
2028         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2029         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2030
2031         client_obd_list_lock(&cli->cl_loi_list_lock);
2032         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2033          * is called so we know whether to go to sync BRWs or wait for more
2034          * RPCs to complete */
2035         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2036                 cli->cl_w_in_flight--;
2037         else
2038                 cli->cl_r_in_flight--;
2039         osc_wake_cache_waiters(cli);
2040         client_obd_list_unlock(&cli->cl_loi_list_lock);
2041
2042         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2043         RETURN(rc);
2044 }
2045
2046 /**
2047  * Build an RPC by the list of extent @ext_list. The caller must ensure
2048  * that the total pages in this list are NOT over max pages per RPC.
2049  * Extents in the list must be in OES_RPC state.
2050  */
2051 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2052                   cfs_list_t *ext_list, int cmd, pdl_policy_t pol)
2053 {
2054         struct ptlrpc_request *req = NULL;
2055         struct osc_extent *ext;
2056         CFS_LIST_HEAD(rpc_list);
2057         struct brw_page **pga = NULL;
2058         struct osc_brw_async_args *aa = NULL;
2059         struct obdo *oa = NULL;
2060         struct osc_async_page *oap;
2061         struct osc_async_page *tmp;
2062         struct cl_req *clerq = NULL;
2063         enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE : CRT_READ;
2064         struct ldlm_lock *lock = NULL;
2065         struct cl_req_attr crattr;
2066         obd_off starting_offset = OBD_OBJECT_EOF;
2067         obd_off ending_offset = 0;
2068         int i, rc, mpflag = 0, mem_tight = 0, page_count = 0;
2069
2070         ENTRY;
2071         LASSERT(!cfs_list_empty(ext_list));
2072
2073         /* add pages into rpc_list to build BRW rpc */
2074         cfs_list_for_each_entry(ext, ext_list, oe_link) {
2075                 LASSERT(ext->oe_state == OES_RPC);
2076                 mem_tight |= ext->oe_memalloc;
2077                 cfs_list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2078                         ++page_count;
2079                         cfs_list_add_tail(&oap->oap_rpc_item, &rpc_list);
2080                         if (starting_offset > oap->oap_obj_off)
2081                                 starting_offset = oap->oap_obj_off;
2082                         else
2083                                 LASSERT(oap->oap_page_off == 0);
2084                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2085                                 ending_offset = oap->oap_obj_off +
2086                                                 oap->oap_count;
2087                         else
2088                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2089                                         CFS_PAGE_SIZE);
2090                 }
2091         }
2092
2093         if (mem_tight)
2094                 mpflag = cfs_memory_pressure_get_and_set();
2095
2096         memset(&crattr, 0, sizeof crattr);
2097         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2098         if (pga == NULL)
2099                 GOTO(out, rc = -ENOMEM);
2100
2101         OBDO_ALLOC(oa);
2102         if (oa == NULL)
2103                 GOTO(out, rc = -ENOMEM);
2104
2105         i = 0;
2106         cfs_list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2107                 struct cl_page *page = oap2cl_page(oap);
2108                 if (clerq == NULL) {
2109                         clerq = cl_req_alloc(env, page, crt,
2110                                              1 /* only 1-object rpcs for
2111                                                 * now */);
2112                         if (IS_ERR(clerq))
2113                                 GOTO(out, rc = PTR_ERR(clerq));
2114                         lock = oap->oap_ldlm_lock;
2115                 }
2116                 if (mem_tight)
2117                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2118                 pga[i] = &oap->oap_brw_page;
2119                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2120                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2121                        pga[i]->pg, cfs_page_index(oap->oap_page), oap, pga[i]->flag);
2122                 i++;
2123                 cl_req_page_add(env, clerq, page);
2124         }
2125
2126         /* always get the data for the obdo for the rpc */
2127         LASSERT(clerq != NULL);
2128         crattr.cra_oa = oa;
2129         crattr.cra_capa = NULL;
2130         memset(crattr.cra_jobid, 0, JOBSTATS_JOBID_SIZE);
2131         cl_req_attr_set(env, clerq, &crattr, ~0ULL);
2132         if (lock) {
2133                 oa->o_handle = lock->l_remote_handle;
2134                 oa->o_valid |= OBD_MD_FLHANDLE;
2135         }
2136
2137         rc = cl_req_prep(env, clerq);
2138         if (rc != 0) {
2139                 CERROR("cl_req_prep failed: %d\n", rc);
2140                 GOTO(out, rc);
2141         }
2142
2143         sort_brw_pages(pga, page_count);
2144         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2145                         pga, &req, crattr.cra_capa, 1, 0);
2146         if (rc != 0) {
2147                 CERROR("prep_req failed: %d\n", rc);
2148                 GOTO(out, rc);
2149         }
2150
2151         req->rq_interpret_reply = brw_interpret;
2152         if (mem_tight != 0)
2153                 req->rq_memalloc = 1;
2154
2155         /* Need to update the timestamps after the request is built in case
2156          * we race with setattr (locally or in queue at OST).  If OST gets
2157          * later setattr before earlier BRW (as determined by the request xid),
2158          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2159          * way to do this in a single call.  bug 10150 */
2160         cl_req_attr_set(env, clerq, &crattr,
2161                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2162
2163         lustre_msg_set_jobid(req->rq_reqmsg, crattr.cra_jobid);
2164
2165         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2166         aa = ptlrpc_req_async_args(req);
2167         CFS_INIT_LIST_HEAD(&aa->aa_oaps);
2168         cfs_list_splice_init(&rpc_list, &aa->aa_oaps);
2169         CFS_INIT_LIST_HEAD(&aa->aa_exts);
2170         cfs_list_splice_init(ext_list, &aa->aa_exts);
2171         aa->aa_clerq = clerq;
2172
2173         /* queued sync pages can be torn down while the pages
2174          * were between the pending list and the rpc */
2175         tmp = NULL;
2176         cfs_list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2177                 /* only one oap gets a request reference */
2178                 if (tmp == NULL)
2179                         tmp = oap;
2180                 if (oap->oap_interrupted && !req->rq_intr) {
2181                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2182                                         oap, req);
2183                         ptlrpc_mark_interrupted(req);
2184                 }
2185         }
2186         if (tmp != NULL)
2187                 tmp->oap_request = ptlrpc_request_addref(req);
2188
2189         client_obd_list_lock(&cli->cl_loi_list_lock);
2190         starting_offset >>= CFS_PAGE_SHIFT;
2191         if (cmd == OBD_BRW_READ) {
2192                 cli->cl_r_in_flight++;
2193                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2194                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2195                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2196                                       starting_offset + 1);
2197         } else {
2198                 cli->cl_w_in_flight++;
2199                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2200                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2201                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2202                                       starting_offset + 1);
2203         }
2204         client_obd_list_unlock(&cli->cl_loi_list_lock);
2205
2206         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2207                   page_count, aa, cli->cl_r_in_flight,
2208                   cli->cl_w_in_flight);
2209
2210         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2211          * see which CPU/NUMA node the majority of pages were allocated
2212          * on, and try to assign the async RPC to the CPU core
2213          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2214          *
2215          * But on the other hand, we expect that multiple ptlrpcd
2216          * threads and the initial write sponsor can run in parallel,
2217          * especially when data checksum is enabled, which is CPU-bound
2218          * operation and single ptlrpcd thread cannot process in time.
2219          * So more ptlrpcd threads sharing BRW load
2220          * (with PDL_POLICY_ROUND) seems better.
2221          */
2222         ptlrpcd_add_req(req, pol, -1);
2223         rc = 0;
2224         EXIT;
2225
2226 out:
2227         if (mem_tight != 0)
2228                 cfs_memory_pressure_restore(mpflag);
2229
2230         capa_put(crattr.cra_capa);
2231         if (rc != 0) {
2232                 LASSERT(req == NULL);
2233
2234                 if (oa)
2235                         OBDO_FREE(oa);
2236                 if (pga)
2237                         OBD_FREE(pga, sizeof(*pga) * page_count);
2238                 /* this should happen rarely and is pretty bad, it makes the
2239                  * pending list not follow the dirty order */
2240                 while (!cfs_list_empty(ext_list)) {
2241                         ext = cfs_list_entry(ext_list->next, struct osc_extent,
2242                                              oe_link);
2243                         cfs_list_del_init(&ext->oe_link);
2244                         osc_extent_finish(env, ext, 0, rc);
2245                 }
2246                 if (clerq && !IS_ERR(clerq))
2247                         cl_req_completion(env, clerq, rc);
2248         }
2249         RETURN(rc);
2250 }
2251
2252 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2253                                         struct ldlm_enqueue_info *einfo)
2254 {
2255         void *data = einfo->ei_cbdata;
2256         int set = 0;
2257
2258         LASSERT(lock != NULL);
2259         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2260         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2261         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2262         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2263
2264         lock_res_and_lock(lock);
2265         spin_lock(&osc_ast_guard);
2266
2267         if (lock->l_ast_data == NULL)
2268                 lock->l_ast_data = data;
2269         if (lock->l_ast_data == data)
2270                 set = 1;
2271
2272         spin_unlock(&osc_ast_guard);
2273         unlock_res_and_lock(lock);
2274
2275         return set;
2276 }
2277
2278 static int osc_set_data_with_check(struct lustre_handle *lockh,
2279                                    struct ldlm_enqueue_info *einfo)
2280 {
2281         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2282         int set = 0;
2283
2284         if (lock != NULL) {
2285                 set = osc_set_lock_data_with_check(lock, einfo);
2286                 LDLM_LOCK_PUT(lock);
2287         } else
2288                 CERROR("lockh %p, data %p - client evicted?\n",
2289                        lockh, einfo->ei_cbdata);
2290         return set;
2291 }
2292
2293 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2294                              ldlm_iterator_t replace, void *data)
2295 {
2296         struct ldlm_res_id res_id;
2297         struct obd_device *obd = class_exp2obd(exp);
2298
2299         ostid_build_res_name(&lsm->lsm_object_oid, &res_id);
2300         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2301         return 0;
2302 }
2303
2304 /* find any ldlm lock of the inode in osc
2305  * return 0    not find
2306  *        1    find one
2307  *      < 0    error */
2308 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2309                            ldlm_iterator_t replace, void *data)
2310 {
2311         struct ldlm_res_id res_id;
2312         struct obd_device *obd = class_exp2obd(exp);
2313         int rc = 0;
2314
2315         ostid_build_res_name(&lsm->lsm_object_oid, &res_id);
2316         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2317         if (rc == LDLM_ITER_STOP)
2318                 return(1);
2319         if (rc == LDLM_ITER_CONTINUE)
2320                 return(0);
2321         return(rc);
2322 }
2323
2324 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2325                             obd_enqueue_update_f upcall, void *cookie,
2326                             __u64 *flags, int agl, int rc)
2327 {
2328         int intent = *flags & LDLM_FL_HAS_INTENT;
2329         ENTRY;
2330
2331         if (intent) {
2332                 /* The request was created before ldlm_cli_enqueue call. */
2333                 if (rc == ELDLM_LOCK_ABORTED) {
2334                         struct ldlm_reply *rep;
2335                         rep = req_capsule_server_get(&req->rq_pill,
2336                                                      &RMF_DLM_REP);
2337
2338                         LASSERT(rep != NULL);
2339                         if (rep->lock_policy_res1)
2340                                 rc = rep->lock_policy_res1;
2341                 }
2342         }
2343
2344         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2345             (rc == 0)) {
2346                 *flags |= LDLM_FL_LVB_READY;
2347                 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2348                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2349         }
2350
2351         /* Call the update callback. */
2352         rc = (*upcall)(cookie, rc);
2353         RETURN(rc);
2354 }
2355
/*
 * Reply interpreter for asynchronous enqueue requests; osc_enqueue_base()
 * installs it as req->rq_interpret_reply.
 *
 * Completes the ldlm side of the enqueue with ldlm_cli_enqueue_fini(), then
 * runs the OSC completion (including the caller's upcall) through
 * osc_enqueue_fini().  An extra lock reference is held across both calls and
 * dropped at the end.
 *
 * Returns the value produced by osc_enqueue_fini(), i.e. the upcall result.
 */
2356 static int osc_enqueue_interpret(const struct lu_env *env,
2357                                  struct ptlrpc_request *req,
2358                                  struct osc_enqueue_args *aa, int rc)
2359 {
2360         struct ldlm_lock *lock;
2361         struct lustre_handle handle;
2362         __u32 mode;
2363         struct ost_lvb *lvb;
2364         __u32 lvb_len;
2365         __u64 *flags = aa->oa_flags;
2366
2367         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2368          * might be freed anytime after lock upcall has been called. */
2369         lustre_handle_copy(&handle, aa->oa_lockh);
2370         mode = aa->oa_ei->ei_mode;
2371
2372         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2373          * be valid. */
2374         lock = ldlm_handle2lock(&handle);
2375
2376         /* Take an additional reference so that a blocking AST that
2377          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2378          * to arrive after an upcall has been executed by
2379          * osc_enqueue_fini(). */
2380         ldlm_lock_addref(&handle, mode);
2381
2382         /* Let the CP AST grant the lock first. */
2383         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2384
        /* An aborted AGL enqueue passes no LVB buffer to
         * ldlm_cli_enqueue_fini(); every other case passes aa->oa_lvb. */
2385         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2386                 lvb = NULL;
2387                 lvb_len = 0;
2388         } else {
2389                 lvb = aa->oa_lvb;
2390                 lvb_len = sizeof(*aa->oa_lvb);
2391         }
2392
2393         /* Complete obtaining the lock procedure. */
2394         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2395                                    mode, flags, lvb, lvb_len, &handle, rc);
2396         /* Complete osc stuff. */
2397         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2398                               flags, aa->oa_agl, rc);
2399
2400         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2401
2402         /* Release the lock for async request. */
2403         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2404                 /*
2405                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2406                  * not already released by
2407                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2408                  */
2409                 ldlm_lock_decref(&handle, mode);
2410
        /* NOTE(review): the assertion arguments read aa->oa_lockh after the
         * upcall has run, although the comment at the top says aa->oa_* may
         * be freed by then - confirm this is safe. */
2411         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2412                  aa->oa_lockh, req, aa);
        /* Drop the additional mode reference taken by ldlm_lock_addref()
         * above, then the lock reference from ldlm_handle2lock(). */
2413         ldlm_lock_decref(&handle, mode);
2414         LDLM_LOCK_PUT(lock);
2415         return rc;
2416 }
2417
2418 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2419                         struct lov_oinfo *loi, int flags,
2420                         struct ost_lvb *lvb, __u32 mode, int rc)
2421 {
2422         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2423
2424         if (rc == ELDLM_OK) {
2425                 __u64 tmp;
2426
2427                 LASSERT(lock != NULL);
2428                 loi->loi_lvb = *lvb;
2429                 tmp = loi->loi_lvb.lvb_size;
2430                 /* Extend KMS up to the end of this lock and no further
2431                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2432                 if (tmp > lock->l_policy_data.l_extent.end)
2433                         tmp = lock->l_policy_data.l_extent.end + 1;
2434                 if (tmp >= loi->loi_kms) {
2435                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2436                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2437                         loi_kms_set(loi, tmp);
2438                 } else {
2439                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2440                                    LPU64"; leaving kms="LPU64", end="LPU64,
2441                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2442                                    lock->l_policy_data.l_extent.end);
2443                 }
2444                 ldlm_lock_allow_match(lock);
2445         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2446                 LASSERT(lock != NULL);
2447                 loi->loi_lvb = *lvb;
2448                 ldlm_lock_allow_match(lock);
2449                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2450                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2451                 rc = ELDLM_OK;
2452         }
2453
2454         if (lock != NULL) {
2455                 if (rc != ELDLM_OK)
2456                         ldlm_lock_fail_match(lock);
2457
2458                 LDLM_LOCK_PUT(lock);
2459         }
2460 }
2461 EXPORT_SYMBOL(osc_update_enqueue);
2462
/* Sentinel request-set value: osc_enqueue_base() compares its rqset argument
 * against it and, on a match, hands the request to ptlrpcd_add_req() instead
 * of adding it to the set with ptlrpc_set_add_req(). */
2463 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2464
2465 /* When enqueuing asynchronously, locks are not ordered, we can obtain a lock
2466  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2467  * other synchronous requests, however keeping some locks and trying to obtain
2468  * others may take a considerable amount of time in a case of ost failure; and
2469  * when other sync requests do not get released lock from a client, the client
2470  * is excluded from the cluster -- such scenarios make the life difficult, so
2471  * release locks just after they are obtained. */
/*
 * Enqueue an extent lock on the resource @res_id, reusing an already cached
 * lock when possible.
 *
 * Side effects on the arguments: policy->l_extent is modified in place by the
 * page-boundary adjustment below; LDLM_FL_LVB_READY is set in *flags when a
 * cached lock is reused; LDLM_FL_BLOCK_GRANTED is cleared from *flags before
 * a new enqueue is sent.
 *
 * Return value:
 *  -ECANCELED  AGL request matched a lock without LDLM_FL_LVB_READY
 *  ELDLM_OK    a cached lock was matched and the callback data attached; the
 *              upcall has already been called with ELDLM_OK
 *  -ENOMEM or the ldlm_prep_enqueue_req() error when the intent request
 *              cannot be prepared
 *  with @rqset set: the ldlm_cli_enqueue() result; on success the request is
 *              queued with osc_enqueue_interpret() as its reply interpreter
 *  otherwise:  the osc_enqueue_fini() result, i.e. the upcall's return value
 */
2472 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2473                      __u64 *flags, ldlm_policy_data_t *policy,
2474                      struct ost_lvb *lvb, int kms_valid,
2475                      obd_enqueue_update_f upcall, void *cookie,
2476                      struct ldlm_enqueue_info *einfo,
2477                      struct lustre_handle *lockh,
2478                      struct ptlrpc_request_set *rqset, int async, int agl)
2479 {
2480         struct obd_device *obd = exp->exp_obd;
2481         struct ptlrpc_request *req = NULL;
2482         int intent = *flags & LDLM_FL_HAS_INTENT;
2483         int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2484         ldlm_mode_t mode;
2485         int rc;
2486         ENTRY;
2487
2488         /* Filesystem lock extents are extended to page boundaries so that
2489          * dealing with the page cache is a little smoother.  */
2490         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2491         policy->l_extent.end |= ~CFS_PAGE_MASK;
2492
2493         /*
2494          * kms is not valid when either object is completely fresh (so that no
2495          * locks are cached), or object was evicted. In the latter case cached
2496          * lock cannot be used, because it would prime inode state with
2497          * potentially stale LVB.
2498          */
2499         if (!kms_valid)
2500                 goto no_match;
2501
2502         /* Next, search for already existing extent locks that will cover us */
2503         /* If we're trying to read, we also search for an existing PW lock.  The
2504          * VFS and page cache already protect us locally, so lots of readers/
2505          * writers can share a single PW lock.
2506          *
2507          * There are problems with conversion deadlocks, so instead of
2508          * converting a read lock to a write lock, we'll just enqueue a new
2509          * one.
2510          *
2511          * At some point we should cancel the read lock instead of making them
2512          * send us a blocking callback, but there are problems with canceling
2513          * locks out from other users right now, too. */
2514         mode = einfo->ei_mode;
2515         if (einfo->ei_mode == LCK_PR)
2516                 mode |= LCK_PW;
2517         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2518                                einfo->ei_type, policy, mode, lockh, 0);
2519         if (mode) {
2520                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2521
2522                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2523                         /* For AGL, if enqueue RPC is sent but the lock is not
2524                          * granted, then skip to process this stripe.
2525                          * Return -ECANCELED to tell the caller. */
2526                         ldlm_lock_decref(lockh, mode);
2527                         LDLM_LOCK_PUT(matched);
2528                         RETURN(-ECANCELED);
2529                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2530                         *flags |= LDLM_FL_LVB_READY;
2531                         /* addref the lock only if not async requests and PW
2532                          * lock is matched whereas we asked for PR. */
2533                         if (!rqset && einfo->ei_mode != mode)
2534                                 ldlm_lock_addref(lockh, LCK_PR);
2535                         if (intent) {
2536                                 /* I would like to be able to ASSERT here that
2537                                  * rss <= kms, but I can't, for reasons which
2538                                  * are explained in lov_enqueue() */
2539                         }
2540
2541                         /* We already have a lock, and it's referenced.
2542                          *
2543                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2544                          * AGL upcall may change it to CLS_HELD directly. */
2545                         (*upcall)(cookie, ELDLM_OK);
2546
2547                         if (einfo->ei_mode != mode)
2548                                 ldlm_lock_decref(lockh, LCK_PW);
2549                         else if (rqset)
2550                                 /* For async requests, decref the lock. */
2551                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2552                         LDLM_LOCK_PUT(matched);
2553                         RETURN(ELDLM_OK);
2554                 } else {
                        /* The matched lock carries different callback data:
                         * drop it and fall through to a fresh enqueue. */
2555                         ldlm_lock_decref(lockh, mode);
2556                         LDLM_LOCK_PUT(matched);
2557                 }
2558         }
2559
2560  no_match:
        /* Intent enqueues prepare their own request here; otherwise req stays
         * NULL when its address is passed to ldlm_cli_enqueue() below. */
2561         if (intent) {
2562                 CFS_LIST_HEAD(cancels);
2563                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2564                                            &RQF_LDLM_ENQUEUE_LVB);
2565                 if (req == NULL)
2566                         RETURN(-ENOMEM);
2567
2568                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2569                 if (rc) {
2570                         ptlrpc_request_free(req);
2571                         RETURN(rc);
2572                 }
2573
2574                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2575                                      sizeof *lvb);
2576                 ptlrpc_request_set_replen(req);
2577         }
2578
2579         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2580         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2581
2582         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2583                               sizeof(*lvb), LVB_T_OST, lockh, async);
2584         if (rqset) {
2585                 if (!rc) {
                        /* Stash everything osc_enqueue_interpret() needs in
                         * the request's async-args area. */
2586                         struct osc_enqueue_args *aa;
2587                         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2588                         aa = ptlrpc_req_async_args(req);
2589                         aa->oa_ei = einfo;
2590                         aa->oa_exp = exp;
2591                         aa->oa_flags  = flags;
2592                         aa->oa_upcall = upcall;
2593                         aa->oa_cookie = cookie;
2594                         aa->oa_lvb    = lvb;
2595                         aa->oa_lockh  = lockh;
2596                         aa->oa_agl    = !!agl;
2597
2598                         req->rq_interpret_reply =
2599                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2600                         if (rqset == PTLRPCD_SET)
2601                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2602                         else
2603                                 ptlrpc_set_add_req(rqset, req);
2604                 } else if (intent) {
2605                         ptlrpc_req_finished(req);
2606                 }
2607                 RETURN(rc);
2608         }
2609
        /* No request set: finish the enqueue right here and release the
         * intent request prepared above, if any. */
2610         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2611         if (intent)
2612                 ptlrpc_req_finished(req);
2613
2614         RETURN(rc);
2615 }
2616
2617 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2618                        struct ldlm_enqueue_info *einfo,
2619                        struct ptlrpc_request_set *rqset)
2620 {
2621         struct ldlm_res_id res_id;
2622         int rc;
2623         ENTRY;
2624
2625         ostid_build_res_name(&oinfo->oi_md->lsm_object_oid, &res_id);
2626         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2627                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2628                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2629                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2630                               rqset, rqset != NULL, 0);
2631         RETURN(rc);
2632 }
2633
2634 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2635                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2636                    int *flags, void *data, struct lustre_handle *lockh,
2637                    int unref)
2638 {
2639         struct obd_device *obd = exp->exp_obd;
2640         int lflags = *flags;
2641         ldlm_mode_t rc;
2642         ENTRY;
2643
2644         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2645                 RETURN(-EIO);
2646
2647         /* Filesystem lock extents are extended to page boundaries so that
2648          * dealing with the page cache is a little smoother */
2649         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2650         policy->l_extent.end |= ~CFS_PAGE_MASK;
2651
2652         /* Next, search for already existing extent locks that will cover us */
2653         /* If we're trying to read, we also search for an existing PW lock.  The
2654          * VFS and page cache already protect us locally, so lots of readers/
2655          * writers can share a single PW lock. */
2656         rc = mode;
2657         if (mode == LCK_PR)
2658                 rc |= LCK_PW;
2659         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2660                              res_id, type, policy, rc, lockh, unref);
2661         if (rc) {
2662                 if (data != NULL) {
2663                         if (!osc_set_data_with_check(lockh, data)) {
2664                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2665                                         ldlm_lock_decref(lockh, rc);
2666                                 RETURN(0);
2667                         }
2668                 }
2669                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2670                         ldlm_lock_addref(lockh, LCK_PR);
2671                         ldlm_lock_decref(lockh, LCK_PW);
2672                 }
2673                 RETURN(rc);
2674         }
2675         RETURN(rc);
2676 }
2677
2678 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2679 {
2680         ENTRY;
2681
2682         if (unlikely(mode == LCK_GROUP))
2683                 ldlm_lock_decref_and_cancel(lockh, mode);
2684         else
2685                 ldlm_lock_decref(lockh, mode);
2686
2687         RETURN(0);
2688 }
2689
2690 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2691                       __u32 mode, struct lustre_handle *lockh)
2692 {
2693         ENTRY;
2694         RETURN(osc_cancel_base(lockh, mode));
2695 }
2696
2697 static int osc_cancel_unused(struct obd_export *exp,
2698                              struct lov_stripe_md *lsm,
2699                              ldlm_cancel_flags_t flags,
2700                              void *opaque)
2701 {
2702         struct obd_device *obd = class_exp2obd(exp);
2703         struct ldlm_res_id res_id, *resp = NULL;
2704
2705         if (lsm != NULL) {
2706                 ostid_build_res_name(&lsm->lsm_object_oid, &res_id);
2707                 resp = &res_id;
2708         }
2709
2710         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2711 }
2712
2713 static int osc_statfs_interpret(const struct lu_env *env,
2714                                 struct ptlrpc_request *req,
2715                                 struct osc_async_args *aa, int rc)
2716 {
2717         struct obd_statfs *msfs;
2718         ENTRY;
2719
2720         if (rc == -EBADR)
2721                 /* The request has in fact never been sent
2722                  * due to issues at a higher level (LOV).
2723                  * Exit immediately since the caller is
2724                  * aware of the problem and takes care
2725                  * of the clean up */
2726                  RETURN(rc);
2727
2728         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2729             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2730                 GOTO(out, rc = 0);
2731
2732         if (rc != 0)
2733                 GOTO(out, rc);
2734
2735         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2736         if (msfs == NULL) {
2737                 GOTO(out, rc = -EPROTO);
2738         }
2739
2740         *aa->aa_oi->oi_osfs = *msfs;
2741 out:
2742         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2743         RETURN(rc);
2744 }
2745
2746 static int osc_statfs_async(struct obd_export *exp,
2747                             struct obd_info *oinfo, __u64 max_age,
2748                             struct ptlrpc_request_set *rqset)
2749 {
2750         struct obd_device     *obd = class_exp2obd(exp);
2751         struct ptlrpc_request *req;
2752         struct osc_async_args *aa;
2753         int                    rc;
2754         ENTRY;
2755
2756         /* We could possibly pass max_age in the request (as an absolute
2757          * timestamp or a "seconds.usec ago") so the target can avoid doing
2758          * extra calls into the filesystem if that isn't necessary (e.g.
2759          * during mount that would help a bit).  Having relative timestamps
2760          * is not so great if request processing is slow, while absolute
2761          * timestamps are not ideal because they need time synchronization. */
2762         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2763         if (req == NULL)
2764                 RETURN(-ENOMEM);
2765
2766         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2767         if (rc) {
2768                 ptlrpc_request_free(req);
2769                 RETURN(rc);
2770         }
2771         ptlrpc_request_set_replen(req);
2772         req->rq_request_portal = OST_CREATE_PORTAL;
2773         ptlrpc_at_set_req_timeout(req);
2774
2775         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2776                 /* procfs requests not want stat in wait for avoid deadlock */
2777                 req->rq_no_resend = 1;
2778                 req->rq_no_delay = 1;
2779         }
2780
2781         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2782         CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2783         aa = ptlrpc_req_async_args(req);
2784         aa->aa_oi = oinfo;
2785
2786         ptlrpc_set_add_req(rqset, req);
2787         RETURN(0);
2788 }
2789
2790 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2791                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2792 {
2793         struct obd_device     *obd = class_exp2obd(exp);
2794         struct obd_statfs     *msfs;
2795         struct ptlrpc_request *req;
2796         struct obd_import     *imp = NULL;
2797         int rc;
2798         ENTRY;
2799
2800         /*Since the request might also come from lprocfs, so we need
2801          *sync this with client_disconnect_export Bug15684*/
2802         down_read(&obd->u.cli.cl_sem);
2803         if (obd->u.cli.cl_import)
2804                 imp = class_import_get(obd->u.cli.cl_import);
2805         up_read(&obd->u.cli.cl_sem);
2806         if (!imp)
2807                 RETURN(-ENODEV);
2808
2809         /* We could possibly pass max_age in the request (as an absolute
2810          * timestamp or a "seconds.usec ago") so the target can avoid doing
2811          * extra calls into the filesystem if that isn't necessary (e.g.
2812          * during mount that would help a bit).  Having relative timestamps
2813          * is not so great if request processing is slow, while absolute
2814          * timestamps are not ideal because they need time synchronization. */
2815         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2816
2817         class_import_put(imp);
2818
2819         if (req == NULL)
2820                 RETURN(-ENOMEM);
2821
2822         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2823         if (rc) {
2824                 ptlrpc_request_free(req);
2825                 RETURN(rc);
2826         }
2827         ptlrpc_request_set_replen(req);
2828         req->rq_request_portal = OST_CREATE_PORTAL;
2829         ptlrpc_at_set_req_timeout(req);
2830
2831         if (flags & OBD_STATFS_NODELAY) {
2832                 /* procfs requests not want stat in wait for avoid deadlock */
2833                 req->rq_no_resend = 1;
2834                 req->rq_no_delay = 1;
2835         }
2836
2837         rc = ptlrpc_queue_wait(req);
2838         if (rc)
2839                 GOTO(out, rc);
2840
2841         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2842         if (msfs == NULL) {
2843                 GOTO(out, rc = -EPROTO);
2844         }
2845
2846         *osfs = *msfs;
2847
2848         EXIT;
2849  out:
2850         ptlrpc_req_finished(req);
2851         return rc;
2852 }
2853
2854 /* Retrieve object striping information.
2855  *
2856  * @lmmu is a pointer to an in-core struct with lmm_ost_count indicating
2857  * the maximum number of OST indices which will fit in the user buffer.
2858  * lmm_magic must be LOV_MAGIC (we only use 1 slot here).
2859  */
2860 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2861 {
2862         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2863         struct lov_user_md_v3 lum, *lumk;
2864         struct lov_user_ost_data_v1 *lmm_objects;
2865         int rc = 0, lum_size;
2866         ENTRY;
2867
2868         if (!lsm)
2869                 RETURN(-ENODATA);
2870
2871         /* we only need the header part from user space to get lmm_magic and
2872          * lmm_stripe_count, (the header part is common to v1 and v3) */
2873         lum_size = sizeof(struct lov_user_md_v1);
2874         if (cfs_copy_from_user(&lum, lump, lum_size))
2875                 RETURN(-EFAULT);
2876
2877         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2878             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2879                 RETURN(-EINVAL);
2880
2881         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2882         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2883         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2884         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2885
2886         /* we can use lov_mds_md_size() to compute lum_size
2887          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2888         if (lum.lmm_stripe_count > 0) {
2889                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2890                 OBD_ALLOC(lumk, lum_size);
2891                 if (!lumk)
2892                         RETURN(-ENOMEM);
2893
2894                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2895                         lmm_objects = &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2896                 else
2897                         lmm_objects = &(lumk->lmm_objects[0]);
2898                 lmm_objects->l_object_id = lsm->lsm_object_id;
2899         } else {
2900                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2901                 lumk = &lum;
2902         }
2903
2904         lumk->lmm_object_id = lsm->lsm_object_id;
2905         lumk->lmm_object_seq = lsm->lsm_object_seq;
2906         lumk->lmm_stripe_count = 1;
2907
2908         if (cfs_copy_to_user(lump, lumk, lum_size))
2909                 rc = -EFAULT;
2910
2911         if (lumk != &lum)
2912                 OBD_FREE(lumk, lum_size);
2913
2914         RETURN(rc);
2915 }
2916
2917
2918 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2919                          void *karg, void *uarg)
2920 {
2921         struct obd_device *obd = exp->exp_obd;
2922         struct obd_ioctl_data *data = karg;
2923         int err = 0;
2924         ENTRY;
2925
2926         if (!cfs_try_module_get(THIS_MODULE)) {
2927                 CERROR("Can't get module. Is it alive?");
2928                 return -EINVAL;
2929         }
2930         switch (cmd) {
2931         case OBD_IOC_LOV_GET_CONFIG: {
2932                 char *buf;
2933                 struct lov_desc *desc;
2934                 struct obd_uuid uuid;
2935
2936                 buf = NULL;
2937                 len = 0;
2938                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2939                         GOTO(out, err = -EINVAL);
2940
2941                 data = (struct obd_ioctl_data *)buf;
2942
2943                 if (sizeof(*desc) > data->ioc_inllen1) {
2944                         obd_ioctl_freedata(buf, len);
2945                         GOTO(out, err = -EINVAL);
2946                 }
2947
2948                 if (data->ioc_inllen2 < sizeof(uuid)) {
2949                         obd_ioctl_freedata(buf, len);
2950                         GOTO(out, err = -EINVAL);
2951                 }
2952
2953                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2954                 desc->ld_tgt_count = 1;
2955                 desc->ld_active_tgt_count = 1;
2956                 desc->ld_default_stripe_count = 1;
2957                 desc->ld_default_stripe_size = 0;
2958                 desc->ld_default_stripe_offset = 0;
2959                 desc->ld_pattern = 0;
2960                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2961
2962                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2963
2964                 err = cfs_copy_to_user((void *)uarg, buf, len);
2965                 if (err)
2966                         err = -EFAULT;
2967                 obd_ioctl_freedata(buf, len);
2968                 GOTO(out, err);
2969         }
2970         case LL_IOC_LOV_SETSTRIPE:
2971                 err = obd_alloc_memmd(exp, karg);
2972                 if (err > 0)
2973                         err = 0;
2974                 GOTO(out, err);
2975         case LL_IOC_LOV_GETSTRIPE:
2976                 err = osc_getstripe(karg, uarg);
2977                 GOTO(out, err);
2978         case OBD_IOC_CLIENT_RECOVER:
2979                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2980                                             data->ioc_inlbuf1, 0);
2981                 if (err > 0)
2982                         err = 0;
2983                 GOTO(out, err);
2984         case IOC_OSC_SET_ACTIVE:
2985                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2986                                                data->ioc_offset);
2987                 GOTO(out, err);
2988         case OBD_IOC_POLL_QUOTACHECK:
2989                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2990                 GOTO(out, err);
2991         case OBD_IOC_PING_TARGET:
2992                 err = ptlrpc_obd_ping(obd);
2993                 GOTO(out, err);
2994         default:
2995                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2996                        cmd, cfs_curproc_comm());
2997                 GOTO(out, err = -ENOTTY);
2998         }
2999 out:
3000         cfs_module_put(THIS_MODULE);
3001         return err;
3002 }
3003
3004 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3005                         obd_count keylen, void *key, __u32 *vallen, void *val,
3006                         struct lov_stripe_md *lsm)
3007 {
3008         ENTRY;
3009         if (!vallen || !val)
3010                 RETURN(-EFAULT);
3011
3012         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3013                 __u32 *stripe = val;
3014                 *vallen = sizeof(*stripe);
3015                 *stripe = 0;
3016                 RETURN(0);
3017         } else if (KEY_IS(KEY_LAST_ID)) {
3018                 struct ptlrpc_request *req;
3019                 obd_id                *reply;
3020                 char                  *tmp;
3021                 int                    rc;
3022
3023                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3024                                            &RQF_OST_GET_INFO_LAST_ID);
3025                 if (req == NULL)
3026                         RETURN(-ENOMEM);
3027
3028                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3029                                      RCL_CLIENT, keylen);
3030                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3031                 if (rc) {
3032                         ptlrpc_request_free(req);
3033                         RETURN(rc);
3034                 }
3035
3036                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3037                 memcpy(tmp, key, keylen);
3038
3039                 req->rq_no_delay = req->rq_no_resend = 1;
3040                 ptlrpc_request_set_replen(req);
3041                 rc = ptlrpc_queue_wait(req);
3042                 if (rc)
3043                         GOTO(out, rc);
3044
3045                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3046                 if (reply == NULL)
3047                         GOTO(out, rc = -EPROTO);
3048
3049                 *((obd_id *)val) = *reply;
3050         out:
3051                 ptlrpc_req_finished(req);
3052                 RETURN(rc);
3053         } else if (KEY_IS(KEY_FIEMAP)) {
3054                 struct ptlrpc_request *req;
3055                 struct ll_user_fiemap *reply;
3056                 char *tmp;
3057                 int rc;
3058
3059                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3060                                            &RQF_OST_GET_INFO_FIEMAP);
3061                 if (req == NULL)
3062                         RETURN(-ENOMEM);
3063
3064                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3065                                      RCL_CLIENT, keylen);
3066                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3067                                      RCL_CLIENT, *vallen);
3068                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3069                                      RCL_SERVER, *vallen);
3070
3071                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3072                 if (rc) {
3073                         ptlrpc_request_free(req);
3074                         RETURN(rc);
3075                 }
3076
3077                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3078                 memcpy(tmp, key, keylen);
3079                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3080                 memcpy(tmp, val, *vallen);
3081
3082                 ptlrpc_request_set_replen(req);
3083                 rc = ptlrpc_queue_wait(req);
3084                 if (rc)
3085                         GOTO(out1, rc);
3086
3087                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3088                 if (reply == NULL)
3089                         GOTO(out1, rc = -EPROTO);
3090
3091                 memcpy(val, reply, *vallen);
3092         out1:
3093                 ptlrpc_req_finished(req);
3094
3095                 RETURN(rc);
3096         }
3097
3098         RETURN(-EINVAL);
3099 }
3100
3101 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3102                               obd_count keylen, void *key, obd_count vallen,
3103                               void *val, struct ptlrpc_request_set *set)
3104 {
3105         struct ptlrpc_request *req;
3106         struct obd_device     *obd = exp->exp_obd;
3107         struct obd_import     *imp = class_exp2cliimp(exp);
3108         char                  *tmp;
3109         int                    rc;
3110         ENTRY;
3111
3112         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3113
3114         if (KEY_IS(KEY_CHECKSUM)) {
3115                 if (vallen != sizeof(int))
3116                         RETURN(-EINVAL);
3117                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3118                 RETURN(0);
3119         }
3120
3121         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3122                 sptlrpc_conf_client_adapt(obd);
3123                 RETURN(0);
3124         }
3125
3126         if (KEY_IS(KEY_FLUSH_CTX)) {
3127                 sptlrpc_import_flush_my_ctx(imp);
3128                 RETURN(0);
3129         }
3130
3131         if (KEY_IS(KEY_CACHE_SET)) {
3132                 struct client_obd *cli = &obd->u.cli;
3133
3134                 LASSERT(cli->cl_cache == NULL); /* only once */
3135                 cli->cl_cache = (struct cl_client_cache *)val;
3136                 cfs_atomic_inc(&cli->cl_cache->ccc_users);
3137                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3138
3139                 /* add this osc into entity list */
3140                 LASSERT(cfs_list_empty(&cli->cl_lru_osc));
3141                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3142                 cfs_list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3143                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3144
3145                 RETURN(0);
3146         }
3147
3148         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3149                 struct client_obd *cli = &obd->u.cli;
3150                 int nr = cfs_atomic_read(&cli->cl_lru_in_list) >> 1;
3151                 int target = *(int *)val;
3152
3153                 nr = osc_lru_shrink(cli, min(nr, target));
3154                 *(int *)val -= nr;
3155                 RETURN(0);
3156         }
3157
3158         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3159                 RETURN(-EINVAL);
3160
3161         /* We pass all other commands directly to OST. Since nobody calls osc
3162            methods directly and everybody is supposed to go through LOV, we
3163            assume lov checked invalid values for us.
3164            The only recognised values so far are evict_by_nid and mds_conn.
3165            Even if something bad goes through, we'd get a -EINVAL from OST
3166            anyway. */
3167
3168         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3169                                                 &RQF_OST_SET_GRANT_INFO :
3170                                                 &RQF_OBD_SET_INFO);
3171         if (req == NULL)
3172                 RETURN(-ENOMEM);
3173
3174         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3175                              RCL_CLIENT, keylen);
3176         if (!KEY_IS(KEY_GRANT_SHRINK))
3177                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3178                                      RCL_CLIENT, vallen);
3179         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3180         if (rc) {
3181                 ptlrpc_request_free(req);
3182                 RETURN(rc);
3183         }
3184
3185         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3186         memcpy(tmp, key, keylen);
3187         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3188                                                         &RMF_OST_BODY :
3189                                                         &RMF_SETINFO_VAL);
3190         memcpy(tmp, val, vallen);
3191
3192         if (KEY_IS(KEY_GRANT_SHRINK)) {
3193                 struct osc_grant_args *aa;
3194                 struct obdo *oa;
3195
3196                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3197                 aa = ptlrpc_req_async_args(req);
3198                 OBDO_ALLOC(oa);
3199                 if (!oa) {
3200                         ptlrpc_req_finished(req);
3201                         RETURN(-ENOMEM);
3202                 }
3203                 *oa = ((struct ost_body *)val)->oa;
3204                 aa->aa_oa = oa;
3205                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3206         }
3207
3208         ptlrpc_request_set_replen(req);
3209         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3210                 LASSERT(set != NULL);
3211                 ptlrpc_set_add_req(set, req);
3212                 ptlrpc_check_set(NULL, set);
3213         } else
3214                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3215
3216         RETURN(0);
3217 }
3218
3219
/**
 * llog initialisation hook of the OSC method table.
 *
 * This path must not be used together with LOD/OSP and is scheduled for
 * removal, so reaching it is treated as a bug.
 *
 * \retval 0 formally; LBUG() is invoked first
 */
static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
                         struct obd_device *disk_obd, int *index)
{
        LBUG();

        return 0;
}
3228
3229 static int osc_llog_finish(struct obd_device *obd, int count)
3230 {
3231         struct llog_ctxt *ctxt;
3232
3233         ENTRY;
3234
3235         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3236         if (ctxt) {
3237                 llog_cat_close(NULL, ctxt->loc_handle);
3238                 llog_cleanup(NULL, ctxt);
3239         }
3240
3241         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3242         if (ctxt)
3243                 llog_cleanup(NULL, ctxt);
3244         RETURN(0);
3245 }
3246
3247 static int osc_reconnect(const struct lu_env *env,
3248                          struct obd_export *exp, struct obd_device *obd,
3249                          struct obd_uuid *cluuid,
3250                          struct obd_connect_data *data,
3251                          void *localdata)
3252 {
3253         struct client_obd *cli = &obd->u.cli;
3254
3255         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3256                 long lost_grant;
3257
3258                 client_obd_list_lock(&cli->cl_loi_list_lock);
3259                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3260                                 2 * cli->cl_max_pages_per_rpc << CFS_PAGE_SHIFT;
3261                 lost_grant = cli->cl_lost_grant;
3262                 cli->cl_lost_grant = 0;
3263                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3264
3265                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3266                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3267                        data->ocd_version, data->ocd_grant, lost_grant);
3268         }
3269
3270         RETURN(0);
3271 }
3272
3273 static int osc_disconnect(struct obd_export *exp)
3274 {
3275         struct obd_device *obd = class_exp2obd(exp);
3276         struct llog_ctxt  *ctxt;
3277         int rc;
3278
3279         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3280         if (ctxt) {
3281                 if (obd->u.cli.cl_conn_count == 1) {
3282                         /* Flush any remaining cancel messages out to the
3283                          * target */
3284                         llog_sync(ctxt, exp, 0);
3285                 }
3286                 llog_ctxt_put(ctxt);
3287         } else {
3288                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3289                        obd);
3290         }
3291
3292         rc = client_disconnect_export(exp);
3293         /**
3294          * Initially we put del_shrink_grant before disconnect_export, but it
3295          * causes the following problem if setup (connect) and cleanup
3296          * (disconnect) are tangled together.
3297          *      connect p1                     disconnect p2
3298          *   ptlrpc_connect_import
3299          *     ...............               class_manual_cleanup
3300          *                                     osc_disconnect
3301          *                                     del_shrink_grant
3302          *   ptlrpc_connect_interrupt
3303          *     init_grant_shrink
3304          *   add this client to shrink list
3305          *                                      cleanup_osc
3306          * Bang! pinger trigger the shrink.
3307          * So the osc should be disconnected from the shrink list, after we
3308          * are sure the import has been destroyed. BUG18662
3309          */
3310         if (obd->u.cli.cl_import == NULL)
3311                 osc_del_shrink_grant(&obd->u.cli);
3312         return rc;
3313 }
3314
3315 static int osc_import_event(struct obd_device *obd,
3316                             struct obd_import *imp,
3317                             enum obd_import_event event)
3318 {
3319         struct client_obd *cli;
3320         int rc = 0;
3321
3322         ENTRY;
3323         LASSERT(imp->imp_obd == obd);
3324
3325         switch (event) {
3326         case IMP_EVENT_DISCON: {
3327                 cli = &obd->u.cli;
3328                 client_obd_list_lock(&cli->cl_loi_list_lock);
3329                 cli->cl_avail_grant = 0;
3330                 cli->cl_lost_grant = 0;
3331                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3332                 break;
3333         }
3334         case IMP_EVENT_INACTIVE: {
3335                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3336                 break;
3337         }
3338         case IMP_EVENT_INVALIDATE: {
3339                 struct ldlm_namespace *ns = obd->obd_namespace;
3340                 struct lu_env         *env;
3341                 int                    refcheck;
3342
3343                 env = cl_env_get(&refcheck);
3344                 if (!IS_ERR(env)) {
3345                         /* Reset grants */
3346                         cli = &obd->u.cli;
3347                         /* all pages go to failing rpcs due to the invalid
3348                          * import */
3349                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3350
3351                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3352                         cl_env_put(env, &refcheck);
3353                 } else
3354                         rc = PTR_ERR(env);
3355                 break;
3356         }
3357         case IMP_EVENT_ACTIVE: {
3358                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3359                 break;
3360         }
3361         case IMP_EVENT_OCD: {
3362                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3363
3364                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3365                         osc_init_grant(&obd->u.cli, ocd);
3366
3367                 /* See bug 7198 */
3368                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3369                         imp->imp_client->cli_request_portal =OST_REQUEST_PORTAL;
3370
3371                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3372                 break;
3373         }
3374         case IMP_EVENT_DEACTIVATE: {
3375                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3376                 break;
3377         }
3378         case IMP_EVENT_ACTIVATE: {
3379                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3380                 break;
3381         }
3382         default:
3383                 CERROR("Unknown import event %d\n", event);
3384                 LBUG();
3385         }
3386         RETURN(rc);
3387 }
3388
3389 /**
3390  * Determine whether the lock can be canceled before replaying the lock
3391  * during recovery, see bug16774 for detailed information.
3392  *
3393  * \retval zero the lock can't be canceled
3394  * \retval other ok to cancel
3395  */
3396 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3397 {
3398         check_res_locked(lock->l_resource);
3399
3400         /*
3401          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3402          *
3403          * XXX as a future improvement, we can also cancel unused write lock
3404          * if it doesn't have dirty data and active mmaps.
3405          */
3406         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3407             (lock->l_granted_mode == LCK_PR ||
3408              lock->l_granted_mode == LCK_CR) &&
3409             (osc_dlm_lock_pageref(lock) == 0))
3410                 RETURN(1);
3411
3412         RETURN(0);
3413 }
3414
3415 static int brw_queue_work(const struct lu_env *env, void *data)
3416 {
3417         struct client_obd *cli = data;
3418
3419         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3420
3421         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3422         RETURN(0);
3423 }
3424
3425 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3426 {
3427         struct lprocfs_static_vars lvars = { 0 };
3428         struct client_obd          *cli = &obd->u.cli;
3429         void                       *handler;
3430         int                        rc;
3431         ENTRY;
3432
3433         rc = ptlrpcd_addref();
3434         if (rc)
3435                 RETURN(rc);
3436
3437         rc = client_obd_setup(obd, lcfg);
3438         if (rc)
3439                 GOTO(out_ptlrpcd, rc);
3440
3441         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3442         if (IS_ERR(handler))
3443                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3444         cli->cl_writeback_work = handler;
3445
3446         rc = osc_quota_setup(obd);
3447         if (rc)
3448                 GOTO(out_ptlrpcd_work, rc);
3449
3450         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3451         lprocfs_osc_init_vars(&lvars);
3452         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3453                 lproc_osc_attach_seqstat(obd);
3454                 sptlrpc_lprocfs_cliobd_attach(obd);
3455                 ptlrpc_lprocfs_register_obd(obd);
3456         }
3457
3458         /* We need to allocate a few requests more, because
3459          * brw_interpret tries to create new requests before freeing
3460          * previous ones, Ideally we want to have 2x max_rpcs_in_flight
3461          * reserved, but I'm afraid that might be too much wasted RAM
3462          * in fact, so 2 is just my guess and still should work. */
3463         cli->cl_import->imp_rq_pool =
3464                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3465                                     OST_MAXREQSIZE,
3466                                     ptlrpc_add_rqs_to_pool);
3467
3468         CFS_INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3469         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3470         RETURN(rc);
3471
3472 out_ptlrpcd_work:
3473         ptlrpcd_destroy_work(handler);
3474 out_client_setup:
3475         client_obd_cleanup(obd);
3476 out_ptlrpcd:
3477         ptlrpcd_decref();
3478         RETURN(rc);
3479 }
3480
3481 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3482 {
3483         int rc = 0;
3484         ENTRY;
3485
3486         switch (stage) {
3487         case OBD_CLEANUP_EARLY: {
3488                 struct obd_import *imp;
3489                 imp = obd->u.cli.cl_import;
3490                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3491                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3492                 ptlrpc_deactivate_import(imp);
3493                 spin_lock(&imp->imp_lock);
3494                 imp->imp_pingable = 0;
3495                 spin_unlock(&imp->imp_lock);
3496                 break;
3497         }
3498         case OBD_CLEANUP_EXPORTS: {
3499                 struct client_obd *cli = &obd->u.cli;
3500                 /* LU-464
3501                  * for echo client, export may be on zombie list, wait for
3502                  * zombie thread to cull it, because cli.cl_import will be
3503                  * cleared in client_disconnect_export():
3504                  *   class_export_destroy() -> obd_cleanup() ->
3505                  *   echo_device_free() -> echo_client_cleanup() ->
3506                  *   obd_disconnect() -> osc_disconnect() ->
3507                  *   client_disconnect_export()
3508                  */
3509                 obd_zombie_barrier();
3510                 if (cli->cl_writeback_work) {
3511                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3512                         cli->cl_writeback_work = NULL;
3513                 }
3514                 obd_cleanup_client_import(obd);
3515                 ptlrpc_lprocfs_unregister_obd(obd);
3516                 lprocfs_obd_cleanup(obd);
3517                 rc = obd_llog_finish(obd, 0);
3518                 if (rc != 0)
3519                         CERROR("failed to cleanup llogging subsystems\n");
3520                 break;
3521                 }
3522         }
3523         RETURN(rc);
3524 }
3525
3526 int osc_cleanup(struct obd_device *obd)
3527 {
3528         struct client_obd *cli = &obd->u.cli;
3529         int rc;
3530
3531         ENTRY;
3532
3533         /* lru cleanup */
3534         if (cli->cl_cache != NULL) {
3535                 LASSERT(cfs_atomic_read(&cli->cl_cache->ccc_users) > 0);
3536                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3537                 cfs_list_del_init(&cli->cl_lru_osc);
3538                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3539                 cli->cl_lru_left = NULL;
3540                 cfs_atomic_dec(&cli->cl_cache->ccc_users);
3541                 cli->cl_cache = NULL;
3542         }
3543
3544         /* free memory of osc quota cache */
3545         osc_quota_cleanup(obd);
3546
3547         rc = client_obd_cleanup(obd);
3548
3549         ptlrpcd_decref();
3550         RETURN(rc);
3551 }
3552
3553 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3554 {
3555         struct lprocfs_static_vars lvars = { 0 };
3556         int rc = 0;
3557
3558         lprocfs_osc_init_vars(&lvars);
3559
3560         switch (lcfg->lcfg_command) {
3561         default:
3562                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3563                                               lcfg, obd);
3564                 if (rc > 0)
3565                         rc = 0;
3566                 break;
3567         }
3568
3569         return(rc);
3570 }
3571
3572 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3573 {
3574         return osc_process_config_base(obd, buf);
3575 }
3576
3577 struct obd_ops osc_obd_ops = {
3578         .o_owner                = THIS_MODULE,
3579         .o_setup                = osc_setup,
3580         .o_precleanup           = osc_precleanup,
3581         .o_cleanup              = osc_cleanup,
3582         .o_add_conn             = client_import_add_conn,
3583         .o_del_conn             = client_import_del_conn,
3584         .o_connect              = client_connect_import,
3585         .o_reconnect            = osc_reconnect,
3586         .o_disconnect           = osc_disconnect,
3587         .o_statfs               = osc_statfs,
3588         .o_statfs_async         = osc_statfs_async,
3589         .o_packmd               = osc_packmd,
3590         .o_unpackmd             = osc_unpackmd,
3591         .o_create               = osc_create,
3592         .o_destroy              = osc_destroy,
3593         .o_getattr              = osc_getattr,
3594         .o_getattr_async        = osc_getattr_async,
3595         .o_setattr              = osc_setattr,
3596         .o_setattr_async        = osc_setattr_async,
3597         .o_brw                  = osc_brw,
3598         .o_punch                = osc_punch,
3599         .o_sync                 = osc_sync,
3600         .o_enqueue              = osc_enqueue,
3601         .o_change_cbdata        = osc_change_cbdata,
3602         .o_find_cbdata          = osc_find_cbdata,
3603         .o_cancel               = osc_cancel,
3604         .o_cancel_unused        = osc_cancel_unused,
3605         .o_iocontrol            = osc_iocontrol,
3606         .o_get_info             = osc_get_info,
3607         .o_set_info_async       = osc_set_info_async,
3608         .o_import_event         = osc_import_event,
3609         .o_llog_init            = osc_llog_init,
3610         .o_llog_finish          = osc_llog_finish,
3611         .o_process_config       = osc_process_config,
3612         .o_quotactl             = osc_quotactl,
3613         .o_quotacheck           = osc_quotacheck,
3614 };
3615
3616 extern struct lu_kmem_descr osc_caches[];
3617 extern spinlock_t osc_ast_guard;
3618 extern struct lock_class_key osc_ast_guard_class;
3619
3620 int __init osc_init(void)
3621 {
3622         struct lprocfs_static_vars lvars = { 0 };
3623         int rc;
3624         ENTRY;
3625
3626         /* print an address of _any_ initialized kernel symbol from this
3627          * module, to allow debugging with gdb that doesn't support data
3628          * symbols from modules.*/
3629         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3630
3631         rc = lu_kmem_init(osc_caches);
3632
3633         lprocfs_osc_init_vars(&lvars);
3634
3635         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3636                                  LUSTRE_OSC_NAME, &osc_device_type);
3637         if (rc) {
3638                 lu_kmem_fini(osc_caches);
3639                 RETURN(rc);
3640         }
3641
3642         spin_lock_init(&osc_ast_guard);
3643         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3644
3645         RETURN(rc);
3646 }
3647
#ifdef __KERNEL__
/*
 * Module unload: undo osc_init() in reverse order - unregister the OSC
 * obd type first, then release the slab caches.
 */
static void /*__exit*/ osc_exit(void)
{
        class_unregister_type(LUSTRE_OSC_NAME);
        lu_kmem_fini(osc_caches);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
MODULE_LICENSE("GPL");

cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);
#endif /* __KERNEL__ */