Whamcloud - gitweb
ec5675952b2ac1e73918f6d140ff921c85d18ec6
[fs/lustre-release.git] / lustre / ost / ost_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2001, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ost/ost_handler.c
37  *
38  * Author: Peter J. Braam <braam@clusterfs.com>
39  * Author: Phil Schwan <phil@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_OST
43
44 #include <linux/module.h>
45 #include <obd_cksum.h>
46 #include <obd_ost.h>
47 #include <lustre_net.h>
48 #include <lustre_dlm.h>
49 #include <lustre_export.h>
50 #include <lustre_debug.h>
51 #include <linux/init.h>
52 #include <lprocfs_status.h>
53 #include <libcfs/list.h>
54 #include <lustre_quota.h>
55 #include "ost_internal.h"
56
/* Module parameter (mode 0444): number of OSS service threads to start. */
static int oss_num_threads;
CFS_MODULE_PARM(oss_num_threads, "i", int, 0444,
                "number of OSS service threads to start");

/*
 * Module parameter (mode 0444): number of OST service threads to start.
 * Its own description string marks it as deprecated.
 */
static int ost_num_threads;
CFS_MODULE_PARM(ost_num_threads, "i", int, 0444,
                "number of OST service threads to start (deprecated)");

/* Module parameter (mode 0444): number of OSS create threads to start. */
static int oss_num_create_threads;
CFS_MODULE_PARM(oss_num_create_threads, "i", int, 0444,
                "number of OSS create threads to start");

/*
 * Module parameter (mode 0444): string describing the CPU partitions the
 * OSS threads should run on.
 */
static char *oss_cpts;
CFS_MODULE_PARM(oss_cpts, "s", charp, 0444,
                "CPU partitions OSS threads should run on");

/*
 * Module parameter (mode 0444): string describing the CPU partitions the
 * OSS IO threads should run on.
 */
static char *oss_io_cpts;
CFS_MODULE_PARM(oss_io_cpts, "s", charp, 0444,
                "CPU partitions OSS IO threads should run on");
