This chapter describes setting up and operating Virtuoso on a cluster of computers. The section on Virtuoso cluster programming documents the SQL extensions specific to cluster application development.
These sections apply to Virtuoso as of version 6.0.
Clustering primarily offers greatly increased scalability for large databases without requiring application changes. The database is divided over a number of servers, all of which provide transparent access to the same data.
Virtuoso can be run in cluster mode where one logical database is served by a collection of server processes spread over a cluster of machines.
The cluster's composition is declared in a cluster.ini file which is to be in the starting directory of each of the servers composing the cluster. This file declares the hosts and listening ports of all processes composing the cluster and which of these processes is the local process and which the master.
A cluster has a single master process which is the only one allowed to run DDL operations and which is responsible for distributed deadlock resolution. In all other respects, all server processes of the cluster are interchangeable.
The set of processes declared in the cluster.ini files is called the physical cluster.
Each cluster server process has its own database and log files and is solely responsible for these. All configuration fields in virtuoso.ini and related files apply to the process whose ini file this is and their meaning is not modified by clustering.
Specifically, the SQL client and HTTP and other listening ports of each process are declared as usual and are used as usual. A cluster server process has additionally a cluster listening port that is used for cluster communications. This may not be connected to by anything except other processes of the same physical cluster. The cluster listener ports of all processes are declared in cluster.ini and all processes must specify the same information.
Below is a sample cluster.ini file declaring a physical cluster of 4 processes.
[Cluster]
Threads = 100
ThisHost = Host1
Master = Host1
ReqBatchSize = 100
BatchesPerRPC = 4
BatchBufferBytes = 20000
LocalOnly = 2
Host1 = box1:2222
Host2 = box2:2223
Host3 = box3:2224
Host4 = box4:2225
Host1-1 = box1-1:12222
Host2-1 = box2-1:12223
Host3-1 = box3-1:12224
Host4-1 = box4-1:12225
The lines Host1 ... Host4 declare the listening ports of each process. The line ThisHost = Host1 declares that this process is Host1, hence its cluster listener is at box1:2222. The names box1 - box4 and box1-1 - box4-1 are machine names that must be resolvable in the local context. IP numbers can also be used. Mentioning a host several times declares additional interfaces for the host. Any of these interfaces may be used for cluster connection to the Virtuoso server at the host. Thus Host1 = gives the first interface, Host1-1 = the second and so on. This is useful since servers most often have multiple network interfaces and Virtuoso balances the traffic among these interfaces if multiple interfaces are provided. Each host will listen at all the host:port numbers mentioned and other hosts will decide which interface to use based on load.
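The naming convention for additional interfaces can be illustrated with a short Python sketch that reads the sample file and groups the listener addresses by host. This is only an illustration of how the entries relate to each other. The function name interfaces_by_host is hypothetical and is not part of Virtuoso, and the assumption that Host<n>-<m> always names an extra interface of Host<n> is taken from the sample above.

```python
import configparser
import re
from collections import defaultdict

SAMPLE_CLUSTER_INI = """\
[Cluster]
Threads = 100
ThisHost = Host1
Master = Host1
Host1 = box1:2222
Host2 = box2:2223
Host3 = box3:2224
Host4 = box4:2225
Host1-1 = box1-1:12222
Host2-1 = box2-1:12223
Host3-1 = box3-1:12224
Host4-1 = box4-1:12225
"""


def interfaces_by_host(ini_text):
    """Group cluster listener addresses by logical host name (hypothetical helper)."""
    parser = configparser.ConfigParser()
    parser.optionxform = str  # keep the case of keys such as Host1
    parser.read_string(ini_text)
    grouped = defaultdict(list)
    for key, value in parser["Cluster"].items():
        # Host1 is the first interface, Host1-1 the second, and so on.
        match = re.fullmatch(r"(Host\d+)(?:-\d+)?", key)
        if match:
            grouped[match.group(1)].append(value)
    return dict(grouped)


interfaces = interfaces_by_host(SAMPLE_CLUSTER_INI)
for host, addresses in interfaces.items():
    print(host, addresses)
```

Each host ends up with two addresses here, which are the interfaces among which traffic to that host can be balanced.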
The Threads line gives the maximum number of threads that will be made for serving requests from other hosts of the cluster. This is in addition to any other threads reserved in any other ini files.
The other fields should be left at the values shown.
To make a new empty clustered Virtuoso database, set up the individual instances. The processes must be of exactly the same version, but the operating systems, byte orders and word lengths of the participating machines do not have to match.
Each individual database is assigned its database files and other configuration by editing its virtuoso.ini file. No installation besides having the Virtuoso executable and the virtuoso.ini file is needed.
If there is a Virtuoso installation on the machine, use the executable from that and set up an empty directory with the virtuoso.ini file.
Once the individual database directories are set up, write the cluster.ini file as shown above. Copy this into each running directory beside the virtuoso.ini file. Edit each copy and set ThisHost to the host to which that copy belongs. Set Master to point to one of the hosts and make sure each file has the same master and a distinct ThisHost.
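Since the per-host files differ only in the ThisHost line, they can be generated from one template. The following Python sketch shows one way to do this under stated assumptions: the host names and addresses are those of the sample file, and the helper render_cluster_ini is hypothetical. It only builds the file contents as strings; writing each one to the corresponding server directory is left out.

```python
TEMPLATE = """\
[Cluster]
Threads = 100
ThisHost = {this_host}
Master = {master}
ReqBatchSize = 100
BatchesPerRPC = 4
BatchBufferBytes = 20000
LocalOnly = 2
{host_lines}
"""

# Host names and listener addresses taken from the sample cluster.ini.
HOSTS = {
    "Host1": "box1:2222",
    "Host2": "box2:2223",
    "Host3": "box3:2224",
    "Host4": "box4:2225",
}


def render_cluster_ini(this_host, master, hosts):
    """Return the cluster.ini text for one process (hypothetical helper)."""
    if this_host not in hosts or master not in hosts:
        raise ValueError("ThisHost and Master must both be declared hosts")
    host_lines = "\n".join(f"{name} = {addr}" for name, addr in hosts.items())
    return TEMPLATE.format(this_host=this_host, master=master, host_lines=host_lines)


# Same Master everywhere, a distinct ThisHost in each file.
files = {name: render_cluster_ini(name, "Host1", HOSTS) for name in HOSTS}
print(files["Host3"])
```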
Start the server processes. This will initialize the empty databases.
Connect to the master host's SQL port with a SQL client to continue. The initial default user and password is dba. To verify that the cluster nodes can connect to each other, do status (''); twice. The second status should have a line beginning with Cluster xx nodes, ...
The procedure for converting a single server database into a clustered one will be specified later. For 6.0, there is no automatic way of doing this.
One can take an existing database and convert it to clustered operation by copying the initial database to each node of the cluster. Set up the database so that each node would run an identical copy. Then make the cluster.ini files.
Start the servers. Connect to the master and run the partitioning statements for all keys of all tables that are to be managed by the cluster.
At this point, each database will also hold rows that are not its responsibility, unless of course all tables are declared as replicated. The rows that are present and do not fall in the partition managed by the host should be deleted.
This can be done with a stored procedure to be supplied later.
All databases in a cluster share precisely the same schema. Any DDL operations take effect on all nodes simultaneously.
The tables can be of one of three types:
Partitioning an index means that different hosts store different parts of the index. For each partitioned index one or more partitioning columns must be declared. Also each partitioned index is always held in its totality in a logical cluster. A logical cluster is a subset of the machines composing the physical cluster declared in cluster.ini. Usually the logical and physical cluster are the same.
The logical cluster additionally declares how partitions are to be replicated. It is namely possible to declare that a specific partition be stored in multiple identical copies.
There are two predefined logical clusters: REPLICATED and __ALL. If a table's indices specify the REPLICATED cluster in their partitioning declaration, the data will be maintained in identical copies on all nodes.
The __ALL cluster is the default logical cluster for any partitioned table. Using this, each row will go to exactly one place, balanced over the set of nodes declared in the cluster.ini file.
Basic applications do not need to declare their own cluster since the default one is most often applicable.
CREATE CLUSTER <name> DEFAULT <group>[,...]
group: GROUP (<host>[,...])
A logical cluster has a single global name and it consists of one or more host groups. Each host group is given a partition of whatever object that is stored in the logical cluster. Each host of a host group replicates the partition assigned to this host group.
The predefined REPLICATED logical cluster consists of one group which has all the hosts of the physical cluster. It is used for storing any schema tables. This causes all schema information to be identically stored on all nodes of the physical cluster.
If a table is created on a clustered Virtuoso and no partitioning is declared, the table exists on all nodes with independent content on each. This is generally not desirable since the same query will return different data depending on which host runs it.
For performance, it is best to replicate any short, seldom changing lookup tables on all hosts.
Example
create cluster C2 default group ("Host1"), group ("Host2"), group ("Host3"), group ("Host4") ;
This would declare a logical cluster identical to the default __ALL cluster if the cluster.ini specified hosts 1 - 4.
The REPLICATED cluster could be declared as follows:
create cluster C3 default group ("Host1", "Host2", "Host3", "Host4");
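The difference between the two declarations can be modeled in a few lines of Python. This is a conceptual sketch only: a logical cluster is represented as a list of host groups, and a row is stored on every host of the single group chosen for it. Choosing the group by taking the hash modulo the number of groups is an assumption made for the illustration; the text above does not specify how Virtuoso maps a hash to a host group.

```python
# A logical cluster modeled as a list of host groups.
C2_LIKE_ALL = [["Host1"], ["Host2"], ["Host3"], ["Host4"]]
C3_LIKE_REPLICATED = [["Host1", "Host2", "Host3", "Host4"]]


def hosts_storing(cluster_groups, partition_hash):
    """Return the hosts that store an index entry with the given hash.

    The modulo mapping is an assumption for illustration, not Virtuoso's
    documented placement rule.
    """
    group = cluster_groups[partition_hash % len(cluster_groups)]
    return list(group)  # every host of the chosen group holds a copy


for h in (0, 1, 6):
    print(h, hosts_storing(C2_LIKE_ALL, h), hosts_storing(C3_LIKE_REPLICATED, h))
```

With four single-host groups each entry lands on exactly one host, while with one group of four hosts every entry is kept in identical copies on all of them.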
The ALTER INDEX statement is used for declaring the partitioning of an index. For a non-primary key index, the corresponding CREATE INDEX can also declare the partitioning. If one index of a table is partitioned, all indices of the table must be partitioned. If no partitioning is declared, the table will exist on all nodes but will have independent content on each. Partitioning of a key must be set when the key is empty. Thus, to create a partitioned table, first create the table and declare partitioning for its primary key.
The name of the primary key index is the same as that of the table. If the table has no explicit primary key, it has an implicit one, named after the table and having the invisible _IDN column as single key part. This may be used as a partitioning key.
If clustering is not enabled, partitioning can still be declared but it will have no effect. Thus a single application DDL file can be used for clustered and single process versions of the application schema.
ALTER INDEX <index-name> ON <table-name> PARTITION [CLUSTER <cluster-name>] (<col-spec>[,...])
col-spec : <column-name> <type> [<options>]
type: INT | VARCHAR
options: (<mask>) | (<length>, <mask>)
The PARTITION declaration may occur at the end of a create index statement. This causes the index to be created, partitioned and then filled. Otherwise it would not be possible to add indices to non-empty tables.
All or part of a partitioning column's value can be used for calculating an index entry hash which then determines which host group of the logical cluster gets to store the index entry. There are two types of hashing, integer and varchar. Integer applies to integer-like types such as integer, bigint and IRI id, and varchar applies to anything else. Floating point columns or decimals should not be used for partitioning. Large objects or UDTs cannot be used for partitioning.
For an integer partitioning, the mask is a bitmask applied to the number before extracting the part that is used for the hash. A mask of 0hexffff00 will use the second and third least significant bytes for hashing, thus values 0-255 will hash to the same value, values 256-511 to the same value and so on. The value 0hex1000000 will again hash to the same as 0.
Having consecutive integers hash to the same value will cause them to go to the same host group and become physically adjacent, which is good for key compression. If no mask is specified, 0hexffff is used, meaning that each consecutive number gets a different hash, based on the low 16 bits of the number.
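The effect of the mask can be checked with plain bit arithmetic. The Python sketch below only shows which bits of the number take part in the hash; the hash function that Virtuoso applies to those bits is not described here and is not modeled. The function name masked_bits is hypothetical.

```python
EXAMPLE_MASK = 0xFFFF00   # written 0hexffff00 in the partition declaration
DEFAULT_MASK = 0xFFFF     # used when no mask is given


def masked_bits(value, mask=DEFAULT_MASK):
    """Return the bits of value that participate in the partition hash."""
    return value & mask


# With 0xffff00 the low byte is ignored, so runs of 256 values share a hash input.
print(masked_bits(0, EXAMPLE_MASK), masked_bits(255, EXAMPLE_MASK))
print(masked_bits(256, EXAMPLE_MASK), masked_bits(511, EXAMPLE_MASK))
# Bits above the mask are ignored too, so 0x1000000 wraps around to the same as 0.
print(masked_bits(0x1000000, EXAMPLE_MASK))
# With the default mask, consecutive numbers give different hash inputs.
print(masked_bits(1), masked_bits(2))
```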
For a varchar partitioned column, the default is to calculate a hash based on all bytes of the string. For purposes of key compression, it may be good to put strings with a common prefix in the same partition.
The option consists of two integers, the length and the mask. If the length is positive, the first length characters are used for the hash. If the string is shorter than length, all characters are used. If the length is negative, we take the absolute value of the length and use all bytes of the string except the last length ones. If the string is shorter than -length, all the bytes are used. A length of -1 means to use all bytes except the last one, a length of 2 means to use the first 2 characters only.
The string's hash value is a large integer. The mask controls how many bits of this hash are used for the hash of the index entry.
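The length rule can be written out as a small Python function that returns the part of the string fed to the hash. This is a sketch of the rule as worded above, not of Virtuoso's implementation. It works on bytes, treats a length of 0 as "use everything" (an assumption, since that case is not described), and leaves out both the hash function and the mask. The name hashed_part is hypothetical.

```python
def hashed_part(value: bytes, length: int) -> bytes:
    """Return the bytes of value that take part in the partition hash."""
    if length > 0:
        # Positive: use the first `length` bytes, or all if the string is shorter.
        return value[:length]
    if length < 0:
        drop = -length
        if len(value) < drop:
            # Shorter than -length: all bytes are used.
            return value
        # Negative: use everything except the last `drop` bytes.
        return value[:len(value) - drop]
    return value  # assumption: 0 means the whole string


print(hashed_part(b"item-a", -1), hashed_part(b"item-b", -1))
print(hashed_part(b"abcdef", 2), hashed_part(b"a", 2))
```

Strings that differ only in the last byte give the same result with a length of -1, so they would be placed in the same partition.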
Example
create table part (id int, code int, str varchar);
alter index part on part partition (id int (0hexffff00));
create index str on part (str) partition (str varchar (-1));
-- for the primary key, hash all values differing in the low byte together.
-- for str, hash all values differing only in the last character together.
create table part_code (code int primary key, description varchar);
alter index part_code on part_code cluster replicated;
This declares a lookup table for describing the values of the code column of the part table. This is replicated on all nodes of the cluster. Note that no partitioning columns need be specified since no matter the partition key the row would end up on all nodes regardless.
A Virtuoso cluster is fully transactional and supports the 4 isolation levels identically with a single server Virtuoso. Transactions are committed using single or two phase commit as appropriate, and this is transparent to the application program.
Distributed deadlocks are detected and one of the deadlocking transactions is killed, just as with a single process.
Transactions are logged on the cluster nodes which perform updates pertaining to the transaction.
A transaction has a single owner connection. Each client connection has a distinct transaction. From the application program's viewpoint there is a single thread per transaction. Any parallelization of queries is transparent.
For roll forward recovery, each node is independent. If a transaction is found in the log for which a prepare was received but no final commit or rollback, the recovering node will ask the owner of the transaction whether the transaction did commit. Virtuoso server processes can provide this information during roll forward, hence a simultaneous restart of cluster nodes will not deadlock.
A lock wait in a clustered database requires an asynchronous notification to a monitor node. This is done so that a distributed deadlock can be detected. Thus the overhead of waiting is slightly larger than with a single process.
We recommend that read committed be set as the default isolation since this avoids most waiting. A read committed transaction will show the last committed state of rows that are exclusively locked and have uncommitted changes. This is set as DefaultIsolation = 2 in the parameters section of each virtuoso.ini file.
Virtuoso has a mode where insert/update/delete statements commit after each row. This is called row autocommit mode and is useful for bulk operations that need no transactional semantics.
The row autocommit mode is set by executing log_enable (2) or log_enable (3), for no logging and logging respectively. The setting stays in effect until set again or for the duration of the connection. Do not confuse this with the autocommit mode of a SQL client connection.
In a clustered database the row autocommit mode is supported but it will commit at longer intervals in order to save on message latency. Statements are guaranteed to commit at least once, at the end of the statement.
A searched update or delete statement in row autocommit mode processes a few thousand keys between commits, all in a distributed transaction with 2PC. These are liable to deadlock. Since the transaction boundary is not precisely defined for the application, a row autocommit batch update must be such that one can distinguish between updated and non-updated rows if one must restart after a deadlock. This is of course not an issue if updating several times makes no difference to the application.
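One way to make a batch update restartable is to record in each row that it has been processed and to select only unprocessed rows. The Python sketch below demonstrates the idea with the standard sqlite3 module as a stand-in database, so that it can be run anywhere; it does not use Virtuoso or row autocommit mode. The table item, the done column, the explicit per-batch commit and the SimulatedDeadlock exception are all hypothetical and exist only for the illustration.

```python
import sqlite3


class SimulatedDeadlock(Exception):
    """Stand-in for a deadlock error reported by the server."""


def double_prices(conn, batch_size, fail_on_batch=None):
    """Double each price once, committing after every batch of rows."""
    batch_no = 0
    while True:
        batch_no += 1
        if batch_no == fail_on_batch:
            raise SimulatedDeadlock()
        cur = conn.execute(
            "UPDATE item SET price = price * 2, done = 1 "
            "WHERE id IN (SELECT id FROM item WHERE done = 0 ORDER BY id LIMIT ?)",
            (batch_size,),
        )
        conn.commit()  # earlier batches stay committed if a later one fails
        if cur.rowcount == 0:
            return


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE item (id INTEGER PRIMARY KEY, price INTEGER, done INTEGER DEFAULT 0)"
)
conn.executemany("INSERT INTO item (id, price) VALUES (?, ?)", [(i, 10) for i in range(1, 11)])
conn.commit()

try:
    double_prices(conn, batch_size=3, fail_on_batch=2)  # fails after one committed batch
except SimulatedDeadlock:
    double_prices(conn, batch_size=3)  # restart: rows with done = 1 are skipped

prices = [row[0] for row in conn.execute("SELECT price FROM item ORDER BY id")]
print(prices)
```

Without the done marker, the restart would double the rows of the first committed batch a second time.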
Naturally, since a row can be deleted only once, the problem does not occur with deletes. Both updates and deletes in row autocommit mode are guaranteed to keep row integrity, i.e. all index entries of one row will be in the same transaction.
A row autocommit insert sends all keys of the row at once and each commits independently. Hence, a checkpoint may for example cause a situation where one index of a row is in the checkpoint state and the other is not.
Thus, a row autocommit insert on a non-empty application table with transactional semantics is not recommended. This will be useful for bulk loads into empty tables and the like, though.
All administrative operations other than data definition take effect on the node to which they are issued.
cl_exec (in cmd varchar, in params any := NULL, in is_txn := 0)
The cl_exec SQL function can be used for executing things on all nodes of a cluster.
The cmd is a SQL string. If it contains parameter markers (?), the params array is used for assigning values, left to right. If is_txn is 1, cl_exec makes a distributed transaction and does not automatically commit locally on each node. Thus cl_exec can be used as part of a containing distributed transaction.
Example
cl_exec ('shutdown')
--will shut all nodes.
cl_exec ('dbg_obj_print (?)', vector ('hello'));
--will print hello to the standard output of all the processes of the cluster.
Any recovery, integrity checking, crash dump or similar can be done node by node as with single processes.
The status () function has a cluster line right below the database version information. This line shows cluster event counts and statistics between the present and previous calls to status. Calling status ('cluster') will show this line only. Calling status ('cluster_d') shows this line and below it the same line with data on each individual host in the cluster.
If cluster nodes are off-line, the nodes concerned are mentioned above the cluster status line.
The line consists of the following fields.
Cluster 4 nodes, 4 s. 9360 m/s 536 KB/s 117% cpu 0% read 44% clw threads 2r 0w 1i buffers 1939 766 d 0 w 0 pfs
Cluster 4 nodes, 4 s. 9360 m/s 536 KB/s
This first group gives the network status: the count of nodes online (4) and the measurement interval, i.e. the number of seconds since the last status command (4 seconds). The m/s is the messages per second, i.e. 9360 single messages sent for intra-cluster purposes per second over the last 4s. The KB/s is the aggregate throughput, i.e. the count of bytes sent divided by the length of the measurement interval. This allows calculating an average message length. Only intra-cluster traffic is counted, SQL client server and HTTP connections are not included.
117% cpu 0% read 44% clw threads 2r 0w 1i
This group gives the process status. The CPU% is at 100% if one thread is running at one full CPU. The maximum CPU% is 100 times the number of CPUs in the cluster. Differences between CPUs are not considered. The read % is the sum of real time spent waiting for disk divided by the time elapsed. The maximum number is 100 times the peak number of threads running during the interval. 500% would mean an average of five threads waiting for disk at all times during the interval. The clw% is the sum of real time threads have waited for cluster request responses during the period, divided by the time elapsed. The maximum is 100% times the peak number of threads running.
The threads section (2r 0w 1i) is a snapshot of thread state and means that 2 threads are involved with processing, 0 of these are waiting for a lock and 1 is waiting for network I/O.
buffers 1939 766 d 0 w 0 pfs
This is a snapshot of the database buffers summed over all nodes: 1939 used for disk caching, 766 dirty, 0 wired down.
The pfs number is the total number of page faults during the interval summed over the cluster. This provides a warning about swapping and should be 0 or close to 0 at all times.
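For monitoring scripts, the status line can be split into its fields with a regular expression. The Python sketch below parses the sample line shown in this section and derives two figures from it. The pattern is written against that one sample and may need adjusting for other output, the field names are hypothetical, and treating KB as 1024 bytes is an assumption.

```python
import re

SAMPLE_LINE = (
    "Cluster 4 nodes, 4 s. 9360 m/s 536 KB/s 117% cpu 0% read 44% clw "
    "threads 2r 0w 1i buffers 1939 766 d 0 w 0 pfs"
)

STATUS_RE = re.compile(
    r"Cluster\s+(?P<nodes>\d+)\s+nodes,\s+(?P<interval_s>\d+)\s+s\.\s+"
    r"(?P<msgs_per_s>\d+)\s+m/s\s+(?P<kb_per_s>\d+)\s+KB/s\s+"
    r"(?P<cpu_pct>\d+)%\s+cpu\s+(?P<read_pct>\d+)%\s+read\s+(?P<clw_pct>\d+)%\s+clw\s+"
    r"threads\s+(?P<running>\d+)r\s+(?P<lock_wait>\d+)w\s+(?P<net_wait>\d+)i\s+"
    r"buffers\s+(?P<buffers>\d+)\s+(?P<dirty>\d+)\s+d\s+"
    r"(?P<wired>\d+)\s+w\s+(?P<page_faults>\d+)\s+pfs"
)


def parse_cluster_status(line):
    """Return the numeric fields of a cluster status line as a dict."""
    match = STATUS_RE.search(line)
    if match is None:
        raise ValueError("not a cluster status line")
    return {name: int(value) for name, value in match.groupdict().items()}


fields = parse_cluster_status(SAMPLE_LINE)
# Average intra-cluster message length, assuming 1 KB = 1024 bytes.
avg_message_bytes = fields["kb_per_s"] * 1024 / fields["msgs_per_s"]
# 100% cpu corresponds to one thread on one full CPU.
busy_cpus = fields["cpu_pct"] / 100
print(fields)
print(round(avg_message_bytes, 1), busy_cpus)
```

For the sample line this gives an average message of a little under 60 bytes and slightly more than one CPU kept busy across the cluster.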