Contents
Changelog:
- (while still tentative) 12 November 2019: add description of persistent log to “your task”
- (while still tentative) 20 November 2019: correct references to makefile targets that run tests
- (while still tentative) 20 November 2019: correct skeleton code including extra methods in the .proto file that were unintended
- (while still tentative) 20 November 2019: supply an initial value to be stored of empty/None.
- (while still tentative) 20 November 2019: correct skeleton code to actual include
make submit
target - (while still tentative) 20 November 2019: describe test output a bit more
- 21 November 2019: in “starting server” instructions, specify using
python ./worker.py
(etc.) instead of./worker.py
- 21 November 2019: update .proto file to include RPC method in base code that we expect students to change
- 21 November 2019: include
python
inget_value.py
andset_value.py
example commands, and correctget_value
toget_value.py
- 22 November 2019: add instructions to run
make
to generate gRPC-related stubs both initially and aftertwophase.proto
is modified in several places - 22 November 2019: point explicitly to the virtualenv-based build instructions from the “Your task” section of the writeup
- 27 November 2019: correct
make submit
target in Makefile to actually generate .tar.gz file - 30 November 2019: create updated version of
test_util.py
that makes some failure tests less picky
The originally included version of
test_util.py
used bymake failure_test
sometimes checks the content returned from worker’s GetCommitted() methods when they worker indicates the value is unavailable, which it should not have done. You can get an updated version oftest_util.py
that does not do that here.
If you downloaded the skeleton code before 21 November 2019 7PM, then the skeleton .proto file either did not include the
SetValue
worker method used by our example base code (and might have included additional “Student added methods” that you don’t need to use). Either add theSetValue
method or get an updated version of the skeleton code.
Your Task
-
Download our template code (last updated very late 30 November 2019) that implements a remote procedure call-based service that stores a single string redundantly on multiple “workers”, using a single “coordinator” to update the common value.
Each worker and coordinator can be run as separate program, but our tests run them all in one process. We use the gRPC remote procedure call (RPC) library. Although our RPC library intended to operate remotely over IP (internet protocol)-based sockets (represented by an IP address and port number), our tests use “Unix domain” sockets, which are represented by a file.
The intention is that the value is only updated through the coordinator but can always be retrieved from any one of the workers, even if the others are inaccessible or down. But, we give priority to consistency over availability. This means that rather ever allowing one worker to report that the value is currently A and another report that it is currently B, we would rather that some workers instead indicate that the value is unavailable.
The Coordinator’s RPC interface has one method you must implement called
SetValue()
that updates stored the value on all workers (or returns an error if it fails), and the Worker’s RPC interface has one method you must implement calledGetCommitted()
that returns the current value if it’s available. (Initially, the value is empty/None
.) We describe these in more detail below. -
To simplify the assignment, all persistent data will be stored in a very simple persistent log. In a “real” database, most likely there would be separate updates to the log and to the actually stored data on disk on the workers. In your case, you will update a copy of the stored value in memory and write log entries that contain information about updates being performed (and the state of the worker/coordinators). Whenever a worker starts up, it will read its currently stored value from the last log entry into memory. While it works, it will update its value in memory and also update the log.
(If you don’t want to keep the stored value in memory yourself, you may also choose to always reread the log to figure it out, since our tests won’t be able to tell the difference.)
To further simplify this log, the log will only keep one entry. When you write a new log entry in this system, the prior log entry is discarded. This means that rather than scanning through all the log entries, you will need to rewrite any information you wanted to keep around in each new log entry. Most likely, this means that each worker log entry will contain a copy of the current stored value, even if it is not changing.
-
Build the template code using the instructions below and the supplied Makefile and run the tests with
make no_fail_test
andmake failure_test
(see the supplied tests section below for more detail).(Before running these tests, if you downloladed the template code on/before 30 November, updated your
test_util.py
to this versionI also recommend experimenting with running the code manually.
-
Our template code provides a naive implementation which does not provide consistency. This manifests in two ways:
-
while the value is being changed from
A
toB
, workers will not be consistent about what the value is. So, if there are two workers, it’s possible to observe value A on worker 1, then value B on worker 1, then value A on worker 2, then value B on worker 2. This is caused because the naive coordinator changes the value on worker 1 before changing the value on worker 2. This gives the observer the erroneous impression that, in addition to changing from A to B, the value changed from B to A and from A to B a second time. -
if a failure occurs, one or more workers may disagree on the value indefinitely. For example, if there are two workers, but worker 2 is temporarily inaccessible, changing the value from
A
toB
will return an error. But, after this error is returned worker 1 will indicate that the value isB
, but worker 2 will indicate that the value isA
.
Your job will be to modify the coordinator and worker code to fix these inconsistencies using two-phase commit.
Your implementation must:
-
Only use RPC calls to communicate between the coordinator and workers. We rely on injecting failures in these RPC calls to test your implementation. Our tests happen to run the workers and coordinator in a single process on a single machine, because this makes the tests much easier to write. But your code must work if we run them in seperate processes on separate machines.
-
Use the supplied
PersistentLog
implementation (described below ) to store any data that must be saved in the event that a worker or coordinator fails or is shut down.To inject failures, our tests use a PersistentLog subclass that sometimes throws an exception. You may not catch this exception.
-
Only run added code in response to RPC calls or when a coordinator or worker is constructed. This is unlike how a typical two-phase commit system would work. There, in the event of a failure, the system would typically retry an apparently failed operation after a timeout. To make testing easier, you should not use timeouts to trigger these “recovery” operations. Instead:
-
if your coordinator has problems contacting a worker in its
SetValue()
operation, it should cause theSetValue()
operation to fail immediately (e.g. by throwing an exception) rather than retrying to contact the worker.After this occurs, it is okay if some workers indicate the current value is unavailable. (However, all workers that return a stored value must return the same value. This could either by the original value (from before
SetValue()
was called) or the new value (supplied toSetValue()
) depending on your implementation and when the communication failure occured) -
whenever your coordinator is created, before returning from its constructor, it should communicate with all the workers to make the currently stored value available from all of them. If communicating with a worker fails during this, your constructor must throw an exception rather than retrying the commuication. (Our testing code will try to create your coordinator again.)
-
-
Not attempt to have workers communciate directly with each other (that is, not via remote procedure calls). In some two-phase commit systems, workers coordinate directly so they can figure out whether transactions should commit or abort even if a coordinator fails. This not required, and since our tests are not built to facilitate this, you must not do this.
-
-
If you downloaded the skeleton code before late 27 November 2019, add the line
SUBMIT_FILENAME=twophase-$(shell date +%Y%m%d%H%M%S).tar.gz
to your Makefile to make themake submit
target work (or replace it with this updated Makefile).Then run
make submit
to create a .tar.gz and upload it for submission.Alternately, make a
.tar.gz
manually.
Building
We recommend using python’s “virtual environments” feature to install the dependencies for this package locally rather than requiring them to be installed globally. In particular, this avoids problems with different programs requiring different versions of the libraries we use. We have supplied a script to assist with this:
-
If you are using a department machine like
portal.cs.virginia.edu
, runmodule load python
to enable a recent version of python. (The default is version 2.7.5, which is too old.) You will need to do this for each terminal (or configure your shell startup scripts to do it automatically). On our VM, this step is not necessary. On a different machine, ensure that you have at least Python version 3.6 installed. -
After downloading and extracting the skeleton code, run
./create-venv.sh
once. This will createbin
,lib
, and other directories, and install the following Python libraries and their dependencies within it:-
gRPC (via the grpcio and grpcio-tools packages)
-
parameterized (used for testing)
-
-
Then, each time you want to use a terminal to work on this project run
source bin/activate
. If this works, you will see(twophase)
before your terminal’s prompt. -
Run
make
to buiuld the various gRPC related files fromtwophase.proto
. Rerunmake
if you edittwophase.proto
.
Using manually
To use this system, you need to start one or more worker servers, then start a coordinator server, with the address of each of the worker servers. The coordinator will act as both a server — to receive commands to set values — and a client to each of the workers. The workers will act as a server for both the coordinator (for the commands to set values and, when you implement it, that are part of the two phase commit protocol) and for programs that query the current value.
Specifying addresses
The RPC system we use, gRPC, supports two types of server addresses. For testing we recommend primarily using the second type which only works locally:
-
TCP/IP-based addresses, which are composed of an IP address or hostname and a port. For example,
localhost:9999
(localhost
is the hostname, and9999
is the port number). For local testing, you can use the special hostnamelocalhost
or the special IP addresses127.0.0.1
or[::1]
to specify the local machine. If you use one of these special names or addresses when starting a server, the server will not be accessible from other machines. When starting a server, you can specify an address of0.0.0.0
to specify all IPv4 addresses the machine supports and[::]
to specify all IPv6 addresses the machine support. This will make it possible for clients to access the server using any address assigned to the machine (e.g. both localhost, from the same machine, and whatever address it is assigned from its WiFi connection, from any machine). -
“Unix domain” sockets, which are represented by files in the local filesystem. These are specified using something like
unix:filename
, to specify the filefilename
in the current directory orunix:///path/to/filename
to specify the file/path/to/filename
.Specifying a name like
unix:filename
when starting a server will createfilename
if it doesn’t exist, making it visible to commands likels
. Though it will appear as a file, it will not appear as a file you can open in something like a text editor.
Starting servers
To start a worker, you can use command like
python ./worker.py unix:first-worker-socket first-worker-log &
The first argument specifies where the worker’s RPC server will listen for calls; in this example, it will use a socket
file called first-worker-socket
in the current directory.
The second argument specifies the log file where the worker’s log will be stored. It will be created if it does not
exist. Just after it is created, that the corresponding PersistentLog
object paseed to the worker will return None
from get_last_log()
.
&
says to run the command in the background; you can omit, but then you won’t be able to easily run other
commands in the same terminal while the worker is running.
To start a coordinator, you can use a command like:
python ./coordinator.py unix:coordinator-socket coordinator-log unix:first-worker-socket unix:second-worker-socket
The first arugment specifies where the coordinator’s RPC server will listen for calls; in this case, a socket
file called coordinator-socket
.
The second argument specifies the log file where the coordinator’s log will be stored. It will be created if it does not
exist. Just after it is created, that the corresponding PersistentLog
object paseed to the worker will return None
from get_last_log()
.
The remaining arguments specify how to connect to the workers; the number of arguments supplied must correspond to the number of workers you want to use (which can be as few as 1). Code we supply will create stub objects for each of these workers before starting the coordinator.
Sending commands to servers
After starting servers, we supply two utilty programs for sending commands:
set_value.py
Running a command like:
python ./set_value.py unix:coordinator-socket SomeValue
will call the SetValue() method on the coordinator server specified by the first argument with a content
string of SomeValue
. If the coordinator returns an error, it will crash with a message about an exception
being thrown.
If you get an error from the coordinator containing text like
Exception calling application: 'WorkerStub' object has no attribute 'SetValue'
then this indicates that you have an old version oftwophase.proto
which was missing the declaration of theSetValue
RPC method included in our base code. (See note at top of this writeup.)
get_value.py
Running a command like:
python ./get_value.py unix:first-worker-socket
will call the GetCommitted() method on the worker specified by the first arugment and display the result.
If the worker’s reutrn value indicates the value is unavailable, it wil print value is UNAVAILABLE
. If it
indicates the value is available is equal to the string SomeValue, it wil print
value is AVAIALBLE and SomeValue`.
If the worker returns an error from the call, it will crash with a message about an exception being thrown.
Supplied Tests
We have supplied several tests based on Python’s built-in unittest library. We supply makefile targets that run each of the tests, or you can run Python directly using similar commands. As run in the Makefile, the tests will stop at the first failure, and if they print no messages about failures, all the tests passed.
Each of these tests runs the coordinator and one or more workers using Unix-domain sockets located in a temporary directory. These tests supply a PersistentLog object to the coordinator and workers which is stored in memory rather than in a file on disk.
To test failures, the tests run servers that acts as a proxy between the coordinator and workers. This server takes a remote procedure call intended for a worker and does one of the following:
-
forwards the call made by the coordinator to the worker and then forwards the result back to the coordinator
-
forwards the call made by the coordinator to the worker but discards the result, returning a failure to the coordinator
-
discards the call made by the coordinator and returns a failure to the coordinator
-
records the call made by the coordinator, returns a failure to the coordinator, then makes that call the worker some time later and discards the result
The PersistentLog object we provide also supports injecting failures by throwing an exception.
no_fail_tests
This file contains tests where there are no injected failures (but the worker and coordinator are restarted to ensure that the persistent log is in use).
The primary thing these tests try to check is that when the stored value is being changed
from A
to B
,
To do this, these tests that repeatedly checks whether all workers agree on the current value while the value is being changed. To ensure that the value is consistent at all times, we intercept messages between the coordinator and workers. Before and after the coordinator makes an RPC call to any worker, we ask the worker what their current values are to make sure they are consistent.
Since we do this check in response to each message you send, when this check fails, it may appear as if it’s part of sending that message failing.
In the naive implementation, when the value is changing from A to B, some workers start reporting the new value B while other workers are still reporting the new value A. You must fix the implementation that when a worker starts reporting the new value B, all other workers either report that the same value or that the value is unavailable.
failure_test
There is an updated version of
test_util.py
released on 30 November 2019 that fixes a bug where these tests were excessively sensitive. You can download that here.
This tests a variety of circumstances involving injected failures. Most tests are parameterized to vary things like the number of workers and when the injected failure occurs. (Our intention is that these tests should find many bugs, but they are definitely not exhaustive. Most notably the tests only try so many scenarios and if you use more or fewer messages, you might need to inject failures differently.)
Files in the distribution
-
twophase.proto
— this is the IDL (interface description language) for the RPC system, which is based on protobufs (the serialization format, which handles converting messages to and from bytes which can be sent across the network) and gRPC (which handles dispatching the appropriate functions, errors, etc.).This is used to generate files
twophase_pb2.py
andtwophase_pb2_grpc.py
if you run themake
command.This file has two types of declarations:
-
messages — which represent structures which can be sent in an RPC message. The messages have fields. The IDL compiler generates accessors for each of these fields. For example, given the declaration:
message MaybeValue { bool available = 1; string content = 2; }
One can create a MaybeValue message with its available field set to true and its content string set to
"Hello, World!"
using code like:twophase_pb2.MaybeValue(available = True, content = "Hello, World!")
The numbers
1
and2
and declaration are used to identify the field in the binary form of the message instead of the name. If you add new fields to a message, you must use a different number (but different messages can have different numbers). -
services — these represents objects with methods which can be called via RPC (remote procedure calls). Each method takes a message as its argument and returns a message as its result.
For each service a class with the same name is generated, which contains several other classes. For example, our Coordinator service has the following declaration:
service Coordinator { rpc SetValue(MaybeValue) returns (Empty) {} }
this results in the following Python classes existing in
twophase_pb2_grpc
:-
CoordinatorStub
— used by clients to call methods on a remote Coordinator service -
CoordinatorServicer
— base class for servers providing a Coordinator service
as well as a function
add_CoordinatorServicer_to_server
that can be used to configure a CoordinatorServicer instance to run as part of a RPC server (as in our supplied skeleton code incoordinator.py
). -
-
-
persistent_log.py
defines our PersistentLog base class and two implementations, one for testing (that stores the log in memory and supports injecting failures) and one for actual use (that stores the log in a file on disk).The PersistentLog objects provide two methods:
-
set_last_entry(entry)
— storesentry
, which can be almost any Python object as the last log entry before returning. -
get_last_entry()
— returns the last stored entry in the log.
If a coordinator or worker is restarted with the same log, the last entry will persist from whatever was set. If a crash occurs while the log entry is being changed, either the previous last entry or the new last entry will be returned, depending on exactly when the crash occured. (There should be no possibility of an in-between state.)
-
-
coordinator.py
has a function calledcreate_coordinator
which takes an instance of a PersistentLog (frompersistent_log.py
) and a list of worker stubs to use and returns an incompletely setup gRPC server object for the Coordinator service. Code that calls this function should take the server object, configure it to listen to a particular port or location, and then start it. To communicate with this service, the code can create a CoordinatorStub object and call methods like SetValue() using that stub.Our reference implementation of
create_coordinator
creates an instance of aMyCoordinator
class to implement the RPC service. -
worker.py
has a function calledcreate_worker
which takes an instance of a PersistentLog and creates an incompletely setup gRPC server object for the Worker service. Like with the coordinator, callers should configure and start this server. To communicate with the worker, code creates a WorkerStub object.
Hints
Using gRPC
-
To create or modify a method in an RPC service, you need to
- add or modify the method to the
twophase.proto
file for the service - add or modify the method to the corresponding class in
coordinator.py
andworker.py
. It should always take exactly three arguments:self
(the service object),request
(the argument to the method, which is amessage
declared intwophase.proto
), andcontext
, which provides access to utility functions for the RPC system, such as to send back errors
and then run
make
to regeneratetwophase_pb2.py
andtwophase_pb2_grpc.py
based ontwophase.proto
. - add or modify the method to the
-
To return an error from an RPC method, you can use code like
context.abort(grpc.StatusCode.INTERNAL, 'message')
where
grpc.StatusCode.INTERNAL
is a status code taken from the list here, and'message'
is a message of your choice. In the client calling the RPC service, this error will turn into an Python exception. -
When an RPC method fails, grpc throws an exception that inherits from
grpc.RpcError
. You could catch this exception to handle it explicitly, but in my reference implementaiton, I do not do this. (I just rely on the exception “crashing” my coordinator, and assume that the coordinator will be restarted to recover from this.)
Understanding gRPC errors
-
If an exception occurs during a method in a service, then gRPC will catch the exception and return an error from the method, with information about the exception embedded in the error. When the client receives this error, this will result in another exception. Since sometimes our tests will call RPC methods which call other methods, this can lead to rather long errors. For example, in my reference implementation if I add
raise Exception("THIS IS THE ACTUAL PROBLEM")
to a worker method called by the coordinator, then I get an stream of error messages like like:ERROR:grpc._server:Exception calling application: THIS IS THE ACTUAL PROBLEM Traceback (most recent call last): File "/net/zf14/cr4bd/fall2019/cs4414/hw/2p-py/lib/python3.6/site-packages/grpc/_server.py", line 434, in _call_behavior response_or_iterator = behavior(argument, context) File "/u/cr4bd/fall2019/cs4414/hw/2p-py/worker.py", line 53, in Commit raise Exception("THIS IS THE ACTUAL PROBLEM") Exception: THIS IS THE ACTUAL PROBLEM
This is part of the error message is written from the worker just before the RPC library sends back the error to the caller.
ERROR:grpc._server:Exception calling application: <_Rendezvous of RPC that terminated with: status = StatusCode.UNKNOWN details = "Exception calling application: THIS IS THE ACTUAL PROBLEM" debug_error_string = "{"created":"@1572716887.779957640","description":"Error received from peer unix:/tmp/2ppytphwv79d/worker-0","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Exception calling application: THIS IS THE ACTUAL PROBLEM","grpc_status":2}" > Traceback (most recent call last):
… (omitted several lines) …
When the error is received by the place where it was called from the coordinator, it triggers an exception. This is a printout of that exception. Note that in the debug_error_string the original exception mechanism is included along with information about the RPC call that was made and the part of the RPC library which handled it.
ERROR:grpc._server:Exception calling application: <_Rendezvous of RPC that terminated with: status = StatusCode.UNKNOWN details = "Exception calling application: <_Rendezvous of RPC that terminated with: status = StatusCode.UNKNOWN details = "Exception calling application: THIS IS THE ACTUAL PROBLEM" debug_error_string = "{"created":"@1572716887.779957640","description":"Error received from peer unix:/tmp/2ppytphwv79d/worker-0","file":"src/core/lib/surface/call.cc","file_line":1052,"grpc_message":"Exception calling application: THIS IS THE ACTUAL PROBLEM","grpc_status":2}" >"
(… omitted more lines …) grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with: status = StatusCode.UNKNOWN details = “Exception calling application: <_Rendezvous of RPC that terminated with: status = StatusCode.UNKNOWN details = “Exception calling application: THIS IS THE ACTUAL PROBLEM” debug_error_string = “{“created”:”@1572716887.779957640”,”description”:”Error received from peer unix:/tmp/2ppytphwv79d/worker-0”,”file”:”src/core/lib/surface/call.cc”,”file_line”:1052,”grpc_message”:”Exception calling application: THIS IS THE ACTUAL PROBLEM”,”grpc_status”:2}” >” debug_error_string = “{“created”:”@1572716887.781029247”,”description”:”Error received from peer unix:/tmp/2ppytphwv79d/worker-forward-0”,”file”:”src/core/lib/surface/call.cc”,”file_line”:1052,”grpc_message”:”Exception calling application: <_Rendezvous of RPC that terminated with:\n\tstatus = StatusCode.UNKNOWN\n\tdetails = “Exception calling application: THIS IS THE ACTUAL PROBLEM”\n\tdebug_error_string = “{“created”:”@1572716887.779957640”,”description”:”Error received from peer unix:/tmp/2ppytphwv79d/worker-0”,”file”:”src/core/lib/surface/call.cc”,”file_line”:1052,”grpc_message”:”Exception calling application: THIS IS THE ACTUAL PROBLEM”,”grpc_status”:2}”\n>”,”grpc_status”:2}” >
Since I did not catch the exception in the coordinator, it also triggered an exception in the the program that called the coordinator. As a result we got a third and fourth, even longer message.
Note that you probably want to look to make sure you look for the first error or at least one of the early ones to diagnose what is going on.
Note that in some of our tests, we deliberately inject communication errors from gRPC. Error messages triggered by these injected errors are likely normal, so you should not conclude that seeing error messages based on this is abnormal without further investigation.
The persistent log
-
You should use the persistent log to save the state of the coordinator and each worker. This includes both the value they are storing and what stage they are in the middle of the transaction.
On the workers, in order to return the correct value from
GetCommitted
, you will need to store the actual string stored. On the coordinator, in order to get a worker which failed to receive a transaction starting message, you will likely need to store the actual string as well information about any transaction is in progress.Updates to the peristent log are atomic. After updating the log, you will either see the previous or current version of the string.
Adding messages
-
You can add new messages types to
twophase.proto
using similar syntax to theMaybeValue
declared in our skeleton code:message MaybeValue { bool available = 1; string content = 2; }
Each field in the message needs to be assigned a unique number (and this number identifies that field when messages are sent over the network, not its name). Each field can have types like
bool
,string
,int32
,float
, etc. You can see the full list of types in the protocol buffer documentation along with more complete reference on the supported syntax for messages. -
You can add or replace the methods of the worker in
twophase.proto
that the coordinator uses to communicate with the worker. Our skeleton code uses aSetValue()
method that takes the new value as an argument, but this provides the worker too little information to implement its part of the distributed transaction.
Transaction IDs and Sequence Numbers
- It is possible for the coordinator to try to send that message to the worker and for the message to appear not be sent. When this happens, it is possible that the message does not actually reach the worker, or the message does reach the worker immediately, or the message reaches the worker sometime later. (Our tests create the situation deliberately. In a real network, it could likely happen as a result of some component (perhaps between the coordinator and worker machine) trying to resend a message after a failure.