76
77 /**
78  * Do not return server-side uid/gid to remote client
79  */
80 static void ost_drop_id(struct obd_export *exp, struct obdo *oa)
81 {
82         if (exp_connect_rmtclient(exp)) {
83                 oa->o_uid = -1;
84                 oa->o_gid = -1;
85                 oa->o_valid &= ~(OBD_MD_FLUID | OBD_MD_FLGID);
86         }
87 }
88
89 /**
90  * Validate oa from client.
91  * If the request comes from 2.0 clients, currently only RSVD seq and IDIF
92  * req are valid.
93  *    a. for single MDS  seq = FID_SEQ_OST_MDT0,
94  *    b. for CMD, seq = FID_SEQ_OST_MDT0, FID_SEQ_OST_MDT1 - FID_SEQ_OST_MAX
95  */
96 static int ost_validate_obdo(struct obd_export *exp, struct obdo *oa,
97                              struct obd_ioobj *ioobj)
98 {
99         if (oa != NULL && !(oa->o_valid & OBD_MD_FLGROUP)) {
100                 oa->o_seq = FID_SEQ_OST_MDT0;
101                 if (ioobj)
102                         ioobj->ioo_seq = FID_SEQ_OST_MDT0;
103         /* remove fid_seq_is_rsvd() after FID-on-OST allows SEQ > 9 */
104         } else if (oa == NULL || !(fid_seq_is_rsvd(oa->o_seq) ||
105                                    fid_seq_is_mdt0(oa->o_seq))) {
106                 CERROR("%s: client %s sent invalid object "POSTID"\n",
107                        exp->exp_obd->obd_name, obd_export_nid2str(exp),
108                        oa ? oa->o_id : -1, oa ? oa->o_seq : -1);
109                 return -EPROTO;
110         }
111         obdo_from_ostid(oa, &oa->o_oi);
112         if (ioobj)
113                 ioobj_from_obdo(ioobj, oa);
114         return 0;
115 }
116
117 void oti_to_request(struct obd_trans_info *oti, struct ptlrpc_request *req)
118 {
119         struct oti_req_ack_lock *ack_lock;
120         int i;
121
122         if (oti == NULL)
123                 return;
124
125         if (req->rq_repmsg) {
126                 __u64 versions[PTLRPC_NUM_VERSIONS] = { 0 };
127                 lustre_msg_set_transno(req->rq_repmsg, oti->oti_transno);
128                 versions[0] = oti->oti_pre_version;
129                 lustre_msg_set_versions(req->rq_repmsg, versions);
130         }
131         req->rq_transno = oti->oti_transno;
132
133         /* XXX 4 == entries in oti_ack_locks??? */
134         for (ack_lock = oti->oti_ack_locks, i = 0; i < 4; i++, ack_lock++) {
135                 if (!ack_lock->mode)
136                         break;
137                 /* XXX not even calling target_send_reply in some cases... */
138                 ptlrpc_save_lock (req, &ack_lock->lock, ack_lock->mode, 0);
139         }
140 }
141
142 static int ost_destroy(struct obd_export *exp, struct ptlrpc_request *req,
143                        struct obd_trans_info *oti)
144 {
145         struct ost_body *body, *repbody;
146         struct lustre_capa *capa = NULL;
147         int rc;
148         ENTRY;
149
150         /* Get the request body */
151         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
152         if (body == NULL)
153                 RETURN(-EFAULT);
154
155         if (body->oa.o_id == 0)
156                 RETURN(-EPROTO);
157
158         rc = ost_validate_obdo(exp, &body->oa, NULL);
159         if (rc)
160                 RETURN(rc);
161
162         /* If there's a DLM request, cancel the locks mentioned in it*/
163         if (req_capsule_field_present(&req->rq_pill, &RMF_DLM_REQ, RCL_CLIENT)) {
164                 struct ldlm_request *dlm;
165
166                 dlm = req_capsule_client_get(&req->rq_pill, &RMF_DLM_REQ);
167                 if (dlm == NULL)
168                         RETURN (-EFAULT);
169                 ldlm_request_cancel(req, dlm, 0);
170         }
171
172         /* If there's a capability, get it */
173         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
174                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
175                 if (capa == NULL) {
176                         CERROR("Missing capability for OST DESTROY");
177                         RETURN (-EFAULT);
178                 }
179         }
180
181         /* Prepare the reply */
182         rc = req_capsule_server_pack(&req->rq_pill);
183         if (rc)
184                 RETURN(rc);
185
186         /* Get the log cancellation cookie */
187         if (body->oa.o_valid & OBD_MD_FLCOOKIE)
188                 oti->oti_logcookies = &body->oa.o_lcookie;
189
190         /* Finish the reply */
191         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
192         memcpy(&repbody->oa, &body->oa, sizeof(body->oa));
193
194         /* Do the destroy and set the reply status accordingly  */
195         req->rq_status = obd_destroy(req->rq_svc_thread->t_env, exp,
196                                      &repbody->oa, NULL, oti, NULL, capa);
197         RETURN(0);
198 }
199
200 /**
201  * Helper function for getting server side [start, start+count] DLM lock
202  * if asked by client.
203  */
204 static int ost_lock_get(struct obd_export *exp, struct obdo *oa,
205                         __u64 start, __u64 count, struct lustre_handle *lh,
206                         int mode, int flags)
207 {
208         struct ldlm_res_id res_id;
209         ldlm_policy_data_t policy;
210         __u64 end = start + count;
211
212         ENTRY;
213
214         LASSERT(!lustre_handle_is_used(lh));
215         /* o_id and o_gr are used for localizing resource, if client miss to set
216          * them, do not trigger ASSERTION. */
217         if (unlikely((oa->o_valid & (OBD_MD_FLID | OBD_MD_FLGROUP)) !=
218                      (OBD_MD_FLID | OBD_MD_FLGROUP)))
219                 RETURN(-EPROTO);
220
221         if (!(oa->o_valid & OBD_MD_FLFLAGS) ||
222             !(oa->o_flags & OBD_FL_SRVLOCK))
223                 RETURN(0);
224
225         osc_build_res_name(oa->o_id, oa->o_seq, &res_id);
226         CDEBUG(D_INODE, "OST-side extent lock.\n");
227
228         policy.l_extent.start = start & CFS_PAGE_MASK;
229
230         /* If ->o_blocks is EOF it means "lock till the end of the
231          * file". Otherwise, it's size of a hole being punched (in bytes) */
232         if (count == OBD_OBJECT_EOF || end < start)
233                 policy.l_extent.end = OBD_OBJECT_EOF;
234         else
235                 policy.l_extent.end = end | ~CFS_PAGE_MASK;
236
237         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
238                                       LDLM_EXTENT, &policy, mode, &flags,
239                                       ldlm_blocking_ast, ldlm_completion_ast,
240                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
241 }
242
243 /* Helper function: release lock, if any. */
244 static void ost_lock_put(struct obd_export *exp,
245                          struct lustre_handle *lh, int mode)
246 {
247         ENTRY;
248         if (lustre_handle_is_used(lh))
249                 ldlm_lock_decref(lh, mode);
250         EXIT;
251 }
252
253 static int ost_getattr(struct obd_export *exp, struct ptlrpc_request *req)
254 {
255         struct ost_body *body, *repbody;
256         struct obd_info *oinfo;
257         struct lustre_handle lh = { 0 };
258         struct lustre_capa *capa = NULL;
259         int rc;
260         ENTRY;
261
262         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
263         if (body == NULL)
264                 RETURN(-EFAULT);
265
266         rc = ost_validate_obdo(exp, &body->oa, NULL);
267         if (rc)
268                 RETURN(rc);
269
270         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
271                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
272                 if (capa == NULL) {
273                         CERROR("Missing capability for OST GETATTR");
274                         RETURN(-EFAULT);
275                 }
276         }
277
278         rc = req_capsule_server_pack(&req->rq_pill);
279         if (rc)
280                 RETURN(rc);
281
282         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
283         repbody->oa = body->oa;
284
285         rc = ost_lock_get(exp, &repbody->oa, 0, OBD_OBJECT_EOF, &lh, LCK_PR, 0);
286         if (rc)
287                 RETURN(rc);
288
289         OBD_ALLOC_PTR(oinfo);
290         if (!oinfo)
291                 GOTO(unlock, rc = -ENOMEM);
292         oinfo->oi_oa = &repbody->oa;
293         oinfo->oi_capa = capa;
294
295         req->rq_status = obd_getattr(req->rq_svc_thread->t_env, exp, oinfo);
296
297         OBD_FREE_PTR(oinfo);
298
299         ost_drop_id(exp, &repbody->oa);
300
301 unlock:
302         ost_lock_put(exp, &lh, LCK_PR);
303         RETURN(rc);
304 }
305
306 static int ost_statfs(struct ptlrpc_request *req)
307 {
308         struct obd_statfs *osfs;
309         int rc;
310         ENTRY;
311
312         rc = req_capsule_server_pack(&req->rq_pill);
313         if (rc)
314                 RETURN(rc);
315
316         osfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
317
318         req->rq_status = obd_statfs(req->rq_svc_thread->t_env, req->rq_export,
319                                     osfs,
320                                     cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS),
321                                     0);
322         if (req->rq_status != 0)
323                 CERROR("ost: statfs failed: rc %d\n", req->rq_status);
324
325         RETURN(0);
326 }
327
328 static int ost_create(struct obd_export *exp, struct ptlrpc_request *req,
329                       struct obd_trans_info *oti)
330 {
331         struct ost_body *body, *repbody;
332         int rc;
333         ENTRY;
334
335         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
336         if (body == NULL)
337                 RETURN(-EFAULT);
338
339         rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
340         if (rc)
341                 RETURN(rc);
342
343         rc = req_capsule_server_pack(&req->rq_pill);
344         if (rc)
345                 RETURN(rc);
346
347         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
348         repbody->oa = body->oa;
349         oti->oti_logcookies = &body->oa.o_lcookie;
350
351         req->rq_status = obd_create(req->rq_svc_thread->t_env, exp,
352                                     &repbody->oa, NULL, oti);
353         //obd_log_cancel(conn, NULL, 1, oti->oti_logcookies, 0);
354         RETURN(0);
355 }
356
357 static int ost_punch(struct obd_export *exp, struct ptlrpc_request *req,
358                      struct obd_trans_info *oti)
359 {
360         struct ost_body *body, *repbody;
361         int rc, flags = 0;
362         struct lustre_handle lh = {0,};
363         ENTRY;
364
365         /* check that we do support OBD_CONNECT_TRUNCLOCK. */
366         CLASSERT(OST_CONNECT_SUPPORTED & OBD_CONNECT_TRUNCLOCK);
367
368         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
369         if (body == NULL)
370                 RETURN(-EFAULT);
371
372         rc = ost_validate_obdo(exp, &body->oa, NULL);
373         if (rc)
374                 RETURN(rc);
375
376         if ((body->oa.o_valid & (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS)) !=
377             (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS))
378                 RETURN(-EPROTO);
379
380         rc = req_capsule_server_pack(&req->rq_pill);
381         if (rc)
382                 RETURN(rc);
383
384         /* standard truncate optimization: if file body is completely
385          * destroyed, don't send data back to the server. */
386         if (body->oa.o_size == 0)
387                 flags |= LDLM_AST_DISCARD_DATA;
388
389         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
390         repbody->oa = body->oa;
391
392         rc = ost_lock_get(exp, &repbody->oa, repbody->oa.o_size,
393                           repbody->oa.o_blocks, &lh, LCK_PW, flags);
394         if (rc == 0) {
395                 struct obd_info *oinfo;
396                 struct lustre_capa *capa = NULL;
397
398                 if (repbody->oa.o_valid & OBD_MD_FLFLAGS &&
399                     repbody->oa.o_flags == OBD_FL_SRVLOCK)
400                         /*
401                          * If OBD_FL_SRVLOCK is the only bit set in
402                          * ->o_flags, clear OBD_MD_FLFLAGS to avoid falling
403                          * through filter_setattr() to filter_iocontrol().
404                          */
405                         repbody->oa.o_valid &= ~OBD_MD_FLFLAGS;
406
407                 if (repbody->oa.o_valid & OBD_MD_FLOSSCAPA) {
408                         capa = req_capsule_client_get(&req->rq_pill,
409                                                       &RMF_CAPA1);
410                         if (capa == NULL) {
411                                 CERROR("Missing capability for OST PUNCH");
412                                 GOTO(unlock, rc = -EFAULT);
413                         }
414                 }
415
416                 OBD_ALLOC_PTR(oinfo);
417                 if (!oinfo)
418                         GOTO(unlock, rc = -ENOMEM);
419                 oinfo->oi_oa = &repbody->oa;
420                 oinfo->oi_policy.l_extent.start = oinfo->oi_oa->o_size;
421                 oinfo->oi_policy.l_extent.end = oinfo->oi_oa->o_blocks;
422                 oinfo->oi_capa = capa;
423                 oinfo->oi_flags = OBD_FL_PUNCH;
424
425                 req->rq_status = obd_punch(req->rq_svc_thread->t_env, exp,
426                                            oinfo, oti, NULL);
427                 OBD_FREE_PTR(oinfo);
428 unlock:
429                 ost_lock_put(exp, &lh, LCK_PW);
430         }
431
432         ost_drop_id(exp, &repbody->oa);
433         RETURN(rc);
434 }
435
436 static int ost_sync(struct obd_export *exp, struct ptlrpc_request *req,
437                     struct obd_trans_info *oti)
438 {
439         struct ost_body *body, *repbody;
440         struct obd_info *oinfo;
441         struct lustre_capa *capa = NULL;
442         int rc;
443         ENTRY;
444
445         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
446         if (body == NULL)
447                 RETURN(-EFAULT);
448
449         rc = ost_validate_obdo(exp, &body->oa, NULL);
450         if (rc)
451                 RETURN(rc);
452
453         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
454                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
455                 if (capa == NULL) {
456                         CERROR("Missing capability for OST SYNC");
457                         RETURN (-EFAULT);
458                 }
459         }
460
461         rc = req_capsule_server_pack(&req->rq_pill);
462         if (rc)
463                 RETURN(rc);
464
465         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
466         repbody->oa = body->oa;
467
468         OBD_ALLOC_PTR(oinfo);
469         if (!oinfo)
470                 RETURN(-ENOMEM);
471
472         oinfo->oi_oa = &repbody->oa;
473         oinfo->oi_capa = capa;
474         oinfo->oi_jobid = oti->oti_jobid;
475         req->rq_status = obd_sync(req->rq_svc_thread->t_env, exp, oinfo,
476                                   repbody->oa.o_size, repbody->oa.o_blocks,
477                                   NULL);
478         OBD_FREE_PTR(oinfo);
479
480         ost_drop_id(exp, &repbody->oa);
481         RETURN(0);
482 }
483
484 static int ost_setattr(struct obd_export *exp, struct ptlrpc_request *req,
485                        struct obd_trans_info *oti)
486 {
487         struct ost_body *body, *repbody;
488         struct obd_info *oinfo;
489         struct lustre_capa *capa = NULL;
490         int rc;
491         ENTRY;
492
493         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
494         if (body == NULL)
495                 RETURN(-EFAULT);
496
497         rc = ost_validate_obdo(req->rq_export, &body->oa, NULL);
498         if (rc)
499                 RETURN(rc);
500
501         rc = req_capsule_server_pack(&req->rq_pill);
502         if (rc)
503                 RETURN(rc);
504
505         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
506                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
507                 if (capa == NULL) {
508                         CERROR("Missing capability for OST SETATTR");
509                         RETURN (-EFAULT);
510                 }
511         }
512
513         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
514         repbody->oa = body->oa;
515
516         OBD_ALLOC_PTR(oinfo);
517         if (!oinfo)
518                 RETURN(-ENOMEM);
519         oinfo->oi_oa = &repbody->oa;
520         oinfo->oi_capa = capa;
521
522         req->rq_status = obd_setattr(req->rq_svc_thread->t_env, exp, oinfo,
523                                      oti);
524
525         OBD_FREE_PTR(oinfo);
526
527         ost_drop_id(exp, &repbody->oa);
528         RETURN(0);
529 }
530
531 static __u32 ost_checksum_bulk(struct ptlrpc_bulk_desc *desc, int opc,
532                                cksum_type_t cksum_type)
533 {
534         struct cfs_crypto_hash_desc     *hdesc;
535         unsigned int                    bufsize;
536         int                             i, err;
537         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
538         __u32                           cksum;
539
540         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
541         if (IS_ERR(hdesc)) {
542                 CERROR("Unable to initialize checksum hash %s\n",
543                        cfs_crypto_hash_name(cfs_alg));
544                 return PTR_ERR(hdesc);
545         }
546         CDEBUG(D_INFO, "Checksum for algo %s\n", cfs_crypto_hash_name(cfs_alg));
547         for (i = 0; i < desc->bd_iov_count; i++) {
548
549                 /* corrupt the data before we compute the checksum, to
550                  * simulate a client->OST data error */
551                 if (i == 0 && opc == OST_WRITE &&
552                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_RECEIVE)) {
553                         int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
554                         int len = desc->bd_iov[i].kiov_len;
555                         char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
556                         memcpy(ptr, "bad3", min(4, len));
557                         kunmap(desc->bd_iov[i].kiov_page);
558                 }
559                 cfs_crypto_hash_update_page(hdesc, desc->bd_iov[i].kiov_page,
560                                   desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK,
561                                   desc->bd_iov[i].kiov_len);
562
563                  /* corrupt the data after we compute the checksum, to
564                  * simulate an OST->client data error */
565                 if (i == 0 && opc == OST_READ &&
566                     OBD_FAIL_CHECK(OBD_FAIL_OST_CHECKSUM_SEND)) {
567                         int off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
568                         int len = desc->bd_iov[i].kiov_len;
569                         char *ptr = kmap(desc->bd_iov[i].kiov_page) + off;
570                         memcpy(ptr, "bad4", min(4, len));
571                         kunmap(desc->bd_iov[i].kiov_page);
572                         /* nobody should use corrupted page again */
573                         ClearPageUptodate(desc->bd_iov[i].kiov_page);
574                 }
575         }
576
577         bufsize = 4;
578         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
579         if (err)
580                 cfs_crypto_hash_final(hdesc, NULL, NULL);
581
582         return cksum;
583 }
584
585 static int ost_brw_lock_get(int mode, struct obd_export *exp,
586                             struct obd_ioobj *obj, struct niobuf_remote *nb,
587                             struct lustre_handle *lh)
588 {
589         int flags                 = 0;
590         int nrbufs                = obj->ioo_bufcnt;
591         struct ldlm_res_id res_id;
592         ldlm_policy_data_t policy;
593         int i;
594         ENTRY;
595
596         osc_build_res_name(obj->ioo_id, obj->ioo_seq, &res_id);
597         LASSERT(mode == LCK_PR || mode == LCK_PW);
598         LASSERT(!lustre_handle_is_used(lh));
599
600         if (nrbufs == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
601                 RETURN(0);
602
603         for (i = 1; i < nrbufs; i ++)
604                 if ((nb[0].flags & OBD_BRW_SRVLOCK) !=
605                     (nb[i].flags & OBD_BRW_SRVLOCK))
606                         RETURN(-EFAULT);
607
608         policy.l_extent.start = nb[0].offset & CFS_PAGE_MASK;
609         policy.l_extent.end   = (nb[nrbufs - 1].offset +
610                                  nb[nrbufs - 1].len - 1) | ~CFS_PAGE_MASK;
611
612         RETURN(ldlm_cli_enqueue_local(exp->exp_obd->obd_namespace, &res_id,
613                                       LDLM_EXTENT, &policy, mode, &flags,
614                                       ldlm_blocking_ast, ldlm_completion_ast,
615                                       ldlm_glimpse_ast, NULL, 0, NULL, lh));
616 }
617
618 static void ost_brw_lock_put(int mode,
619                              struct obd_ioobj *obj, struct niobuf_remote *niob,
620                              struct lustre_handle *lh)
621 {
622         ENTRY;
623         LASSERT(mode == LCK_PR || mode == LCK_PW);
624         LASSERT((obj->ioo_bufcnt > 0 && (niob[0].flags & OBD_BRW_SRVLOCK)) ==
625                 lustre_handle_is_used(lh));
626         if (lustre_handle_is_used(lh))
627                 ldlm_lock_decref(lh, mode);
628         EXIT;
629 }
630
631 /* Allocate thread local buffers if needed */
632 static struct ost_thread_local_cache *ost_tls_get(struct ptlrpc_request *r)
633 {
634         struct ost_thread_local_cache *tls =
635                 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
636
637         /* In normal mode of operation an I/O request is serviced only
638          * by ll_ost_io threads each of them has own tls buffers allocated by
639          * ost_thread_init().
640          * During recovery, an I/O request may be queued until any of the ost
641          * service threads process it. Not necessary it should be one of
642          * ll_ost_io threads. In that case we dynamically allocating tls
643          * buffers for the request service time. */
644         if (unlikely(tls == NULL)) {
645                 LASSERT(r->rq_export->exp_in_recovery);
646                 OBD_ALLOC_PTR(tls);
647                 if (tls != NULL) {
648                         tls->temporary = 1;
649                         r->rq_svc_thread->t_data = tls;
650                 }
651         }
652         return  tls;
653 }
654
655 /* Free thread local buffers if they were allocated only for servicing
656  * this one request */
657 static void ost_tls_put(struct ptlrpc_request *r)
658 {
659         struct ost_thread_local_cache *tls =
660                 (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
661
662         if (unlikely(tls->temporary)) {
663                 OBD_FREE_PTR(tls);
664                 r->rq_svc_thread->t_data = NULL;
665         }
666 }
667
668 static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
669 {
670         struct ptlrpc_bulk_desc *desc = NULL;
671         struct obd_export *exp = req->rq_export;
672         struct niobuf_remote *remote_nb;
673         struct niobuf_local *local_nb;
674         struct obd_ioobj *ioo;
675         struct ost_body *body, *repbody;
676         struct lustre_capa *capa = NULL;
677         struct l_wait_info lwi;
678         struct lustre_handle lockh = { 0 };
679         int niocount, npages, nob = 0, rc, i;
680         int no_reply = 0;
681         struct ost_thread_local_cache *tls;
682         ENTRY;
683
684         req->rq_bulk_read = 1;
685
686         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_READ_BULK))
687                 GOTO(out, rc = -EIO);
688
689         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
690
691         /* Check if there is eviction in progress, and if so, wait for it to
692          * finish */
693         if (unlikely(cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress))) {
694                 lwi = LWI_INTR(NULL, NULL); // We do not care how long it takes
695                 rc = l_wait_event(exp->exp_obd->obd_evict_inprogress_waitq,
696                         !cfs_atomic_read(&exp->exp_obd->obd_evict_inprogress),
697                         &lwi);
698         }
699         if (exp->exp_failed)
700                 GOTO(out, rc = -ENOTCONN);
701
702         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
703          * ost_rw_hpreq_check(). */
704         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
705         if (body == NULL)
706                 GOTO(out, rc = -EFAULT);
707
708         /*
709          * A req_capsule_X_get_array(pill, field, ptr_to_element_count) function
710          * would be useful here and wherever we get &RMF_OBD_IOOBJ and
711          * &RMF_NIOBUF_REMOTE.
712          */
713         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
714         if (ioo == NULL)
715                 GOTO(out, rc = -EFAULT);
716
717         rc = ost_validate_obdo(exp, &body->oa, ioo);
718         if (rc)
719                 RETURN(rc);
720
721         niocount = ioo->ioo_bufcnt;
722         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
723         if (remote_nb == NULL)
724                 GOTO(out, rc = -EFAULT);
725
726         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
727                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
728                 if (capa == NULL) {
729                         CERROR("Missing capability for OST BRW READ");
730                         GOTO(out, rc = -EFAULT);
731                 }
732         }
733
734         rc = req_capsule_server_pack(&req->rq_pill);
735         if (rc)
736                 GOTO(out, rc);
737
738         tls = ost_tls_get(req);
739         if (tls == NULL)
740                 GOTO(out_bulk, rc = -ENOMEM);
741         local_nb = tls->local;
742
743         rc = ost_brw_lock_get(LCK_PR, exp, ioo, remote_nb, &lockh);
744         if (rc != 0)
745                 GOTO(out_tls, rc);
746
747         /*
748          * If getting the lock took more time than
749          * client was willing to wait, drop it. b=11330
750          */
751         if (cfs_time_current_sec() > req->rq_deadline ||
752             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
753                 no_reply = 1;
754                 CERROR("Dropping timed-out read from %s because locking"
755                        "object "LPX64" took %ld seconds (limit was %ld).\n",
756                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
757                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
758                        req->rq_deadline - req->rq_arrival_time.tv_sec);
759                 GOTO(out_lock, rc = -ETIMEDOUT);
760         }
761
762         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
763         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
764
765         npages = OST_THREAD_POOL_SIZE;
766         rc = obd_preprw(req->rq_svc_thread->t_env, OBD_BRW_READ, exp,
767                         &repbody->oa, 1, ioo, remote_nb, &npages, local_nb,
768                         oti, capa);
769         if (rc != 0)
770                 GOTO(out_lock, rc);
771
772         desc = ptlrpc_prep_bulk_exp(req, npages,
773                                      BULK_PUT_SOURCE, OST_BULK_PORTAL);
774         if (desc == NULL)
775                 GOTO(out_commitrw, rc = -ENOMEM);
776
777         nob = 0;
778         for (i = 0; i < npages; i++) {
779                 int page_rc = local_nb[i].rc;
780
781                 if (page_rc < 0) {              /* error */
782                         rc = page_rc;
783                         break;
784                 }
785
786                 nob += page_rc;
787                 if (page_rc != 0) {             /* some data! */
788                         LASSERT (local_nb[i].page != NULL);
789                         ptlrpc_prep_bulk_page(desc, local_nb[i].page,
790                                               local_nb[i].offset & ~CFS_PAGE_MASK,
791                                               page_rc);
792                 }
793
794                 if (page_rc != local_nb[i].len) { /* short read */
795                         /* All subsequent pages should be 0 */
796                         while(++i < npages)
797                                 LASSERT(local_nb[i].rc == 0);
798                         break;
799                 }
800         }
801
802         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
803                 cksum_type_t cksum_type =
804                         cksum_type_unpack(repbody->oa.o_valid & OBD_MD_FLFLAGS ?
805                                           repbody->oa.o_flags : 0);
806                 repbody->oa.o_flags = cksum_type_pack(cksum_type);
807                 repbody->oa.o_valid = OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
808                 repbody->oa.o_cksum = ost_checksum_bulk(desc, OST_READ,cksum_type);
809                 CDEBUG(D_PAGE, "checksum at read origin: %x\n",
810                        repbody->oa.o_cksum);
811         } else {
812                 repbody->oa.o_valid = 0;
813         }
814         /* We're finishing using body->oa as an input variable */
815
816         /* Check if client was evicted while we were doing i/o before touching
817            network */
818         if (rc == 0) {
819                 if (likely(!CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2)))
820                         rc = target_bulk_io(exp, desc, &lwi);
821                 no_reply = rc != 0;
822         }
823
824 out_commitrw:
825         /* Must commit after prep above in all cases */
826         rc = obd_commitrw(req->rq_svc_thread->t_env, OBD_BRW_READ, exp,
827                           &repbody->oa, 1, ioo, remote_nb, npages, local_nb,
828                           oti, rc);
829
830         if (rc == 0)
831                 ost_drop_id(exp, &repbody->oa);
832
833 out_lock:
834         ost_brw_lock_put(LCK_PR, ioo, remote_nb, &lockh);
835 out_tls:
836         ost_tls_put(req);
837 out_bulk:
838         if (desc && !CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))
839                 ptlrpc_free_bulk(desc);
840 out:
841         LASSERT(rc <= 0);
842         if (rc == 0) {
843                 req->rq_status = nob;
844                 ptlrpc_lprocfs_brw(req, nob);
845                 target_committed_to_req(req);
846                 ptlrpc_reply(req);
847         } else if (!no_reply) {
848                 /* Only reply if there was no comms problem with bulk */
849                 target_committed_to_req(req);
850                 req->rq_status = rc;
851                 ptlrpc_error(req);
852         } else {
853                 /* reply out callback would free */
854                 ptlrpc_req_drop_rs(req);
855                 LCONSOLE_WARN("%s: Bulk IO read error with %s (at %s), "
856                               "client will retry: rc %d\n",
857                               exp->exp_obd->obd_name,
858                               obd_uuid2str(&exp->exp_client_uuid),
859                               obd_export_nid2str(exp), rc);
860         }
861         /* send a bulk after reply to simulate a network delay or reordering
862          * by a router */
863         if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_CLIENT_BULK_CB2))) {
864                 cfs_waitq_t              waitq;
865                 struct l_wait_info       lwi1;
866
867                 CDEBUG(D_INFO, "reorder BULK\n");
868                 cfs_waitq_init(&waitq);
869
870                 lwi1 = LWI_TIMEOUT_INTR(cfs_time_seconds(3), NULL, NULL, NULL);
871                 l_wait_event(waitq, 0, &lwi1);
872                 rc = target_bulk_io(exp, desc, &lwi);
873                 ptlrpc_free_bulk(desc);
874         }
875
876         RETURN(rc);
877 }
878
879 static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
880 {
881         struct ptlrpc_bulk_desc *desc = NULL;
882         struct obd_export       *exp = req->rq_export;
883         struct niobuf_remote    *remote_nb;
884         struct niobuf_local     *local_nb;
885         struct obd_ioobj        *ioo;
886         struct ost_body         *body, *repbody;
887         struct l_wait_info       lwi;
888         struct lustre_handle     lockh = {0};
889         struct lustre_capa      *capa = NULL;
890         __u32                   *rcs;
891         int objcount, niocount, npages;
892         int rc, i, j;
893         obd_count                client_cksum = 0, server_cksum = 0;
894         cksum_type_t             cksum_type = OBD_CKSUM_CRC32;
895         int                      no_reply = 0, mmap = 0;
896         __u32                    o_uid = 0, o_gid = 0;
897         struct ost_thread_local_cache *tls;
898         ENTRY;
899
900         req->rq_bulk_write = 1;
901
902         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK))
903                 GOTO(out, rc = -EIO);
904         if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_WRITE_BULK2))
905                 GOTO(out, rc = -EFAULT);
906
907         /* pause before transaction has been started */
908         OBD_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_BULK, (obd_timeout + 1) / 4);
909
910         /* ost_body, ioobj & noibuf_remote are verified and swabbed in
911          * ost_rw_hpreq_check(). */
912         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
913         if (body == NULL)
914                 GOTO(out, rc = -EFAULT);
915
916         objcount = req_capsule_get_size(&req->rq_pill, &RMF_OBD_IOOBJ,
917                                         RCL_CLIENT) / sizeof(*ioo);
918         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
919         if (ioo == NULL)
920                 GOTO(out, rc = -EFAULT);
921
922         rc = ost_validate_obdo(exp, &body->oa, ioo);
923         if (rc)
924                 RETURN(rc);
925
926         for (niocount = i = 0; i < objcount; i++)
927                 niocount += ioo[i].ioo_bufcnt;
928
929         /*
930          * It'd be nice to have a capsule function to indicate how many elements
931          * there were in a buffer for an RMF that's declared to be an array.
932          * It's easy enough to compute the number of elements here though.
933          */
934         remote_nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
935         if (remote_nb == NULL || niocount != (req_capsule_get_size(&req->rq_pill,
936             &RMF_NIOBUF_REMOTE, RCL_CLIENT) / sizeof(*remote_nb)))
937                 GOTO(out, rc = -EFAULT);
938
939         if ((remote_nb[0].flags & OBD_BRW_MEMALLOC) &&
940             (exp->exp_connection->c_peer.nid == exp->exp_connection->c_self))
941                 cfs_memory_pressure_set();
942
943         if (body->oa.o_valid & OBD_MD_FLOSSCAPA) {
944                 capa = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
945                 if (capa == NULL) {
946                         CERROR("Missing capability for OST BRW WRITE");
947                         GOTO(out, rc = -EFAULT);
948                 }
949         }
950
951         req_capsule_set_size(&req->rq_pill, &RMF_RCS, RCL_SERVER,
952                              niocount * sizeof(*rcs));
953         rc = req_capsule_server_pack(&req->rq_pill);
954         if (rc != 0)
955                 GOTO(out, rc);
956         CFS_FAIL_TIMEOUT(OBD_FAIL_OST_BRW_PAUSE_PACK, cfs_fail_val);
957         rcs = req_capsule_server_get(&req->rq_pill, &RMF_RCS);
958
959         tls = ost_tls_get(req);
960         if (tls == NULL)
961                 GOTO(out_bulk, rc = -ENOMEM);
962         local_nb = tls->local;
963
964         rc = ost_brw_lock_get(LCK_PW, exp, ioo, remote_nb, &lockh);
965         if (rc != 0)
966                 GOTO(out_tls, rc);
967
968         /*
969          * If getting the lock took more time than
970          * client was willing to wait, drop it. b=11330
971          */
972         if (cfs_time_current_sec() > req->rq_deadline ||
973             OBD_FAIL_CHECK(OBD_FAIL_OST_DROP_REQ)) {
974                 no_reply = 1;
975                 CERROR("Dropping timed-out write from %s because locking "
976                        "object "LPX64" took %ld seconds (limit was %ld).\n",
977                        libcfs_id2str(req->rq_peer), ioo->ioo_id,
978                        cfs_time_current_sec() - req->rq_arrival_time.tv_sec,
979                        req->rq_deadline - req->rq_arrival_time.tv_sec);
980                 GOTO(out_lock, rc = -ETIMEDOUT);
981         }
982
983         /* obd_preprw clobbers oa->valid, so save what we need */
984         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
985                 client_cksum = body->oa.o_cksum;
986                 if (body->oa.o_valid & OBD_MD_FLFLAGS)
987                         cksum_type = cksum_type_unpack(body->oa.o_flags);
988         }
989         if (body->oa.o_valid & OBD_MD_FLFLAGS && body->oa.o_flags & OBD_FL_MMAP)
990                 mmap = 1;
991
992         /* Because we already sync grant info with client when reconnect,
993          * grant info will be cleared for resent req, then fed_grant and
994          * total_grant will not be modified in following preprw_write */
995         if (lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY)) {
996                 DEBUG_REQ(D_CACHE, req, "clear resent/replay req grant info");
997                 body->oa.o_valid &= ~OBD_MD_FLGRANT;
998         }
999
1000         if (exp_connect_rmtclient(exp)) {
1001                 o_uid = body->oa.o_uid;
1002                 o_gid = body->oa.o_gid;
1003         }
1004
1005         repbody = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1006         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
1007
1008         npages = OST_THREAD_POOL_SIZE;
1009         rc = obd_preprw(req->rq_svc_thread->t_env, OBD_BRW_WRITE, exp,
1010                         &repbody->oa, objcount, ioo, remote_nb, &npages,
1011                         local_nb, oti, capa);
1012         if (rc != 0)
1013                 GOTO(out_lock, rc);
1014
1015         desc = ptlrpc_prep_bulk_exp(req, npages,
1016                                      BULK_GET_SINK, OST_BULK_PORTAL);
1017         if (desc == NULL)
1018                 GOTO(skip_transfer, rc = -ENOMEM);
1019
1020         /* NB Having prepped, we must commit... */
1021
1022         for (i = 0; i < npages; i++)
1023                 ptlrpc_prep_bulk_page(desc, local_nb[i].page,
1024                                       local_nb[i].offset & ~CFS_PAGE_MASK,
1025                                       local_nb[i].len);
1026
1027         rc = sptlrpc_svc_prep_bulk(req, desc);
1028         if (rc != 0)
1029                 GOTO(out_lock, rc);
1030
1031         rc = target_bulk_io(exp, desc, &lwi);
1032         no_reply = rc != 0;
1033
1034 skip_transfer:
1035         if (client_cksum != 0 && rc == 0) {
1036                 static int cksum_counter;
1037                 repbody->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1038                 repbody->oa.o_flags &= ~OBD_FL_CKSUM_ALL;
1039                 repbody->oa.o_flags |= cksum_type_pack(cksum_type);
1040                 server_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1041                 repbody->oa.o_cksum = server_cksum;
1042                 cksum_counter++;
1043                 if (unlikely(client_cksum != server_cksum)) {
1044                         CDEBUG_LIMIT(mmap ? D_INFO : D_ERROR,
1045                                      "client csum %x, server csum %x\n",
1046                                      client_cksum, server_cksum);
1047                         cksum_counter = 0;
1048                 } else if ((cksum_counter & (-cksum_counter)) == cksum_counter){
1049                         CDEBUG(D_INFO, "Checksum %u from %s OK: %x\n",
1050                                cksum_counter, libcfs_id2str(req->rq_peer),
1051                                server_cksum);
1052                 }
1053         }
1054
1055         /* Must commit after prep above in all cases */
1056         rc = obd_commitrw(req->rq_svc_thread->t_env, OBD_BRW_WRITE, exp,
1057                           &repbody->oa, objcount, ioo, remote_nb, npages,
1058                           local_nb, oti, rc);
1059         if (rc == -ENOTCONN)
1060                 /* quota acquire process has been given up because
1061                  * either the client has been evicted or the client
1062                  * has timed out the request already */
1063                 no_reply = 1;
1064
1065         if (exp_connect_rmtclient(exp)) {
1066                 repbody->oa.o_uid = o_uid;
1067                 repbody->oa.o_gid = o_gid;
1068         }
1069
1070         /*
1071          * Disable sending mtime back to the client. If the client locked the
1072          * whole object, then it has already updated the mtime on its side,
1073          * otherwise it will have to glimpse anyway (see bug 21489, comment 32)
1074          */
1075         repbody->oa.o_valid &= ~(OBD_MD_FLMTIME | OBD_MD_FLATIME);
1076
1077         if (unlikely(client_cksum != server_cksum && rc == 0 && !mmap)) {
1078                 int  new_cksum = ost_checksum_bulk(desc, OST_WRITE, cksum_type);
1079                 char *msg;
1080                 char *via;
1081                 char *router;
1082
1083                 if (new_cksum == server_cksum)
1084                         msg = "changed in transit before arrival at OST";
1085                 else if (new_cksum == client_cksum)
1086                         msg = "initial checksum before message complete";
1087                 else
1088                         msg = "changed in transit AND after initial checksum";
1089
1090                 if (req->rq_peer.nid == desc->bd_sender) {
1091                         via = router = "";
1092                 } else {
1093                         via = " via ";
1094                         router = libcfs_nid2str(desc->bd_sender);
1095                 }
1096
1097                 LCONSOLE_ERROR_MSG(0x168, "%s: BAD WRITE CHECKSUM: %s from "
1098                                    "%s%s%s inode "DFID" object "
1099                                    LPU64"/"LPU64" extent ["LPU64"-"LPU64"]\n",
1100                                    exp->exp_obd->obd_name, msg,
1101                                    libcfs_id2str(req->rq_peer),
1102                                    via, router,
1103                                    body->oa.o_valid & OBD_MD_FLFID ?
1104                                                 body->oa.o_parent_seq : (__u64)0,
1105                                    body->oa.o_valid & OBD_MD_FLFID ?
1106                                                 body->oa.o_parent_oid : 0,
1107                                    body->oa.o_valid & OBD_MD_FLFID ?
1108                                                 body->oa.o_parent_ver : 0,
1109                                    body->oa.o_id,
1110                                    body->oa.o_valid & OBD_MD_FLGROUP ?
1111                                                 body->oa.o_seq : (__u64)0,
1112                                    local_nb[0].offset,
1113                                    local_nb[npages-1].offset +
1114                                    local_nb[npages-1].len - 1 );
1115                 CERROR("client csum %x, original server csum %x, "
1116                        "server csum now %x\n",
1117                        client_cksum, server_cksum, new_cksum);
1118         }
1119
1120         if (rc == 0) {
1121                 int nob = 0;
1122
1123                 /* set per-requested niobuf return codes */
1124                 for (i = j = 0; i < niocount; i++) {
1125                         int len = remote_nb[i].len;
1126
1127                         nob += len;
1128                         rcs[i] = 0;
1129                         do {
1130                                 LASSERT(j < npages);
1131                                 if (local_nb[j].rc < 0)
1132                                         rcs[i] = local_nb[j].rc;
1133                                 len -= local_nb[j].len;
1134                                 j++;
1135                         } while (len > 0);
1136                         LASSERT(len == 0);
1137                 }
1138                 LASSERT(j == npages);
1139                 ptlrpc_lprocfs_brw(req, nob);
1140         }
1141
1142 out_lock:
1143         ost_brw_lock_put(LCK_PW, ioo, remote_nb, &lockh);
1144 out_tls:
1145         ost_tls_put(req);
1146 out_bulk:
1147         if (desc)
1148                 ptlrpc_free_bulk(desc);
1149 out:
1150         if (rc == 0) {
1151                 oti_to_request(oti, req);
1152                 target_committed_to_req(req);
1153                 rc = ptlrpc_reply(req);
1154         } else if (!no_reply) {
1155                 /* Only reply if there was no comms problem with bulk */
1156                 target_committed_to_req(req);
1157                 req->rq_status = rc;
1158                 ptlrpc_error(req);
1159         } else {
1160                 /* reply out callback would free */
1161                 ptlrpc_req_drop_rs(req);
1162                 LCONSOLE_WARN("%s: Bulk IO write error with %s (at %s), "
1163                               "client will retry: rc %d\n",
1164                               exp->exp_obd->obd_name,
1165                               obd_uuid2str(&exp->exp_client_uuid),
1166                               obd_export_nid2str(exp), rc);
1167         }
1168         cfs_memory_pressure_clr();
1169         RETURN(rc);
1170 }
1171
1172 /**
1173  * Implementation of OST_SET_INFO.
1174  *
1175  * OST_SET_INFO is like ioctl(): heavily overloaded.  Specifically, it takes a
1176  * "key" and a value RPC buffers as arguments, with the value's contents
1177  * interpreted according to the key.
1178  *
1179  * Value types that need swabbing have swabbing done explicitly, either here or
1180  * in functions called from here.  This should be corrected: all swabbing should
1181  * be done in the capsule abstraction, as that will then allow us to move
1182  * swabbing exclusively to the client without having to modify server code
1183  * outside the capsule abstraction's implementation itself.  To correct this
1184  * will require minor changes to the capsule abstraction; see the comments for
1185  * req_capsule_extend() in layout.c.
1186  */
1187 static int ost_set_info(struct obd_export *exp, struct ptlrpc_request *req)
1188 {
1189         struct ost_body *body = NULL, *repbody;
1190         char *key, *val = NULL;
1191         int keylen, vallen, rc = 0;
1192         int is_grant_shrink = 0;
1193         ENTRY;
1194
1195         key = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
1196         if (key == NULL) {
1197                 DEBUG_REQ(D_HA, req, "no set_info key");
1198                 RETURN(-EFAULT);
1199         }
1200         keylen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_KEY,
1201                                       RCL_CLIENT);
1202
1203         vallen = req_capsule_get_size(&req->rq_pill, &RMF_SETINFO_VAL,
1204                                       RCL_CLIENT);
1205
1206         if ((is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK)))
1207                 /* In this case the value is actually an RMF_OST_BODY, so we
1208                  * transmutate the type of this PTLRPC */
1209                 req_capsule_extend(&req->rq_pill, &RQF_OST_SET_GRANT_INFO);
1210
1211         rc = req_capsule_server_pack(&req->rq_pill);
1212         if (rc)
1213                 RETURN(rc);
1214
1215         if (vallen) {
1216                 if (is_grant_shrink) {
1217                         body = req_capsule_client_get(&req->rq_pill,
1218                                                       &RMF_OST_BODY);
1219                         if (!body)
1220                                 RETURN(-EFAULT);
1221
1222                         repbody = req_capsule_server_get(&req->rq_pill,
1223                                                          &RMF_OST_BODY);
1224                         memcpy(repbody, body, sizeof(*body));
1225                         val = (char*)repbody;
1226                 } else {
1227                         val = req_capsule_client_get(&req->rq_pill,
1228                                                      &RMF_SETINFO_VAL);
1229                 }
1230         }
1231
1232         if (KEY_IS(KEY_EVICT_BY_NID)) {
1233                 if (val && vallen)
1234                         obd_export_evict_by_nid(exp->exp_obd, val);
1235                 GOTO(out, rc = 0);
1236         } else if (KEY_IS(KEY_MDS_CONN) && ptlrpc_req_need_swab(req)) {
1237                 if (vallen < sizeof(__u32))
1238                         RETURN(-EFAULT);
1239                 __swab32s((__u32 *)val);
1240         }
1241
1242         /* OBD will also check if KEY_IS(KEY_GRANT_SHRINK), and will cast val to
1243          * a struct ost_body * value */
1244         rc = obd_set_info_async(req->rq_svc_thread->t_env, exp, keylen,
1245                                 key, vallen, val, NULL);
1246 out:
1247         lustre_msg_set_status(req->rq_repmsg, 0);
1248         RETURN(rc);
1249 }
1250
1251 static int ost_get_info(struct obd_export *exp, struct ptlrpc_request *req)
1252 {
1253         void *key, *reply;
1254         int keylen, replylen, rc = 0;
1255         struct req_capsule *pill = &req->rq_pill;
1256         ENTRY;
1257
1258         /* this common part for get_info rpc */
1259         key = req_capsule_client_get(pill, &RMF_SETINFO_KEY);
1260         if (key == NULL) {
1261                 DEBUG_REQ(D_HA, req, "no get_info key");
1262                 RETURN(-EFAULT);
1263         }
1264         keylen = req_capsule_get_size(pill, &RMF_SETINFO_KEY, RCL_CLIENT);
1265
1266         if (KEY_IS(KEY_FIEMAP)) {
1267                 struct ll_fiemap_info_key *fm_key = key;
1268                 int rc;
1269
1270                 rc = ost_validate_obdo(exp, &fm_key->oa, NULL);
1271                 if (rc)
1272                         RETURN(rc);
1273         }
1274
1275         rc = obd_get_info(req->rq_svc_thread->t_env, exp, keylen, key,
1276                           &replylen, NULL, NULL);
1277         if (rc)
1278                 RETURN(rc);
1279
1280         req_capsule_set_size(pill, &RMF_GENERIC_DATA,
1281                              RCL_SERVER, replylen);
1282
1283         rc = req_capsule_server_pack(pill);
1284         if (rc)
1285                 RETURN(rc);
1286
1287         reply = req_capsule_server_get(pill, &RMF_GENERIC_DATA);
1288         if (reply == NULL)
1289                 RETURN(-ENOMEM);
1290
1291         /* call again to fill in the reply buffer */
1292         rc = obd_get_info(req->rq_svc_thread->t_env, exp, keylen, key,
1293                           &replylen, reply, NULL);
1294
1295         lustre_msg_set_status(req->rq_repmsg, 0);
1296         RETURN(rc);
1297 }
1298
1299 #ifdef HAVE_QUOTA_SUPPORT
1300 static int ost_handle_quotactl(struct ptlrpc_request *req)
1301 {
1302         struct obd_quotactl *oqctl, *repoqc;
1303         int rc;
1304         ENTRY;
1305
1306         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1307         if (oqctl == NULL)
1308                 GOTO(out, rc = -EPROTO);
1309
1310         rc = req_capsule_server_pack(&req->rq_pill);
1311         if (rc)
1312                 GOTO(out, rc);
1313
1314         repoqc = req_capsule_server_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1315         req->rq_status = obd_quotactl(req->rq_export, oqctl);
1316         *repoqc = *oqctl;
1317
1318 out:
1319         RETURN(rc);
1320 }
1321
1322 static int ost_handle_quotacheck(struct ptlrpc_request *req)
1323 {
1324         struct obd_quotactl *oqctl;
1325         int rc;
1326         ENTRY;
1327
1328         oqctl = req_capsule_client_get(&req->rq_pill, &RMF_OBD_QUOTACTL);
1329         if (oqctl == NULL)
1330                 RETURN(-EPROTO);
1331
1332         rc = req_capsule_server_pack(&req->rq_pill);
1333         if (rc)
1334                 RETURN(-ENOMEM);
1335
1336         req->rq_status = obd_quotacheck(req->rq_export, oqctl);
1337         RETURN(0);
1338 }
1339
1340 static int ost_handle_quota_adjust_qunit(struct ptlrpc_request *req)
1341 {
1342         struct quota_adjust_qunit *oqaq, *repoqa;
1343         struct lustre_quota_ctxt *qctxt;
1344         int rc;
1345         ENTRY;
1346
1347         qctxt = &req->rq_export->exp_obd->u.obt.obt_qctxt;
1348         oqaq = req_capsule_client_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
1349         if (oqaq == NULL)
1350                 GOTO(out, rc = -EPROTO);
1351
1352         rc = req_capsule_server_pack(&req->rq_pill);
1353         if (rc)
1354                 GOTO(out, rc);
1355
1356         repoqa = req_capsule_server_get(&req->rq_pill, &RMF_QUOTA_ADJUST_QUNIT);
1357         req->rq_status = obd_quota_adjust_qunit(req->rq_export, oqaq, qctxt, NULL);
1358         *repoqa = *oqaq;
1359
1360  out:
1361         RETURN(rc);
1362 }
1363 #endif
1364
1365 static int ost_llog_handle_connect(struct obd_export *exp,
1366                                    struct ptlrpc_request *req)
1367 {
1368         struct llogd_conn_body *body;
1369         int rc;
1370         ENTRY;
1371
1372         body = req_capsule_client_get(&req->rq_pill, &RMF_LLOGD_CONN_BODY);
1373         rc = obd_llog_connect(exp, body);
1374         RETURN(rc);
1375 }
1376
/*
 * Clear the remote-client, forced-remote-client and OSS-capability connect
 * flags in reply->ocd_connect_flags, then store the resulting flag word in
 * exp->exp_connect_flags while holding exp->exp_lock.
 *
 * Both arguments are pointers and are evaluated more than once, so they must
 * be free of side effects.
 */
