Changeset 63910


Ignore:
Timestamp:
Jul 12, 2010, 2:38:14 AM (8 years ago)
Author:
bobwalters
Message:

Windows direct and OS-buffered ofile with directory synchronization.
Refined filewriteperf test

Location:
sandbox/transaction
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • sandbox/transaction/boost/transact/detail/buffering_file.hpp

    r62338 r63910  
    1818namespace detail{
    1919
     20template<std::size_t Capacity, bool direct_io>
     21struct ofile_buffer {
     22        static const bool direct = direct_io;
     23        char data[Capacity];
     24};
     25
     26#ifdef _WIN32
     27// Direct I/O supported on Windows given that there are only two ways
     28// to achieve synchronized sequaltial disk I/O - flush the system I/O buffers
     29// or use Windows direct I/O.  Not supported on any other OS.  Although O_DIRECT
     30// supported on many, strong
     31template<std::size_t Capacity>
     32struct ofile_buffer<Capacity,true> {
     33        static const bool direct = true;
     34        char *data;
     35
     36        ofile_buffer() {
     37                // direct I/O requires pagesize alignment.  This assumes
     38                // Capacity is > pagesize, but not necessarilly a multiple of it.
     39                int alignment=1; // largest power of 2 >= Capacity
     40                for (std::size_t i=Capacity; (i>>=1); alignment<<=1 )
     41                        ;
     42                data = (char*)_aligned_malloc(Capacity, alignment);
     43        };
     44        ~ofile_buffer() {
     45                _aligned_free(data);
     46        };
     47};
     48#endif
     49
     50
    2051template<class Base,std::size_t Capacity>
    2152class buffering_seq_ofile{
    2253public:
    2354    typedef typename Base::size_type size_type;
     55        static const bool direct_io = Base::has_direct_io;
     56       
    2457    explicit buffering_seq_ofile(std::string const &name)
    2558        : base(name)
    26         , size(0){
    27 #ifdef _WIN32
    28                 // support possibility that Base is using unbuffered I/O
    29                 // requiring specific buffer memory alignment
    30                 int alignment=1; // largest power of 2 >= Capacity
    31                 for (std::size_t i=Capacity; (i>>=1); alignment<<=1 )
    32                 buffer = (char*)_aligned_malloc(Capacity, alignment);
    33 #endif
    34         }
     59        , size(0)
     60                { }
    3561    template<class Size>
    3662    void write(void const *data,Size s){
    3763        if(this->size + s <= Capacity){
    38             std::memcpy(this->buffer+this->size,data,s);
     64            std::memcpy(this->buffer.data+this->size,data,s);
    3965            this->size+=s;
    4066        }else this->write_overflow(data,s);
     
    5985#endif
    6086        }
    61 #ifdef _WIN32
    62                 _aligned_free(buffer);
    63 #endif
    6487    }
    6588private:
    6689    void write_overflow(void const *data,std::size_t s){
    6790        BOOST_ASSERT(this->size + s > Capacity);
    68         if(this->size == 0){
     91                if (direct_io) {
     92                        while (this->size + s > Capacity) {
     93                                std::size_t write=Capacity - this->size;
     94                                std::memcpy(this->buffer.data+this->size,data,write);
     95                                this->size=Capacity;
     96                                this->flush_buffer();
     97                                data = static_cast<char const *>(data)+write;
     98                                s-=write;
     99                        }
     100                        if (s) {
     101                                this->write(data,s);
     102                        }
     103                }else if(this->size == 0){
    69104            this->base.write(data,s);
    70105        }else{
    71106            std::size_t write=Capacity - this->size;
    72             std::memcpy(this->buffer+this->size,data,write);
     107            std::memcpy(this->buffer.data+this->size,data,write);
    73108            this->size=Capacity;
    74109            this->flush_buffer();
     
    78113    void flush_buffer(){
    79114        if(this->size > 0){
    80             this->base.write(this->buffer,this->size);
     115            this->base.write(this->buffer.data,this->size);
    81116            this->size=0;
    82117        }
     
    84119
    85120    Base base;
    86 #ifdef _WIN32
    87         char *buffer;
    88 #else
    89         char buffer[Capacity];
    90 #endif
     121        ofile_buffer<Capacity,direct_io> buffer;
    91122        std::size_t size;
    92123};
  • sandbox/transaction/boost/transact/detail/file.hpp

    r62338 r63910  
    2020#include <Windows.h>
    2121#include <WinBase.h>
     22
     23#include <strsafe.h>
     24
     25static void throw_io_failure(char const* function)
     26{
     27        // Retrieve the system error message for the last-error code
     28        DWORD dw = GetLastError();
     29
     30        LPVOID lpMsgBuf;
     31        FormatMessage(  FORMAT_MESSAGE_ALLOCATE_BUFFER |
     32                                        FORMAT_MESSAGE_FROM_SYSTEM |
     33                                        FORMAT_MESSAGE_IGNORE_INSERTS,
     34                                        NULL,
     35                                        dw,
     36                                        MAKELANGID(LANG_NEUTRAL, SUBLANG_DEFAULT),
     37                                        (LPTSTR) &lpMsgBuf,
     38                                        0, NULL );
     39       
     40        // Display the error message and exit the process
     41        // TODO - should be incorporated into io_failure what().
     42        std::cerr << function << " failed with error " << dw << ": " << (char*)lpMsgBuf << std::endl;
     43
     44        LocalFree(lpMsgBuf);
     45        throw io_failure();
     46}       
    2247       
    2348// low-level ofile representation for WIN32
     49template <bool direct_io = false>
    2450class ofile {
    2551public:
    2652        typedef unsigned int size_type;
    27 
     53        static const bool has_direct_io = direct_io;
     54       
    2855        void* filedes;
    2956       
     
    3158                unsigned long access = GENERIC_READ | GENERIC_WRITE;
    3259                unsigned long creation_flags = OPEN_ALWAYS;
    33                 unsigned long flags = FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH;
     60                unsigned long flags = 0;
     61                if ( direct_io )
     62                        flags |= FILE_FLAG_NO_BUFFERING | FILE_FLAG_WRITE_THROUGH;
     63
    3464                this->filedes = CreateFileA(name.c_str(), access,
     65                                                                        FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
     66                                                                        0, creation_flags, flags, 0);
     67                if (this->filedes == INVALID_HANDLE_VALUE )
     68                        throw_io_failure("CreateFileA");
     69               
     70                //make sure the directory entry has reached the disk:
     71                std::string dirname=filesystem::system_complete(name).parent_path().external_directory_string();
     72
     73                creation_flags = OPEN_EXISTING;
     74                flags = FILE_FLAG_BACKUP_SEMANTICS;
     75                void *dirfiledes = CreateFileA(dirname.c_str(), access,
    3576                                                                                FILE_SHARE_READ | FILE_SHARE_WRITE | FILE_SHARE_DELETE,
    3677                                                                                0, creation_flags, flags, 0);
    37                 if (this->filedes == INVALID_HANDLE_VALUE ) {
    38                         throw io_failure();
    39                 }
    40                 std::cerr << "File opened" << std::endl;
     78                if (dirfiledes == INVALID_HANDLE_VALUE )
     79                        throw_io_failure("CreateFileA");
     80                if(!FlushFileBuffers(dirfiledes))
     81                        throw_io_failure("FlushFileBuffers");
     82                if(!CloseHandle(dirfiledes))
     83                        throw_io_failure("CloseHandle");
    4184        }
    4285       
    4386        ~ofile() {
    44                 if(this->filedes != INVALID_HANDLE_VALUE) CloseHandle(this->filedes);
     87                if(this->filedes != INVALID_HANDLE_VALUE)
     88                        CloseHandle(this->filedes);
    4589        }
    4690       
     
    4892                LARGE_INTEGER loc;
    4993                loc.QuadPart = s;
    50                 if(SetFilePointerEx(this->filedes, loc, NULL, FILE_BEGIN) == 0) {
    51                         std::cerr << "SetFilePointerEx == 0" << std::endl;
    52                         throw io_failure();
    53                 }
     94                if(SetFilePointerEx(this->filedes, loc, NULL, FILE_BEGIN) == 0)
     95                        throw_io_failure("SetFilePointerEx");
    5496        }
    5597       
    5698        size_type write(const char *data, size_type const &size) {
    5799                DWORD written;
    58                 if(WriteFile(this->filedes, data, size, &written, 0) == 0) {
    59                         std::cerr << "WriteFile == 0" << std::endl;
    60                         throw io_failure();
    61                 }
     100                if(WriteFile(this->filedes, data, size, &written, 0) == 0)
     101                        throw_io_failure("WriteFile");
    62102                return (size_type)written;
    63103        }
    64104       
    65         void sync() { }
     105        void sync() {
     106                if (!direct_io && FlushFileBuffers(this->filedes) == 0)
     107                        throw_io_failure("FlushFileBuffers");
     108        }
    66109};
    67110
     
    70113#include <unistd.h>
    71114#include <fcntl.h>
    72        
     115
    73116#ifndef _POSIX_SYNCHRONIZED_IO
    74117#error no POSIX synchronized IO available
     
    76119
    77120// low-level ofile for Linux/Unix
     121template <bool direct_io = false> // ignored on Posix API.
    78122class ofile {
    79123public:
    80124        typedef unsigned int size_type;
     125        static const bool has_direct_io = direct_io;
    81126
    82127        int filedes;
     
    89134                this->filedes= open(name.c_str(),flags,S_IRUSR | S_IWUSR);
    90135                if(this->filedes==-1) throw io_failure();
     136               
    91137                { //make sure the directory entry has reached the disk:
    92138                        std::string dirname=filesystem::path(name).directory_string();
     
    122168
    123169#endif
    124                
     170
     171typedef ofile<true> direct_ofile;
     172       
    125173}
    126174}
  • sandbox/transaction/boost/transact/detail/syncing_file.hpp

    r62338 r63910  
    1616#include <boost/assert.hpp>
    1717#include <boost/config.hpp>
    18 #include <boost/transact/detail/file.hpp>
     18#include <boost/transact/detail/buffering_file.hpp>
    1919
    2020namespace boost{
     
    2222namespace detail{
    2323
     24
     25template <class Base>
    2426class syncing_seq_ofile{
    2527public:
    2628    typedef unsigned int size_type;
    27     explicit syncing_seq_ofile(std::string const &name);
    28     void write(void const *data,std::size_t size);
     29        static const bool has_direct_io = Base::has_direct_io;
     30       
     31    explicit syncing_seq_ofile(std::string const &name)
     32                : pos(0)
     33                , base(name){
     34                        this->write_ahead(0,write_ahead_size);
     35                }
     36    void write(void const *data,std::size_t size){
     37                size_type const s=this->pos % write_ahead_size;
     38                if(s + size >= write_ahead_size){ //there must be at least one 0 at the and, so also write ahead if this is equal.
     39                        size_type start=this->pos - s + write_ahead_size;
     40                        size_type end=start+((s + size)/write_ahead_size) * write_ahead_size; //usually == start + write_ahead_size, but "size" can theoretically span a whole write_ahead_size
     41                        BOOST_ASSERT(end > start);
     42                        this->write_ahead(start,end);
     43                }
     44               
     45                std::size_t ret= base.write((char const *)data,size);
     46                if(ret > 0) this->pos+=ret;
     47                if(ret != std::size_t(size)) throw io_failure();
     48        }
    2949    size_type position() const{ return this->pos; }
    30     void flush();
    31     void sync();
     50    void flush() {}
     51    void sync() {
     52                base.sync();
     53        }
    3254private:
    3355    size_type pos;
    34         ofile  filedes;
     56        Base base;
    3557
    36 private:
    3758    void write_ahead(size_type const &start,size_type const &end){
    3859        BOOST_ASSERT(start % write_ahead_size == 0);
    3960        BOOST_ASSERT(end % write_ahead_size == 0);
    4061        BOOST_STATIC_ASSERT(write_ahead_size % page_size == 0);
    41         filedes.seek(start);
     62        base.seek(start);
    4263        for(size_type off=start;off < end;off+=page_size){
    43             filedes.write(empty_page.data,page_size);
     64            base.write(empty_page.data,page_size);
    4465        }
    45                 filedes.sync();
    46                 filedes.seek(this->pos);
     66                base.sync();
     67                base.seek(this->pos);
    4768    }
    4869
     
    5071    static std::size_t const page_size=4096;
    5172
    52     struct empty_page_type{
    53         empty_page_size(){
    54             std::memset(data,0,page_size);
     73    struct empty_page_type : public ofile_buffer<page_size,has_direct_io> {
     74                typedef ofile_buffer<page_size,has_direct_io> base_buffer;
     75        empty_page_type() : base_buffer() {
     76            std::memset(base_buffer::data,0,page_size);
    5577        }
    56         char data[page_size];
    57     }
     78    };
     79       
    5880    static empty_page_type empty_page;
    59     int filedes;
    6081};
    6182
    62 syncing_seq_ofile::empty_page_type syncing_seq_ofile::empty_page;
    63 
    64 inline syncing_seq_ofile::syncing_seq_ofile(std::string const &name)
    65     : pos(0)
    66     , filedes(name){
    67     this->write_ahead(0,write_ahead_size);
    68 }
    69 
    70 void syncing_seq_ofile::write(void const *data,std::size_t size){
    71     size_type const s=this->pos % write_ahead_size;
    72     if(s + size >= write_ahead_size){ //there must be at least one 0 at the and, so also write ahead if this is equal.
    73         size_type start=this->pos - s + write_ahead_size;
    74         size_type end=start+((s + size)/write_ahead_size) * write_ahead_size; //usually == start + write_ahead_size, but "size" can theoretically span a whole write_ahead_size
    75         BOOST_ASSERT(end > start);
    76         this->write_ahead(start,end);
    77     }
    78 
    79         std::size_t ret= filedes.write((char const *)data,size);
    80     if(ret > 0) this->pos+=ret;
    81     if(ret != std::size_t(size)) throw io_failure();
    82 }
    83 
    84 
    85 inline void syncing_seq_ofile::flush(){}
    86        
    87 inline void syncing_seq_ofile::sync(){
    88         filedes.sync();
    89 }
     83template<class Base>
     84typename syncing_seq_ofile<Base>::empty_page_type syncing_seq_ofile<Base>::empty_page;
    9085
    9186
  • sandbox/transaction/libs/transact/perf/filewrite.cpp

    r62168 r63910  
    1515#include <boost/transact/detail/sectorizing_file.hpp>
    1616#include <boost/transact/detail/syncing_file.hpp>
     17#include <boost/transact/detail/file.hpp>
     18#include <boost/date_time/posix_time/posix_time.hpp>
    1719
    18 static const int num_sets=100;
    19 static const int txns_per_set=1000;
    20 static const int txn_size=3000;  // typical transaction log size
    21 static int total_txns = 0;
    22 static int total_bytes = 0;
     20static const int loop_size=1000;
    2321
    24 static char txn_buffer[ txn_size ]; // random data set
     22static char txn_buffer[ 10000 ]; // random data set
    2523
    2624using namespace boost::transact;
     25using namespace boost::posix_time;
    2726
    2827typedef detail::sectorizing_seq_ofile<
    2928                        detail::aligning_seq_ofile<
    3029                                detail::buffering_seq_ofile<
    31                                         detail::syncing_seq_ofile,
     30                                        detail::syncing_seq_ofile< detail::ofile<false> >,
    3231                                        8192
    3332                                >
     
    3736
    3837
    39 void log_a_set(ofile_t &outfile) {
    40         for (int i=0; i<txns_per_set; i++) {
     38typedef detail::sectorizing_seq_ofile<
     39                        detail::aligning_seq_ofile<
     40                                detail::buffering_seq_ofile<
     41                                        detail::syncing_seq_ofile< detail::direct_ofile >,
     42                                        8192
     43                                >
     44                        >
     45                >
     46direct_ofile_t;
     47
     48template <class file_t>
     49void filetest1(const char *filename, size_t txn_size) {
     50        file_t outfile(filename);       
     51       
     52        ptime start = microsec_clock::local_time();     
     53        for (int i=0; i<loop_size; i++) {
    4154                outfile.write(txn_buffer, txn_size);
     55                outfile.sync();
    4256        }
    43         total_txns += txns_per_set;
    44         total_bytes += (txns_per_set * txn_size);
    45         std::cout << "Written " << total_txns << " txns, " << total_bytes << " bytes" << std::endl;
     57        ptime end = microsec_clock::local_time();
     58
     59        std::cout << "Written " << loop_size << " txns, "
     60                        << loop_size*txn_size << " bytes, in "
     61                        << (end-start) << " microseconds"
     62                        << std::endl;
    4663}
    4764
    48 void filetest1() {
    49         ofile_t outfile("filetest1.out");
    50         for (int i=0; i<num_sets; i++) {
    51                 log_a_set(outfile);
    52         }
    53 }
    5465
    5566int main(int, const char *[]){
    56     filetest1();
     67        // write loop_size transactions to disk, each 3k in size.
     68    filetest1<ofile_t>("filetest1.out", 3000);
     69    filetest1<direct_ofile_t>("filetest2.out", 3000);
     70
     71        // write loop_size transactions to disk, each 10k in size.
     72        filetest1<ofile_t>("filetest3.out", 10000);
     73    filetest1<direct_ofile_t>("filetest4.out", 10000);
     74
    5775    return 0;
    5876}
  • sandbox/transaction/libs/transact/test/Jamfile.v2

    r62168 r63910  
    5252    alias filewriteperf
    5353    :
    54         [ run ../perf/filewrite.cpp :  :  : <library>/boost//date_time <library>/boost//system <link>static ]
     54        [ run ../perf/filewrite.cpp :  :  : <library>/boost//filesystem <library>/boost//date_time <library>/boost//system <link>static ]
    5555    ;
    5656       
Note: See TracChangeset for help on using the changeset viewer.