#define ost_init_sec_none(reply, exp)                                   \
do {                                                                    \
        (reply)->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |        \
                                        OBD_CONNECT_RMT_CLIENT_FORCE |  \
                                        OBD_CONNECT_OSS_CAPA);          \
        cfs_spin_lock(&(exp)->exp_lock);                                \
        (exp)->exp_connect_flags = (reply)->ocd_connect_flags;          \
        cfs_spin_unlock(&(exp)->exp_lock);                              \
} while (0)
1386
1387 static int ost_init_sec_level(struct ptlrpc_request *req)
1388 {
1389         struct obd_export *exp = req->rq_export;
1390         struct req_capsule *pill = &req->rq_pill;
1391         struct obd_device *obd = exp->exp_obd;
1392         struct filter_obd *filter = &obd->u.filter;
1393         char *client = libcfs_nid2str(req->rq_peer.nid);
1394         struct obd_connect_data *data, *reply;
1395         int rc = 0, remote;
1396         ENTRY;
1397
1398         data = req_capsule_client_get(pill, &RMF_CONNECT_DATA);
1399         reply = req_capsule_server_get(pill, &RMF_CONNECT_DATA);
1400         if (data == NULL || reply == NULL)
1401                 RETURN(-EFAULT);
1402
1403         /* connection from MDT is always trusted */
1404         if (req->rq_auth_usr_mdt) {
1405                 ost_init_sec_none(reply, exp);
1406                 RETURN(0);
1407         }
1408
1409         /* no GSS support case */
1410         if (!req->rq_auth_gss) {
1411                 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1412                         CWARN("client %s -> target %s does not user GSS, "
1413                               "can not run under security level %d.\n",
1414                               client, obd->obd_name, filter->fo_sec_level);
1415                         RETURN(-EACCES);
1416                 } else {
1417                         ost_init_sec_none(reply, exp);
1418                         RETURN(0);
1419                 }
1420         }
1421
1422         /* old version case */
1423         if (unlikely(!(data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT) ||
1424                      !(data->ocd_connect_flags & OBD_CONNECT_OSS_CAPA))) {
1425                 if (filter->fo_sec_level > LUSTRE_SEC_NONE) {
1426                         CWARN("client %s -> target %s uses old version, "
1427                               "can not run under security level %d.\n",
1428                               client, obd->obd_name, filter->fo_sec_level);
1429                         RETURN(-EACCES);
1430                 } else {
1431                         CWARN("client %s -> target %s uses old version, "
1432                               "run under security level %d.\n",
1433                               client, obd->obd_name, filter->fo_sec_level);
1434                         ost_init_sec_none(reply, exp);
1435                         RETURN(0);
1436                 }
1437         }
1438
1439         remote = data->ocd_connect_flags & OBD_CONNECT_RMT_CLIENT_FORCE;
1440         if (remote) {
1441                 if (!req->rq_auth_remote)
1442                         CDEBUG(D_SEC, "client (local realm) %s -> target %s "
1443                                "asked to be remote.\n", client, obd->obd_name);
1444         } else if (req->rq_auth_remote) {
1445                 remote = 1;
1446                 CDEBUG(D_SEC, "client (remote realm) %s -> target %s is set "
1447                        "as remote by default.\n", client, obd->obd_name);
1448         }
1449
1450         if (remote) {
1451                 if (!filter->fo_fl_oss_capa) {
1452                         CDEBUG(D_SEC, "client %s -> target %s is set as remote,"
1453                                " but OSS capabilities are not enabled: %d.\n",
1454                                client, obd->obd_name, filter->fo_fl_oss_capa);
1455                         RETURN(-EACCES);
1456                 }
1457         }
1458
1459         switch (filter->fo_sec_level) {
1460         case LUSTRE_SEC_NONE:
1461                 if (!remote) {
1462                         ost_init_sec_none(reply, exp);
1463                         break;
1464                 } else {
1465                         CDEBUG(D_SEC, "client %s -> target %s is set as remote, "
1466                                "can not run under security level %d.\n",
1467                                client, obd->obd_name, filter->fo_sec_level);
1468                         RETURN(-EACCES);
1469                 }
1470         case LUSTRE_SEC_REMOTE:
1471                 if (!remote)
1472                         ost_init_sec_none(reply, exp);
1473                 break;
1474         case LUSTRE_SEC_ALL:
1475                 if (!remote) {
1476                         reply->ocd_connect_flags &= ~(OBD_CONNECT_RMT_CLIENT |
1477                                                       OBD_CONNECT_RMT_CLIENT_FORCE);
1478                         if (!filter->fo_fl_oss_capa)
1479                                 reply->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
1480
1481                         cfs_spin_lock(&exp->exp_lock);
1482                         exp->exp_connect_flags = reply->ocd_connect_flags;
1483                         cfs_spin_unlock(&exp->exp_lock);
1484                 }
1485                 break;
1486         default:
1487                 RETURN(-EINVAL);
1488         }
1489
1490         RETURN(rc);
1491 }
1492
1493 /*
1494  * FIXME
1495  * this should be done in filter_connect()/filter_reconnect(), but
1496  * we can't obtain information like NID, which stored in incoming
1497  * request, thus can't decide what flavor to use. so we do it here.
1498  *
1499  * This hack should be removed after the OST stack be rewritten, just
1500  * like what we are doing in mdt_obd_connect()/mdt_obd_reconnect().
1501  */
1502 static int ost_connect_check_sptlrpc(struct ptlrpc_request *req)
1503 {
1504         struct obd_export     *exp = req->rq_export;
1505         struct filter_obd     *filter = &exp->exp_obd->u.filter;
1506         struct sptlrpc_flavor  flvr;
1507         int                    rc = 0;
1508
1509         if (unlikely(strcmp(exp->exp_obd->obd_type->typ_name,
1510                             LUSTRE_ECHO_NAME) == 0)) {
1511                 exp->exp_flvr.sf_rpc = SPTLRPC_FLVR_ANY;
1512                 return 0;
1513         }
1514
1515         if (exp->exp_flvr.sf_rpc == SPTLRPC_FLVR_INVALID) {
1516                 cfs_read_lock(&filter->fo_sptlrpc_lock);
1517                 sptlrpc_target_choose_flavor(&filter->fo_sptlrpc_rset,
1518                                              req->rq_sp_from,
1519                                              req->rq_peer.nid,
1520                                              &flvr);
1521                 cfs_read_unlock(&filter->fo_sptlrpc_lock);
1522
1523                 cfs_spin_lock(&exp->exp_lock);
1524
1525                 exp->exp_sp_peer = req->rq_sp_from;
1526                 exp->exp_flvr = flvr;
1527
1528                 if (exp->exp_flvr.sf_rpc != SPTLRPC_FLVR_ANY &&
1529                     exp->exp_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
1530                         CERROR("unauthorized rpc flavor %x from %s, "
1531                                "expect %x\n", req->rq_flvr.sf_rpc,
1532                                libcfs_nid2str(req->rq_peer.nid),
1533                                exp->exp_flvr.sf_rpc);
1534                         rc = -EACCES;
1535                 }
1536
1537                 cfs_spin_unlock(&exp->exp_lock);
1538         } else {
1539                 if (exp->exp_sp_peer != req->rq_sp_from) {
1540                         CERROR("RPC source %s doesn't match %s\n",
1541                                sptlrpc_part2name(req->rq_sp_from),
1542                                sptlrpc_part2name(exp->exp_sp_peer));
1543                         rc = -EACCES;
1544                 } else {
1545                         rc = sptlrpc_target_export_check(exp, req);
1546                 }
1547         }
1548
1549         return rc;
1550 }
1551
1552 /* Ensure that data and metadata are synced to the disk when lock is cancelled
1553  * (if requested) */
1554 int ost_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
1555                      void *data, int flag)
1556 {
1557         struct lu_env   env;
1558         __u32           sync_lock_cancel = 0;
1559         __u32           len = sizeof(sync_lock_cancel);
1560         int             rc = 0;
1561
1562         ENTRY;
1563
1564         rc = lu_env_init(&env, LCT_DT_THREAD);
1565         if (unlikely(rc != 0))
1566                 RETURN(rc);
1567
1568         rc = obd_get_info(&env, lock->l_export, sizeof(KEY_SYNC_LOCK_CANCEL),
1569                           KEY_SYNC_LOCK_CANCEL, &len, &sync_lock_cancel, NULL);
1570         if (rc == 0 && flag == LDLM_CB_CANCELING &&
1571             (lock->l_granted_mode & (LCK_PW|LCK_GROUP)) &&
1572             (sync_lock_cancel == ALWAYS_SYNC_ON_CANCEL ||
1573              (sync_lock_cancel == BLOCKING_SYNC_ON_CANCEL &&
1574               lock->l_flags & LDLM_FL_CBPENDING))) {
1575                 struct obd_info *oinfo;
1576                 struct obdo     *oa;
1577                 int              rc;
1578
1579                 OBD_ALLOC_PTR(oinfo);
1580                 if (!oinfo)
1581                         GOTO(out_env, rc = -ENOMEM);
1582                 OBDO_ALLOC(oa);
1583                 if (!oa) {
1584                         OBD_FREE_PTR(oinfo);
1585                         GOTO(out_env, rc = -ENOMEM);
1586                 }
1587                 oa->o_id = lock->l_resource->lr_name.name[0];
1588                 oa->o_seq = lock->l_resource->lr_name.name[1];
1589                 oa->o_valid = OBD_MD_FLID|OBD_MD_FLGROUP;
1590                 oinfo->oi_oa = oa;
1591
1592                 rc = obd_sync(&env, lock->l_export, oinfo,
1593                               lock->l_policy_data.l_extent.start,
1594                               lock->l_policy_data.l_extent.end, NULL);
1595                 if (rc)
1596                         CERROR("Error %d syncing data on lock cancel\n", rc);
1597
1598                 OBDO_FREE(oa);
1599                 OBD_FREE_PTR(oinfo);
1600         }
1601
1602         rc = ldlm_server_blocking_ast(lock, desc, data, flag);
1603 out_env:
1604         lu_env_fini(&env);
1605         RETURN(rc);
1606 }
1607
1608 static int ost_filter_recovery_request(struct ptlrpc_request *req,
1609                                        struct obd_device *obd, int *process)
1610 {
1611         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1612         case OST_CONNECT: /* This will never get here, but for completeness. */
1613         case OST_DISCONNECT:
1614                *process = 1;
1615                RETURN(0);
1616
1617         case OBD_PING:
1618         case OST_CREATE:
1619         case OST_DESTROY:
1620         case OST_PUNCH:
1621         case OST_SETATTR:
1622         case OST_SYNC:
1623         case OST_WRITE:
1624         case OBD_LOG_CANCEL:
1625         case LDLM_ENQUEUE:
1626                 *process = target_queue_recovery_request(req, obd);
1627                 RETURN(0);
1628
1629         default:
1630                 DEBUG_REQ(D_WARNING, req, "not permitted during recovery");
1631                 *process = -EAGAIN;
1632                 RETURN(0);
1633         }
1634 }
1635
1636 int ost_msg_check_version(struct lustre_msg *msg)
1637 {
1638         int rc;
1639
1640         switch(lustre_msg_get_opc(msg)) {
1641         case OST_CONNECT:
1642         case OST_DISCONNECT:
1643         case OBD_PING:
1644         case SEC_CTX_INIT:
1645         case SEC_CTX_INIT_CONT:
1646         case SEC_CTX_FINI:
1647                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
1648                 if (rc)
1649                         CERROR("bad opc %u version %08x, expecting %08x\n",
1650                                lustre_msg_get_opc(msg),
1651                                lustre_msg_get_version(msg),
1652                                LUSTRE_OBD_VERSION);
1653                 break;
1654         case OST_CREATE:
1655         case OST_DESTROY:
1656         case OST_GETATTR:
1657         case OST_SETATTR:
1658         case OST_WRITE:
1659         case OST_READ:
1660         case OST_PUNCH:
1661         case OST_STATFS:
1662         case OST_SYNC:
1663         case OST_SET_INFO:
1664         case OST_GET_INFO:
1665 #ifdef HAVE_QUOTA_SUPPORT
1666         case OST_QUOTACHECK:
1667         case OST_QUOTACTL:
1668         case OST_QUOTA_ADJUST_QUNIT:
1669 #endif
1670                 rc = lustre_msg_check_version(msg, LUSTRE_OST_VERSION);
1671                 if (rc)
1672                         CERROR("bad opc %u version %08x, expecting %08x\n",
1673                                lustre_msg_get_opc(msg),
1674                                lustre_msg_get_version(msg),
1675                                LUSTRE_OST_VERSION);
1676                 break;
1677         case LDLM_ENQUEUE:
1678         case LDLM_CONVERT:
1679         case LDLM_CANCEL:
1680         case LDLM_BL_CALLBACK:
1681         case LDLM_CP_CALLBACK:
1682                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
1683                 if (rc)
1684                         CERROR("bad opc %u version %08x, expecting %08x\n",
1685                                lustre_msg_get_opc(msg),
1686                                lustre_msg_get_version(msg),
1687                                LUSTRE_DLM_VERSION);
1688                 break;
1689         case LLOG_ORIGIN_CONNECT:
1690         case OBD_LOG_CANCEL:
1691                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
1692                 if (rc)
1693                         CERROR("bad opc %u version %08x, expecting %08x\n",
1694                                lustre_msg_get_opc(msg),
1695                                lustre_msg_get_version(msg),
1696                                LUSTRE_LOG_VERSION);
1697                 break;
1698         default:
1699                 CERROR("Unexpected opcode %d\n", lustre_msg_get_opc(msg));
1700                 rc = -ENOTSUPP;
1701         }
1702         return rc;
1703 }
1704
1705 struct ost_prolong_data {
1706         struct ptlrpc_request *opd_req;
1707         struct obd_export     *opd_exp;
1708         struct obdo           *opd_oa;
1709         struct ldlm_res_id     opd_resid;
1710         struct ldlm_extent     opd_extent;
1711         ldlm_mode_t            opd_mode;
1712         unsigned int           opd_locks;
1713         int                    opd_timeout;
1714 };
1715
1716 /* prolong locks for the current service time of the corresponding
1717  * portal (= OST_IO_PORTAL)
1718  */
1719 static inline int prolong_timeout(struct ptlrpc_request *req)
1720 {
1721         struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
1722
1723         if (AT_OFF)
1724                 return obd_timeout / 2;
1725
1726         return max(at_est2timeout(at_get(&svcpt->scp_at_estimate)),
1727                    ldlm_timeout);
1728 }
1729
1730 static void ost_prolong_lock_one(struct ost_prolong_data *opd,
1731                                  struct ldlm_lock *lock)
1732 {
1733         LASSERT(lock->l_export == opd->opd_exp);
1734
1735         if (lock->l_destroyed) /* lock already cancelled */
1736                 return;
1737
1738         /* XXX: never try to grab resource lock here because we're inside
1739          * exp_bl_list_lock; in ldlm_lockd.c to handle waiting list we take
1740          * res lock and then exp_bl_list_lock. */
1741
1742         if (!(lock->l_flags & LDLM_FL_AST_SENT))
1743                 /* ignore locks not being cancelled */
1744                 return;
1745
1746         LDLM_DEBUG(lock,
1747                    "refreshed for req x"LPU64" ext("LPU64"->"LPU64") to %ds.\n",
1748                    opd->opd_req->rq_xid, opd->opd_extent.start,
1749                    opd->opd_extent.end, opd->opd_timeout);
1750
1751         /* OK. this is a possible lock the user holds doing I/O
1752          * let's refresh eviction timer for it */
1753         ldlm_refresh_waiting_lock(lock, opd->opd_timeout);
1754         ++opd->opd_locks;
1755 }
1756
1757 static void ost_prolong_locks(struct ost_prolong_data *data)
1758 {
1759         struct obd_export *exp = data->opd_exp;
1760         struct obdo       *oa  = data->opd_oa;
1761         struct ldlm_lock  *lock;
1762         ENTRY;
1763
1764         if (oa->o_valid & OBD_MD_FLHANDLE) {
1765                 /* mostly a request should be covered by only one lock, try
1766                  * fast path. */
1767                 lock = ldlm_handle2lock(&oa->o_handle);
1768                 if (lock != NULL) {
1769                         /* Fast path to check if the lock covers the whole IO
1770                          * region exclusively. */
1771                         if (lock->l_granted_mode == LCK_PW &&
1772                             ldlm_extent_contain(&lock->l_policy_data.l_extent,
1773                                                 &data->opd_extent)) {
1774                                 /* bingo */
1775                                 ost_prolong_lock_one(data, lock);
1776                                 LDLM_LOCK_PUT(lock);
1777                                 RETURN_EXIT;
1778                         }
1779                         LDLM_LOCK_PUT(lock);
1780                 }
1781         }
1782
1783
1784         cfs_spin_lock_bh(&exp->exp_bl_list_lock);
1785         cfs_list_for_each_entry(lock, &exp->exp_bl_list, l_exp_list) {
1786                 LASSERT(lock->l_flags & LDLM_FL_AST_SENT);
1787                 LASSERT(lock->l_resource->lr_type == LDLM_EXTENT);
1788
1789                 if (!ldlm_res_eq(&data->opd_resid, &lock->l_resource->lr_name))
1790                         continue;
1791
1792                 if (!ldlm_extent_overlap(&lock->l_policy_data.l_extent,
1793                                          &data->opd_extent))
1794                         continue;
1795
1796                 ost_prolong_lock_one(data, lock);
1797         }
1798         cfs_spin_unlock_bh(&exp->exp_bl_list_lock);
1799
1800         EXIT;
1801 }
1802
1803 /**
1804  * Returns 1 if the given PTLRPC matches the given LDLM locks, or 0 if it does
1805  * not.
1806  */
1807 static int ost_rw_hpreq_lock_match(struct ptlrpc_request *req,
1808                                    struct ldlm_lock *lock)
1809 {
1810         struct niobuf_remote *nb;
1811         struct obd_ioobj *ioo;
1812         int mode, opc;
1813         struct ldlm_extent ext;
1814         ENTRY;
1815
1816         opc = lustre_msg_get_opc(req->rq_reqmsg);
1817         LASSERT(opc == OST_READ || opc == OST_WRITE);
1818
1819         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1820         LASSERT(ioo != NULL);
1821
1822         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1823         LASSERT(nb != NULL);
1824
1825         ext.start = nb->offset;
1826         nb += ioo->ioo_bufcnt - 1;
1827         ext.end = nb->offset + nb->len - 1;
1828
1829         LASSERT(lock->l_resource != NULL);
1830         if (!osc_res_name_eq(ioo->ioo_id, ioo->ioo_seq,
1831                              &lock->l_resource->lr_name))
1832                 RETURN(0);
1833
1834         mode = LCK_PW;
1835         if (opc == OST_READ)
1836                 mode |= LCK_PR;
1837         if (!(lock->l_granted_mode & mode))
1838                 RETURN(0);
1839
1840         RETURN(ldlm_extent_overlap(&lock->l_policy_data.l_extent, &ext));
1841 }
1842
1843 /**
1844  * High-priority queue request check for whether the given PTLRPC request (\a
1845  * req) is blocking an LDLM lock cancel.
1846  *
1847  * Returns 1 if the given given PTLRPC request (\a req) is blocking an LDLM lock
1848  * cancel, 0 if it is not, and -EFAULT if the request is malformed.
1849  *
1850  * Only OST_READs, OST_WRITEs and OST_PUNCHes go on the h-p RPC queue.  This
1851  * function looks only at OST_READs and OST_WRITEs.
1852  */
1853 static int ost_rw_hpreq_check(struct ptlrpc_request *req)
1854 {
1855         struct obd_device *obd = req->rq_export->exp_obd;
1856         struct ost_body *body;
1857         struct obd_ioobj *ioo;
1858         struct niobuf_remote *nb;
1859         struct ost_prolong_data opd = { 0 };
1860         int mode, opc;
1861         ENTRY;
1862
1863         /*
1864          * Use LASSERT to do sanity check because malformed RPCs should have
1865          * been filtered out in ost_hpreq_handler().
1866          */
1867         opc = lustre_msg_get_opc(req->rq_reqmsg);
1868         LASSERT(opc == OST_READ || opc == OST_WRITE);
1869
1870         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1871         LASSERT(body != NULL);
1872
1873         ioo = req_capsule_client_get(&req->rq_pill, &RMF_OBD_IOOBJ);
1874         LASSERT(ioo != NULL);
1875
1876         nb = req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE);
1877         LASSERT(nb != NULL);
1878         LASSERT(!(nb->flags & OBD_BRW_SRVLOCK));
1879
1880         osc_build_res_name(ioo->ioo_id, ioo->ioo_seq, &opd.opd_resid);
1881
1882         opd.opd_req = req;
1883         mode = LCK_PW;
1884         if (opc == OST_READ)
1885                 mode |= LCK_PR;
1886         opd.opd_mode = mode;
1887         opd.opd_exp = req->rq_export;
1888         opd.opd_oa  = &body->oa;
1889         opd.opd_extent.start = nb->offset;
1890         nb += ioo->ioo_bufcnt - 1;
1891         opd.opd_extent.end = nb->offset + nb->len - 1;
1892         opd.opd_timeout = prolong_timeout(req);
1893
1894         DEBUG_REQ(D_RPCTRACE, req,
1895                "%s %s: refresh rw locks: " LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
1896                obd->obd_name, cfs_current()->comm,
1897                opd.opd_resid.name[0], opd.opd_resid.name[1],
1898                opd.opd_extent.start, opd.opd_extent.end);
1899
1900         ost_prolong_locks(&opd);
1901
1902         CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
1903                obd->obd_name, opd.opd_locks, req);
1904
1905         RETURN(opd.opd_locks);
1906 }
1907
/**
 * hpreq_fini callback for OST_READ/OST_WRITE: run the check once more for
 * its lock-prolonging side effect only.
 */
static void ost_rw_hpreq_fini(struct ptlrpc_request *req)
{
        /* the answer itself is of no interest here */
        ost_rw_hpreq_check(req);
}
1912
1913 /**
1914  * Like ost_rw_hpreq_lock_match(), but for OST_PUNCH RPCs.
1915  */
1916 static int ost_punch_hpreq_lock_match(struct ptlrpc_request *req,
1917                                       struct ldlm_lock *lock)
1918 {
1919         struct ost_body *body;
1920         ENTRY;
1921
1922         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1923         LASSERT(body != NULL);
1924
1925         if (body->oa.o_valid & OBD_MD_FLHANDLE &&
1926             body->oa.o_handle.cookie == lock->l_handle.h_cookie)
1927                 RETURN(1);
1928
1929         RETURN(0);
1930 }
1931
1932 /**
1933  * Like ost_rw_hpreq_check(), but for OST_PUNCH RPCs.
1934  */
1935 static int ost_punch_hpreq_check(struct ptlrpc_request *req)
1936 {
1937         struct obd_device *obd = req->rq_export->exp_obd;
1938         struct ost_body *body;
1939         struct obdo *oa;
1940         struct ost_prolong_data opd = { 0 };
1941         __u64 start, end;
1942         ENTRY;
1943
1944         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1945         LASSERT(body != NULL);
1946
1947         oa = &body->oa;
1948         LASSERT(!(oa->o_valid & OBD_MD_FLFLAGS) ||
1949                 !(oa->o_flags & OBD_FL_SRVLOCK));
1950
1951         start = oa->o_size;
1952         end = start + oa->o_blocks;
1953
1954         opd.opd_req = req;
1955         opd.opd_mode = LCK_PW;
1956         opd.opd_exp = req->rq_export;
1957         opd.opd_oa  = oa;
1958         opd.opd_extent.start = start;
1959         opd.opd_extent.end   = end;
1960         if (oa->o_blocks == OBD_OBJECT_EOF)
1961                 opd.opd_extent.end = OBD_OBJECT_EOF;
1962         opd.opd_timeout = prolong_timeout(req);
1963
1964         osc_build_res_name(oa->o_id, oa->o_seq, &opd.opd_resid);
1965
1966         CDEBUG(D_DLMTRACE,
1967                "%s: refresh locks: "LPU64"/"LPU64" ("LPU64"->"LPU64")\n",
1968                obd->obd_name,
1969                opd.opd_resid.name[0], opd.opd_resid.name[1],
1970                opd.opd_extent.start, opd.opd_extent.end);
1971
1972         ost_prolong_locks(&opd);
1973
1974         CDEBUG(D_DLMTRACE, "%s: refreshed %u locks timeout for req %p.\n",
1975                obd->obd_name, opd.opd_locks, req);
1976
1977         RETURN(opd.opd_locks > 0);
1978 }
1979
/**
 * hpreq_fini callback for OST_PUNCH: run the check once more for its
 * lock-prolonging side effect only.
 */
static void ost_punch_hpreq_fini(struct ptlrpc_request *req)
{
        /* the answer itself is of no interest here */
        ost_punch_hpreq_check(req);
}
1984
1985 struct ptlrpc_hpreq_ops ost_hpreq_rw = {
1986         .hpreq_lock_match = ost_rw_hpreq_lock_match,
1987         .hpreq_check      = ost_rw_hpreq_check,
1988         .hpreq_fini       = ost_rw_hpreq_fini
1989 };
1990
1991 struct ptlrpc_hpreq_ops ost_hpreq_punch = {
1992         .hpreq_lock_match = ost_punch_hpreq_lock_match,
1993         .hpreq_check      = ost_punch_hpreq_check,
1994         .hpreq_fini       = ost_punch_hpreq_fini
1995 };
1996
1997 /** Assign high priority operations to the request if needed. */
1998 static int ost_hpreq_handler(struct ptlrpc_request *req)
1999 {
2000         ENTRY;
2001         if (req->rq_export) {
2002                 int opc = lustre_msg_get_opc(req->rq_reqmsg);
2003                 struct ost_body *body;
2004
2005                 if (opc == OST_READ || opc == OST_WRITE) {
2006                         struct niobuf_remote *nb;
2007                         struct obd_ioobj *ioo;
2008                         int objcount, niocount;
2009                         int rc;
2010                         int i;
2011
2012                         /* RPCs on the H-P queue can be inspected before
2013                          * ost_handler() initializes their pills, so we
2014                          * initialize that here.  Capsule initialization is
2015                          * idempotent, as is setting the pill's format (provided
2016                          * it doesn't change).
2017                          */
2018                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2019                         if (opc == OST_READ)
2020                                 req_capsule_set(&req->rq_pill,
2021                                                 &RQF_OST_BRW_READ);
2022                         else
2023                                 req_capsule_set(&req->rq_pill,
2024                                                 &RQF_OST_BRW_WRITE);
2025
2026                         body = req_capsule_client_get(&req->rq_pill,
2027                                                       &RMF_OST_BODY);
2028                         if (body == NULL) {
2029                                 CERROR("Missing/short ost_body\n");
2030                                 RETURN(-EFAULT);
2031                         }
2032
2033                         objcount = req_capsule_get_size(&req->rq_pill,
2034                                                         &RMF_OBD_IOOBJ,
2035                                                         RCL_CLIENT) /
2036                                                         sizeof(*ioo);
2037                         if (objcount == 0) {
2038                                 CERROR("Missing/short ioobj\n");
2039                                 RETURN(-EFAULT);
2040                         }
2041                         if (objcount > 1) {
2042                                 CERROR("too many ioobjs (%d)\n", objcount);
2043                                 RETURN(-EFAULT);
2044                         }
2045
2046                         ioo = req_capsule_client_get(&req->rq_pill,
2047                                                      &RMF_OBD_IOOBJ);
2048                         if (ioo == NULL) {
2049                                 CERROR("Missing/short ioobj\n");
2050                                 RETURN(-EFAULT);
2051                         }
2052
2053                         rc = ost_validate_obdo(req->rq_export, &body->oa, ioo);
2054                         if (rc) {
2055                                 CERROR("invalid object ids\n");
2056                                 RETURN(rc);
2057                         }
2058
2059                         for (niocount = i = 0; i < objcount; i++) {
2060                                 if (ioo[i].ioo_bufcnt == 0) {
2061                                         CERROR("ioo[%d] has zero bufcnt\n", i);
2062                                         RETURN(-EFAULT);
2063                                 }
2064                                 niocount += ioo[i].ioo_bufcnt;
2065                         }
2066                         if (niocount > PTLRPC_MAX_BRW_PAGES) {
2067                                 DEBUG_REQ(D_RPCTRACE, req,
2068                                           "bulk has too many pages (%d)",
2069                                           niocount);
2070                                 RETURN(-EFAULT);
2071                         }
2072
2073                         nb = req_capsule_client_get(&req->rq_pill,
2074                                                     &RMF_NIOBUF_REMOTE);
2075                         if (nb == NULL) {
2076                                 CERROR("Missing/short niobuf\n");
2077                                 RETURN(-EFAULT);
2078                         }
2079
2080                         if (niocount == 0 || !(nb[0].flags & OBD_BRW_SRVLOCK))
2081                                 req->rq_ops = &ost_hpreq_rw;
2082                 } else if (opc == OST_PUNCH) {
2083                         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2084                         req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
2085
2086                         body = req_capsule_client_get(&req->rq_pill,
2087                                                       &RMF_OST_BODY);
2088                         if (body == NULL) {
2089                                 CERROR("Missing/short ost_body\n");
2090                                 RETURN(-EFAULT);
2091                         }
2092
2093                         if (!(body->oa.o_valid & OBD_MD_FLFLAGS) ||
2094                             !(body->oa.o_flags & OBD_FL_SRVLOCK))
2095                                 req->rq_ops = &ost_hpreq_punch;
2096                 }
2097         }
2098         RETURN(0);
2099 }
2100
2101 /* TODO: handle requests in a similar way as MDT: see mdt_handle_common() */
2102 int ost_handle(struct ptlrpc_request *req)
2103 {
2104         struct obd_trans_info trans_info = { 0, };
2105         struct obd_trans_info *oti = &trans_info;
2106         int should_process, fail = OBD_FAIL_OST_ALL_REPLY_NET, rc = 0;
2107         struct obd_device *obd = NULL;
2108         ENTRY;
2109
2110         /* OST module is kept between remounts, but the last reference
2111          * to specific module (say, osd or ofd) kills all related keys
2112          * from the environment. so we have to refill it until the root
2113          * cause is fixed properly */
2114         lu_env_refill(req->rq_svc_thread->t_env);
2115
2116         LASSERT(current->journal_info == NULL);
2117
2118         /* primordial rpcs don't affect server recovery */
2119         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2120         case SEC_CTX_INIT:
2121         case SEC_CTX_INIT_CONT:
2122         case SEC_CTX_FINI:
2123                 GOTO(out, rc = 0);
2124         }
2125
2126         req_capsule_init(&req->rq_pill, req, RCL_SERVER);
2127
2128         if (lustre_msg_get_opc(req->rq_reqmsg) != OST_CONNECT) {
2129                 if (!class_connected_export(req->rq_export)) {
2130                         CDEBUG(D_HA,"operation %d on unconnected OST from %s\n",
2131                                lustre_msg_get_opc(req->rq_reqmsg),
2132                                libcfs_id2str(req->rq_peer));
2133                         req->rq_status = -ENOTCONN;
2134                         GOTO(out, rc = -ENOTCONN);
2135                 }
2136
2137                 obd = req->rq_export->exp_obd;
2138
2139                 /* Check for aborted recovery. */
2140                 if (obd->obd_recovering) {
2141                         rc = ost_filter_recovery_request(req, obd,
2142                                                          &should_process);
2143                         if (rc || !should_process)
2144                                 RETURN(rc);
2145                         else if (should_process < 0) {
2146                                 req->rq_status = should_process;
2147                                 rc = ptlrpc_error(req);
2148                                 RETURN(rc);
2149                         }
2150                 }
2151         }
2152
2153         oti_init(oti, req);
2154
2155         rc = ost_msg_check_version(req->rq_reqmsg);
2156         if (rc)
2157                 RETURN(rc);
2158
2159         if (req && req->rq_reqmsg && req->rq_export &&
2160             (req->rq_export->exp_connect_flags & OBD_CONNECT_JOBSTATS))
2161                 oti->oti_jobid = lustre_msg_get_jobid(req->rq_reqmsg);
2162
2163         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2164         case OST_CONNECT: {
2165                 CDEBUG(D_INODE, "connect\n");
2166                 req_capsule_set(&req->rq_pill, &RQF_OST_CONNECT);
2167                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET))
2168                         RETURN(0);
2169                 rc = target_handle_connect(req);
2170                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CONNECT_NET2))
2171                         RETURN(0);
2172                 if (!rc) {
2173                         rc = ost_init_sec_level(req);
2174                         if (!rc)
2175                                 rc = ost_connect_check_sptlrpc(req);
2176                 }
2177                 break;
2178         }
2179         case OST_DISCONNECT:
2180                 CDEBUG(D_INODE, "disconnect\n");
2181                 req_capsule_set(&req->rq_pill, &RQF_OST_DISCONNECT);
2182                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DISCONNECT_NET))
2183                         RETURN(0);
2184                 rc = target_handle_disconnect(req);
2185                 break;
2186         case OST_CREATE:
2187                 CDEBUG(D_INODE, "create\n");
2188                 req_capsule_set(&req->rq_pill, &RQF_OST_CREATE);
2189                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_CREATE_NET))
2190                         RETURN(0);
2191                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2192                         GOTO(out, rc = -EROFS);
2193                 rc = ost_create(req->rq_export, req, oti);
2194                 break;
2195         case OST_DESTROY:
2196                 CDEBUG(D_INODE, "destroy\n");
2197                 req_capsule_set(&req->rq_pill, &RQF_OST_DESTROY);
2198                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_DESTROY_NET))
2199                         RETURN(0);
2200                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2201                         GOTO(out, rc = -EROFS);
2202                 rc = ost_destroy(req->rq_export, req, oti);
2203                 break;
2204         case OST_GETATTR:
2205                 CDEBUG(D_INODE, "getattr\n");
2206                 req_capsule_set(&req->rq_pill, &RQF_OST_GETATTR);
2207                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_GETATTR_NET))
2208                         RETURN(0);
2209                 rc = ost_getattr(req->rq_export, req);
2210                 break;
2211         case OST_SETATTR:
2212                 CDEBUG(D_INODE, "setattr\n");
2213                 req_capsule_set(&req->rq_pill, &RQF_OST_SETATTR);
2214                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SETATTR_NET))
2215                         RETURN(0);
2216                 rc = ost_setattr(req->rq_export, req, oti);
2217                 break;
2218         case OST_WRITE:
2219                 req_capsule_set(&req->rq_pill, &RQF_OST_BRW_WRITE);
2220                 CDEBUG(D_INODE, "write\n");
2221                 /* req->rq_request_portal would be nice, if it was set */
2222                 if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
2223                         CERROR("%s: deny write request from %s to portal %u\n",
2224                                req->rq_export->exp_obd->obd_name,
2225                                obd_export_nid2str(req->rq_export),
2226                                ptlrpc_req2svc(req)->srv_req_portal);
2227                         GOTO(out, rc = -EPROTO);
2228                 }
2229                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2230                         RETURN(0);
2231                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOSPC))
2232                         GOTO(out, rc = -ENOSPC);
2233                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2234                         GOTO(out, rc = -EROFS);
2235                 rc = ost_brw_write(req, oti);
2236                 LASSERT(current->journal_info == NULL);
2237                 /* ost_brw_write sends its own replies */
2238                 RETURN(rc);
2239         case OST_READ:
2240                 req_capsule_set(&req->rq_pill, &RQF_OST_BRW_READ);
2241                 CDEBUG(D_INODE, "read\n");
2242                 /* req->rq_request_portal would be nice, if it was set */
2243                 if (ptlrpc_req2svc(req)->srv_req_portal != OST_IO_PORTAL) {
2244                         CERROR("%s: deny read request from %s to portal %u\n",
2245                                req->rq_export->exp_obd->obd_name,
2246                                obd_export_nid2str(req->rq_export),
2247                                ptlrpc_req2svc(req)->srv_req_portal);
2248                         GOTO(out, rc = -EPROTO);
2249                 }
2250                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_BRW_NET))
2251                         RETURN(0);
2252                 rc = ost_brw_read(req, oti);
2253                 LASSERT(current->journal_info == NULL);
2254                 /* ost_brw_read sends its own replies */
2255                 RETURN(rc);
2256         case OST_PUNCH:
2257                 CDEBUG(D_INODE, "punch\n");
2258                 req_capsule_set(&req->rq_pill, &RQF_OST_PUNCH);
2259                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_PUNCH_NET))
2260                         RETURN(0);
2261                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_EROFS))
2262                         GOTO(out, rc = -EROFS);
2263                 rc = ost_punch(req->rq_export, req, oti);
2264                 break;
2265         case OST_STATFS:
2266                 CDEBUG(D_INODE, "statfs\n");
2267                 req_capsule_set(&req->rq_pill, &RQF_OST_STATFS);
2268                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_STATFS_NET))
2269                         RETURN(0);
2270                 rc = ost_statfs(req);
2271                 break;
2272         case OST_SYNC:
2273                 CDEBUG(D_INODE, "sync\n");
2274                 req_capsule_set(&req->rq_pill, &RQF_OST_SYNC);
2275                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_SYNC_NET))
2276                         RETURN(0);
2277                 rc = ost_sync(req->rq_export, req, oti);
2278                 break;
2279         case OST_SET_INFO:
2280                 DEBUG_REQ(D_INODE, req, "set_info");
2281                 req_capsule_set(&req->rq_pill, &RQF_OBD_SET_INFO);
2282                 rc = ost_set_info(req->rq_export, req);
2283                 break;
2284         case OST_GET_INFO:
2285                 DEBUG_REQ(D_INODE, req, "get_info");
2286                 req_capsule_set(&req->rq_pill, &RQF_OST_GET_INFO_GENERIC);
2287                 rc = ost_get_info(req->rq_export, req);
2288                 break;
2289 #ifdef HAVE_QUOTA_SUPPORT
2290         case OST_QUOTACHECK:
2291                 CDEBUG(D_INODE, "quotacheck\n");
2292                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACHECK);
2293                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACHECK_NET))
2294                         RETURN(0);
2295                 rc = ost_handle_quotacheck(req);
2296                 break;
2297         case OST_QUOTACTL:
2298                 CDEBUG(D_INODE, "quotactl\n");
2299                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTACTL);
2300                 if (OBD_FAIL_CHECK(OBD_FAIL_OST_QUOTACTL_NET))
2301                         RETURN(0);
2302                 rc = ost_handle_quotactl(req);
2303                 break;
2304         case OST_QUOTA_ADJUST_QUNIT:
2305                 CDEBUG(D_INODE, "quota_adjust_qunit\n");
2306                 req_capsule_set(&req->rq_pill, &RQF_OST_QUOTA_ADJUST_QUNIT);
2307                 rc = ost_handle_quota_adjust_qunit(req);
2308                 break;
2309 #endif
2310         case OBD_PING:
2311                 DEBUG_REQ(D_INODE, req, "ping");
2312                 req_capsule_set(&req->rq_pill, &RQF_OBD_PING);
2313                 rc = target_handle_ping(req);
2314                 break;
2315         /* FIXME - just reply status */
2316         case LLOG_ORIGIN_CONNECT:
2317                 DEBUG_REQ(D_INODE, req, "log connect");
2318                 req_capsule_set(&req->rq_pill, &RQF_LLOG_ORIGIN_CONNECT);
2319                 rc = ost_llog_handle_connect(req->rq_export, req);
2320                 req->rq_status = rc;
2321                 rc = req_capsule_server_pack(&req->rq_pill);
2322                 if (rc)
2323                         RETURN(rc);
2324                 RETURN(ptlrpc_reply(req));
2325         case OBD_LOG_CANCEL:
2326                 CDEBUG(D_INODE, "log cancel\n");
2327                 req_capsule_set(&req->rq_pill, &RQF_LOG_CANCEL);
2328                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_NET))
2329                         RETURN(0);
2330                 rc = llog_origin_handle_cancel(req);
2331                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_LOG_CANCEL_REP))
2332                         RETURN(0);
2333                 req->rq_status = rc;
2334                 rc = req_capsule_server_pack(&req->rq_pill);
2335                 if (rc)
2336                         RETURN(rc);
2337                 RETURN(ptlrpc_reply(req));
2338         case LDLM_ENQUEUE:
2339                 CDEBUG(D_INODE, "enqueue\n");
2340                 req_capsule_set(&req->rq_pill, &RQF_LDLM_ENQUEUE);
2341                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE))
2342                         RETURN(0);
2343                 rc = ldlm_handle_enqueue(req, ldlm_server_completion_ast,
2344                                          ost_blocking_ast,
2345                                          ldlm_server_glimpse_ast);
2346                 fail = OBD_FAIL_OST_LDLM_REPLY_NET;
2347                 break;
2348         case LDLM_CONVERT:
2349                 CDEBUG(D_INODE, "convert\n");
2350                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CONVERT);
2351                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CONVERT))
2352                         RETURN(0);
2353                 rc = ldlm_handle_convert(req);
2354                 break;
2355         case LDLM_CANCEL:
2356                 CDEBUG(D_INODE, "cancel\n");
2357                 req_capsule_set(&req->rq_pill, &RQF_LDLM_CANCEL);
2358                 if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_CANCEL))
2359                         RETURN(0);
2360                 rc = ldlm_handle_cancel(req);
2361                 break;
2362         case LDLM_BL_CALLBACK:
2363         case LDLM_CP_CALLBACK:
2364                 CDEBUG(D_INODE, "callback\n");
2365                 CERROR("callbacks should not happen on OST\n");
2366                 /* fall through */
2367         default:
2368                 CERROR("Unexpected opcode %d\n",
2369                        lustre_msg_get_opc(req->rq_reqmsg));
2370                 req->rq_status = -ENOTSUPP;
2371                 rc = ptlrpc_error(req);
2372                 RETURN(rc);
2373         }
2374
2375         LASSERT(current->journal_info == NULL);
2376
2377         EXIT;
2378         /* If we're DISCONNECTing, the export_data is already freed */
2379         if (!rc && lustre_msg_get_opc(req->rq_reqmsg) != OST_DISCONNECT)
2380                 target_committed_to_req(req);
2381
2382 out:
2383         if (!rc)
2384                 oti_to_request(oti, req);
2385
2386         target_send_reply(req, rc, fail);
2387         return 0;
2388 }
2389 EXPORT_SYMBOL(ost_handle);
2390 /*
2391  * free per-thread pool created by ost_thread_init().
2392  */
2393 static void ost_thread_done(struct ptlrpc_thread *thread)
2394 {
2395         struct ost_thread_local_cache *tls; /* TLS stands for Thread-Local
2396                                              * Storage */
2397
2398         ENTRY;
2399
2400         LASSERT(thread != NULL);
2401
2402         /*
2403          * be prepared to handle partially-initialized pools (because this is
2404          * called from ost_thread_init() for cleanup.
2405          */
2406         tls = thread->t_data;
2407         if (tls != NULL) {
2408                 OBD_FREE_PTR(tls);
2409                 thread->t_data = NULL;
2410         }
2411         EXIT;
2412 }
2413
2414 /*
2415  * initialize per-thread page pool (bug 5137).
2416  */
2417 static int ost_thread_init(struct ptlrpc_thread *thread)
2418 {
2419         struct ost_thread_local_cache *tls;
2420
2421         ENTRY;
2422
2423         LASSERT(thread != NULL);
2424         LASSERT(thread->t_data == NULL);
2425
2426         OBD_ALLOC_PTR(tls);
2427         if (tls == NULL)
2428                 RETURN(-ENOMEM);
2429         thread->t_data = tls;
2430         RETURN(0);
2431 }
2432
2433 #define OST_WATCHDOG_TIMEOUT (obd_timeout * 1000)
2434
2435 static struct cfs_cpt_table     *ost_io_cptable;
2436
2437 /* Sigh - really, this is an OSS, the _server_, not the _target_ */
2438 static int ost_setup(struct obd_device *obd, struct lustre_cfg* lcfg)
2439 {
2440         static struct ptlrpc_service_conf       svc_conf;
2441         struct ost_obd *ost = &obd->u.ost;
2442         struct lprocfs_static_vars lvars;
2443         nodemask_t              *mask;
2444         int rc;
2445         ENTRY;
2446
2447         rc = cfs_cleanup_group_info();
2448         if (rc)
2449                 RETURN(rc);
2450
2451         lprocfs_ost_init_vars(&lvars);
2452         lprocfs_obd_setup(obd, lvars.obd_vars);
2453
2454         cfs_mutex_init(&ost->ost_health_mutex);
2455
2456         svc_conf = (typeof(svc_conf)) {
2457                 .psc_name               = LUSTRE_OSS_NAME,
2458                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
2459                 .psc_buf                = {
2460                         .bc_nbufs               = OST_NBUFS,
2461                         .bc_buf_size            = OST_BUFSIZE,
2462                         .bc_req_max_size        = OST_MAXREQSIZE,
2463                         .bc_rep_max_size        = OST_MAXREPSIZE,
2464                         .bc_req_portal          = OST_REQUEST_PORTAL,
2465                         .bc_rep_portal          = OSC_REPLY_PORTAL,
2466                 },
2467                 .psc_thr                = {
2468                         .tc_thr_name            = "ll_ost",
2469                         .tc_thr_factor          = OSS_THR_FACTOR,
2470                         .tc_nthrs_init          = OSS_NTHRS_INIT,
2471                         .tc_nthrs_base          = OSS_NTHRS_BASE,
2472                         .tc_nthrs_max           = OSS_NTHRS_MAX,
2473                         .tc_nthrs_user          = oss_num_threads,
2474                         .tc_cpu_affinity        = 1,
2475                         .tc_ctx_tags            = LCT_DT_THREAD,
2476                 },
2477                 .psc_cpt                = {
2478                         .cc_pattern             = oss_cpts,
2479                 },
2480                 .psc_ops                = {
2481                         .so_req_handler         = ost_handle,
2482                         .so_req_printer         = target_print_req,
2483                 },
2484         };
2485         ost->ost_service = ptlrpc_register_service(&svc_conf,
2486                                                    obd->obd_proc_entry);
2487         if (IS_ERR(ost->ost_service)) {
2488                 rc = PTR_ERR(ost->ost_service);
2489                 CERROR("failed to start service: %d\n", rc);
2490                 GOTO(out_lprocfs, rc);
2491         }
2492
2493         memset(&svc_conf, 0, sizeof(svc_conf));
2494         svc_conf = (typeof(svc_conf)) {
2495                 .psc_name               = "ost_create",
2496                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
2497                 .psc_buf                = {
2498                         .bc_nbufs               = OST_NBUFS,
2499                         .bc_buf_size            = OST_BUFSIZE,
2500                         .bc_req_max_size        = OST_MAXREQSIZE,
2501                         .bc_rep_max_size        = OST_MAXREPSIZE,
2502                         .bc_req_portal          = OST_CREATE_PORTAL,
2503                         .bc_rep_portal          = OSC_REPLY_PORTAL,
2504                 },
2505                 .psc_thr                = {
2506                         .tc_thr_name            = "ll_ost_create",
2507                         .tc_thr_factor          = OSS_CR_THR_FACTOR,
2508                         .tc_nthrs_init          = OSS_CR_NTHRS_INIT,
2509                         .tc_nthrs_base          = OSS_CR_NTHRS_BASE,
2510                         .tc_nthrs_max           = OSS_CR_NTHRS_MAX,
2511                         .tc_nthrs_user          = oss_num_create_threads,
2512                         .tc_cpu_affinity        = 1,
2513                         .tc_ctx_tags            = LCT_DT_THREAD,
2514                 },
2515                 .psc_cpt                = {
2516                         .cc_pattern             = oss_cpts,
2517                 },
2518                 .psc_ops                = {
2519                         .so_req_handler         = ost_handle,
2520                         .so_req_printer         = target_print_req,
2521                 },
2522         };
2523         ost->ost_create_service = ptlrpc_register_service(&svc_conf,
2524                                                           obd->obd_proc_entry);
2525         if (IS_ERR(ost->ost_create_service)) {
2526                 rc = PTR_ERR(ost->ost_create_service);
2527                 CERROR("failed to start OST create service: %d\n", rc);
2528                 GOTO(out_service, rc);
2529         }
2530
2531         mask = cfs_cpt_table->ctb_nodemask;
2532         /* event CPT feature is disabled in libcfs level by set partition
2533          * number to 1, we still want to set node affinity for io service */
2534         if (cfs_cpt_number(cfs_cpt_table) == 1 && nodes_weight(*mask) > 1) {
2535                 int     cpt = 0;
2536                 int     i;
2537
2538                 ost_io_cptable = cfs_cpt_table_alloc(nodes_weight(*mask));
2539                 for_each_node_mask(i, *mask) {
2540                         if (ost_io_cptable == NULL) {
2541                                 CWARN("OSS failed to create CPT table\n");
2542                                 break;
2543                         }
2544
2545                         rc = cfs_cpt_set_node(ost_io_cptable, cpt++, i);
2546                         if (!rc) {
2547                                 CWARN("OSS Failed to set node %d for"
2548                                       "IO CPT table\n", i);
2549                                 cfs_cpt_table_free(ost_io_cptable);
2550                                 ost_io_cptable = NULL;
2551                                 break;
2552                         }
2553                 }
2554         }
2555
2556         memset(&svc_conf, 0, sizeof(svc_conf));
2557         svc_conf = (typeof(svc_conf)) {
2558                 .psc_name               = "ost_io",
2559                 .psc_watchdog_factor    = OSS_SERVICE_WATCHDOG_FACTOR,
2560                 .psc_buf                = {
2561                         .bc_nbufs               = OST_NBUFS,
2562                         .bc_buf_size            = OST_BUFSIZE,
2563                         .bc_req_max_size        = OST_MAXREQSIZE,
2564                         .bc_rep_max_size        = OST_MAXREPSIZE,
2565                         .bc_req_portal          = OST_IO_PORTAL,
2566                         .bc_rep_portal          = OSC_REPLY_PORTAL,
2567                 },
2568                 .psc_thr                = {
2569                         .tc_thr_name            = "ll_ost_io",
2570                         .tc_thr_factor          = OSS_THR_FACTOR,
2571                         .tc_nthrs_init          = OSS_NTHRS_INIT,
2572                         .tc_nthrs_base          = OSS_NTHRS_BASE,
2573                         .tc_nthrs_max           = OSS_NTHRS_MAX,
2574                         .tc_nthrs_user          = oss_num_threads,
2575                         .tc_cpu_affinity        = 1,
2576                         .tc_ctx_tags            = LCT_DT_THREAD,
2577                 },
2578                 .psc_cpt                = {
2579                         .cc_cptable             = ost_io_cptable,
2580                         .cc_pattern             = ost_io_cptable == NULL ?
2581                                                   oss_io_cpts : NULL,
2582                 },
2583                 .psc_ops                = {
2584                         .so_thr_init            = ost_thread_init,
2585                         .so_thr_done            = ost_thread_done,
2586                         .so_req_handler         = ost_handle,
2587                         .so_hpreq_handler       = ost_hpreq_handler,
2588                         .so_req_printer         = target_print_req,
2589                 },
2590         };
2591         ost->ost_io_service = ptlrpc_register_service(&svc_conf,
2592                                                       obd->obd_proc_entry);
2593         if (IS_ERR(ost->ost_io_service)) {
2594                 rc = PTR_ERR(ost->ost_io_service);
2595                 CERROR("failed to start OST I/O service: %d\n", rc);
2596                 ost->ost_io_service = NULL;
2597                 GOTO(out_create, rc);
2598         }
2599
2600         ping_evictor_start();
2601
2602         RETURN(0);
2603
2604 out_create:
2605         ptlrpc_unregister_service(ost->ost_create_service);
2606         ost->ost_create_service = NULL;
2607 out_service:
2608         ptlrpc_unregister_service(ost->ost_service);
2609         ost->ost_service = NULL;
2610 out_lprocfs:
2611         lprocfs_obd_cleanup(obd);
2612         RETURN(rc);
2613 }
2614
2615 static int ost_cleanup(struct obd_device *obd)
2616 {
2617         struct ost_obd *ost = &obd->u.ost;
2618         int err = 0;
2619         ENTRY;
2620
2621         ping_evictor_stop();
2622
2623         /* there is no recovery for OST OBD, all recovery is controlled by
2624          * obdfilter OBD */
2625         LASSERT(obd->obd_recovering == 0);
2626         cfs_mutex_lock(&ost->ost_health_mutex);
2627         ptlrpc_unregister_service(ost->ost_service);
2628         ptlrpc_unregister_service(ost->ost_create_service);
2629         ptlrpc_unregister_service(ost->ost_io_service);
2630         ost->ost_service = NULL;
2631         ost->ost_create_service = NULL;
2632         ost->ost_io_service = NULL;
2633
2634         cfs_mutex_unlock(&ost->ost_health_mutex);
2635
2636         lprocfs_obd_cleanup(obd);
2637
2638         if (ost_io_cptable != NULL) {
2639                 cfs_cpt_table_free(ost_io_cptable);
2640                 ost_io_cptable = NULL;
2641         }
2642
2643         RETURN(err);
2644 }
2645
2646 static int ost_health_check(const struct lu_env *env, struct obd_device *obd)
2647 {
2648         struct ost_obd *ost = &obd->u.ost;
2649         int rc = 0;
2650
2651         cfs_mutex_lock(&ost->ost_health_mutex);
2652         rc |= ptlrpc_service_health_check(ost->ost_service);
2653         rc |= ptlrpc_service_health_check(ost->ost_create_service);
2654         rc |= ptlrpc_service_health_check(ost->ost_io_service);
2655         cfs_mutex_unlock(&ost->ost_health_mutex);
2656
2657         /*
2658          * health_check to return 0 on healthy
2659          * and 1 on unhealthy.
2660          */
2661         if( rc != 0)
2662                 rc = 1;
2663
2664         return rc;
2665 }
2666
2667 struct ost_thread_local_cache *ost_tls(struct ptlrpc_request *r)
2668 {
2669         return (struct ost_thread_local_cache *)(r->rq_svc_thread->t_data);
2670 }
2671
2672 /* use obd ops to offer management infrastructure */
2673 static struct obd_ops ost_obd_ops = {
2674         .o_owner        = THIS_MODULE,
2675         .o_setup        = ost_setup,
2676         .o_cleanup      = ost_cleanup,
2677         .o_health_check = ost_health_check,
2678 };
2679
2680
2681 static int __init ost_init(void)
2682 {
2683         struct lprocfs_static_vars lvars;
2684         int rc;
2685         ENTRY;
2686
2687         lprocfs_ost_init_vars(&lvars);
2688         rc = class_register_type(&ost_obd_ops, NULL, lvars.module_vars,
2689                                  LUSTRE_OSS_NAME, NULL);
2690
2691         if (ost_num_threads != 0 && oss_num_threads == 0) {
2692                 LCONSOLE_INFO("ost_num_threads module parameter is deprecated, "
2693                               "use oss_num_threads instead or unset both for "
2694                               "dynamic thread startup\n");
2695                 oss_num_threads = ost_num_threads;
2696         }
2697
2698         RETURN(rc);
2699 }
2700
2701 static void /*__exit*/ ost_exit(void)
2702 {
2703         class_unregister_type(LUSTRE_OSS_NAME);
2704 }
2705
2706 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
2707 MODULE_DESCRIPTION("Lustre Object Storage Target (OST) v0.01");
2708 MODULE_LICENSE("GPL");
2709
2710 module_init(ost_init);
2711 module_exit(ost_exit);