
Enabling Fast Data using in memory-centric computing with Tachyon

by Saverio Veltri / @save_veltri

Who am I?

Saverio Veltri

Fast Data: What And Why

  • What: the evolution of Big Data (Volume, Velocity, Variety, Veracity), with a focus on streaming technologies capable of delivering, processing, analyzing and storing events as fast as they come in. In real time.
  • Why: fast data gives organizations the ability to test, experiment and fail forward faster, cycling through these tests at a higher speed in order to learn and succeed quicker.

FAST DATA NON FUNCTIONAL REQUIREMENTS

  • DATA SOURCE AND DATA SINK CONNECTORS: read, write and combine multiple heterogeneous data sources or data destinations via a common interface
  • BATCH AND PIPELINE PROCESSING: non-continuous (non-real-time) processing of massive amounts of data from start to completion
  • REAL-TIME STREAMING: process and aggregate real-time events, data streams or sensor data
  • OLAP PROCESSING: support for analytical/information processing involving complex queries and aggregations
  • IN-MEMORY PROCESSING: use memory for speed, staging and sharing data across jobs
  • DISTRIBUTED INTENSIVE I/O: support for fast, intensive and scalable reads and writes

Why IN MEMORY PROCESSING

  • Pros
    • The cost per unit of memory capacity keeps falling, which makes it possible to hold huge data sets in memory
    • RAM throughput is increasing exponentially
    • Many computation frameworks (e.g. Spark and Flink) leverage memory
  • Cons
    • GC
    • Data sharing across frameworks

IN MEMORY PROCESSING: throughput

IN MEMORY PROCESSING: Cost

IN MEMORY PROCESSING: Known Problems

  • Garbage collector:

    dealing with the heap also brings in the garbage collector, which can slow jobs down or even cause failures

  • Data sharing across frameworks:

    objects on the heap must somehow be serialized and deserialized in order to be shared across frameworks

  • Fault tolerance (replication?)

  • What if the data does not fit in memory?

Tachyon

“Tachyon is a memory-centric distributed storage system enabling reliable data sharing at memory-speed across cluster frameworks.”

AmpLab BDAS

“BDAS, the Berkeley Data Analytics Stack, is an open source software stack that integrates software components being built by the AMPLab to make sense of Big Data.”

AmpLab BDAS

Tachyon architecture

Zookeeper

  • ZooKeeper is a framework used to achieve fault tolerance; a leader master is elected every time the cluster is started up.

Master

  • Manages the global metadata of the system
  • Serves as the clients' endpoint for reading or modifying this metadata
  • All workers periodically send heartbeats to the master

Worker

  • Manages the local resources allocated to Tachyon
  • Stores data as blocks and serves client requests to read or write data by reading or creating new blocks

Client

  • Provides users with a gateway to interact with the Tachyon servers
  • Exposes a file system API (Java); see the sketch below
  • Initiates communication with the master for metadata operations and with the workers to read and write data
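
A minimal write/read sketch against the Java client, assuming the Tachyon 0.8-era API (TachyonFileSystemFactory and the options-less getOutStream/getInStream overloads are taken from that version; names may differ in others):

import tachyon.TachyonURI;
import tachyon.client.file.FileInStream;
import tachyon.client.file.FileOutStream;
import tachyon.client.file.TachyonFile;
import tachyon.client.file.TachyonFileSystem;
import tachyon.client.file.TachyonFileSystem.TachyonFileSystemFactory;

// obtain the client; metadata operations go to the master
TachyonFileSystem tfs = TachyonFileSystemFactory.get();

// write: the master registers the file, the bytes land on a worker
FileOutStream out = tfs.getOutStream(new TachyonURI("/demo/hello"));
out.write("hello tachyon".getBytes());
out.close();

// read: the master resolves block locations, workers serve the data
TachyonFile file = tfs.open(new TachyonURI("/demo/hello"));
FileInStream in = tfs.getInStream(file);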

Tachyon architecture

Storage types

  • Tachyon Storage
    • multi-layer (tiered) storage for user files
    • may be volatile
    • accessible via the tachyon://[path] URI scheme
  • Under FS storage: a 3rd-party file system used for persisting the Tachyon journal and lineage data, and optionally for syncing user files

underfs

Tachyon relies on another storage system to safely store metadata and user data (if configured); the under storage is selected via configuration (example below). Supported file systems:

  • Local
  • HDFS
  • S3
  • Swift
  • GlusterFS
  • your own implementation: extend tachyon.underfs.UnderFileSystem
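
The under storage location is set with a single property; a sketch assuming the Tachyon 0.8 property name tachyon.underfs.address and a hypothetical HDFS namenode address:

tachyon.underfs.address=hdfs://namenode:9000/tachyon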

write types

  • MUST_CACHE: Write the file to Tachyon storage, or fail the operation.
  • CACHE_THROUGH: Write the file synchronously to the under FS, and also try to write it to Tachyon storage.
  • THROUGH: Write the file synchronously to the under FS, skipping Tachyon storage.
  • ASYNC_THROUGH: Write the file asynchronously to the under FS (a configuration sketch follows this list).
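
The cluster-wide default write type can be changed; a sketch assuming the Tachyon 0.8 property name tachyon.user.file.writetype.default:

tachyon.user.file.writetype.default=CACHE_THROUGH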

Tiered Storage

  • Tachyon manages other storage types in addition to memory, since memory capacity may be limited in some deployments
  • Tachyon automatically manages blocks across all the configured tiers

Tiered Storage

Tiers are fully configurable; here is a typical scenario

Configuring tiered storage

The relevant worker properties:

tachyon.worker.tieredstore.level.max
tachyon.worker.tieredstore.level{x}.alias
tachyon.worker.tieredstore.level{x}.dirs.quota
tachyon.worker.tieredstore.level{x}.dirs.path
tachyon.worker.tieredstore.level{x}.reserved.ratio

For example, a two-tier setup with memory on top and three HDDs below:

tachyon.worker.tieredstore.level.max=2
tachyon.worker.tieredstore.level0.alias=MEM
tachyon.worker.tieredstore.level0.dirs.path=/mnt/ramdisk
tachyon.worker.tieredstore.level0.dirs.quota=100GB
tachyon.worker.tieredstore.level0.reserved.ratio=0.2
tachyon.worker.tieredstore.level1.alias=HDD
tachyon.worker.tieredstore.level1.dirs.path=/mnt/hdd1,/mnt/hdd2,/mnt/hdd3
tachyon.worker.tieredstore.level1.dirs.quota=2TB,5TB,500GB
tachyon.worker.tieredstore.level1.reserved.ratio=0.1

Writing Data

When a user writes a new block, it is written to the top tier by default (a custom allocator can be used if the default behavior is not desired). If there is not enough space for the block in the top tier, then the evictor is triggered in order to free space for the new block.

Reading Data

Tachyon will simply read the block from where it is already stored. If Tachyon is configured with multiple tiers, the block will not necessarily be read from the top tier, since it could have been moved to a lower tier transparently.

Block Promotion

Reading data with the TachyonStorageType.PROMOTE configuration ensures the data is first transferred to the top tier before it is read from the worker. This can also be used as a data management strategy by explicitly moving hot data to higher tiers, as sketched below.
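
A hedged sketch of a promoting read; the InStreamOptions builder, ClientContext and import paths are assumptions based on the Tachyon 0.8-era client API and may differ in other versions:

import tachyon.client.ClientContext;
import tachyon.client.TachyonStorageType;
import tachyon.client.file.options.InStreamOptions;

TachyonFileSystem tfs = TachyonFileSystemFactory.get();
TachyonFile file = tfs.open(new TachyonURI("/hot/dataset"));
// PROMOTE pulls the blocks up to the top tier before serving the read
InStreamOptions options = new InStreamOptions.Builder(ClientContext.getConf())
    .setTachyonStorageType(TachyonStorageType.PROMOTE)
    .build();
FileInStream in = tfs.getInStream(file, options);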

Allocators

Tachyon uses allocators to choose locations for writing new blocks. Tachyon has a framework for custom allocators and ships with a few default implementations:

  • GreedyAllocator: Allocates the new block to the first storage directory that has sufficient space.
  • MaxFreeAllocator: Allocates the block in the storage directory with most free space.
  • RoundRobinAllocator: Allocates the block in the highest tier with space; the storage directory is chosen through round robin.
  • Your own implementation: implement tachyon.worker.block.allocator.Allocator (the allocator is selected via configuration, as shown below)
    
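The allocator is selected through a worker property, analogous to the evictor setting shown in the next slide; the property name tachyon.worker.allocator.class is taken from the Tachyon 0.8 configuration reference:

tachyon.worker.allocator.class=tachyon.worker.block.allocator.MaxFreeAllocator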

Evictors

Tachyon uses evictors to decide which blocks to move to a lower tier when space needs to be freed. Tachyon supports custom evictors, and existing implementations include:

  • GreedyEvictor: Evicts arbitrary blocks until the required size is freed.
  • LRUEvictor: Evicts the least-recently-used blocks until the required size is freed.
  • LRFUEvictor: Evicts blocks based on least-recently-used and least-frequently-used with a configurable weight. If the weight is completely biased toward least-recently-used, the behavior will be the same as the LRUEvictor.
  • PartialLRUEvictor: Evicts based on least-recently-used, but chooses the StorageDir with the maximum free space and only evicts from that StorageDir.
  • Your own implementation: implement tachyon.worker.block.evictor.Evictor

    tachyon.worker.evictor.class=tachyon.worker.block.evictor.LRUEvictor

Lineage

Tachyon can achieve high-throughput writes and reads without compromising fault tolerance by using lineage:

  • lost output is recovered by re-executing the jobs that created the output.
  • applications write output into memory, and Tachyon periodically checkpoints the output into the under file system in an asynchronous way. In case of failures, Tachyon launches job recomputation to restore the lost files. Lineage assumes that jobs are deterministic.

lineage

Lineage API (alpha)

// Tachyon 0.8 Lineage client API (alpha); import paths per that version
import java.util.List;
import com.google.common.collect.Lists;
import tachyon.TachyonURI;
import tachyon.client.lineage.TachyonLineage;
import tachyon.job.CommandLineJob;
import tachyon.job.JobConf;

TachyonLineage tl = TachyonLineage.get();
// input file paths
TachyonURI input1 = new TachyonURI("/inputFile1");
TachyonURI input2 = new TachyonURI("/inputFile2");
List<TachyonURI> inputFiles = Lists.newArrayList(input1, input2);
// output file paths
TachyonURI output = new TachyonURI("/outputFile");
List<TachyonURI> outputFiles = Lists.newArrayList(output);
// command-line job to re-run if the output is lost
JobConf conf = new JobConf("/tmp/recompute.log");
CommandLineJob job = new CommandLineJob("my-spark-job.sh", conf);
long lineageId = tl.createLineage(inputFiles, outputFiles, job);

Transparent Naming

Transparent naming maintains an identity between the Tachyon namespace and the underlying storage system namespace.

Transparent naming

  • Object paths are replicated in the under FS.

  • Files in the under FS are transparently discovered even if they were not created through Tachyon; their metadata is loaded into memory the first time they are accessed.

Unified Namespace

  • Tachyon provides a mounting API that makes it possible to use Tachyon to access data across multiple data sources.

    boolean mount(String tachyonPath, String ufsPath);
    boolean unmount(String tachyonPath);
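For illustration, a hypothetical use of the signatures above, mounting an S3 bucket into the Tachyon namespace (bucket and paths are made up):

// makes s3n://my-bucket/data browsable under /mnt/s3-data in Tachyon
mount("/mnt/s3-data", "s3n://my-bucket/data");
unmount("/mnt/s3-data");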

Tachyon shell

  • provides basic file system operations (ls, rm, cat, mv, tail, touch, etc.)
  • provides "special" operations (usage examples below)
    • persist: persists data from Tachyon storage to the under FS
    • pin / unpin: marks / unmarks a file or folder as pinned in Tachyon (i.e. no eviction will be applied)
    • load: loads data from under storage into Tachyon storage
    • report: marks a file as lost to the Tachyon master; this causes the master to schedule a recomputation job to regenerate the file
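
A few usage sketches, assuming the ./bin/tachyon tfs launcher from the Tachyon 0.8 distribution (the same TfsShell class the remote-write example below invokes directly); the paths are made up:

$ ./bin/tachyon tfs pin /important/dataset
$ ./bin/tachyon tfs persist /results/output
$ ./bin/tachyon tfs load /under-fs-only/file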

Remote Write support

$ java -cp \
    SW/tachyon-0.8.2/assembly/target/tachyon-assemblies-0.8.2-jar-with-dependencies.jar \
    -Dtachyon.master.hostname=backpressure-master \
    tachyon.shell.TfsShell copyFromLocal icla.pdf /

ex. spark memory management

(diagram: execution engine and storage engine running inside the same Spark JVM)

  • Temporary data and the block manager live in the same Spark job (JVM)
  • GC is involved in both of them
  • Data is shared between jobs through the file system

ex. spark off_heap storage

(diagram: execution engine with the storage engine, Tachyon, running as a separate process)

  • The storage engine stores data in a separate process
  • GC is involved only in the execution engine
  • Data is shared between jobs through Tachyon at memory speed

ex. spark off_heap storage

def persist(newLevel: StorageLevel): this.type

class StorageLevel private(
    private var _useDisk: Boolean,
    private var _useMemory: Boolean,
    private var _useOffHeap: Boolean,
    private var _deserialized: Boolean,
    private var _replication: Int = 1)

testRdd.persist(StorageLevel.OFF_HEAP)
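For OFF_HEAP to land in Tachyon, Spark must be pointed at the Tachyon master. A sketch as spark-defaults.conf entries; the property names are the Spark 1.6-era ones (older releases used spark.tachyonStore.* instead) and the host name is made up:

# 19998 is Tachyon's default master port
spark.externalBlockStore.url      tachyon://tachyon-master:19998
spark.externalBlockStore.baseDir  /spark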

demo

Q & A ?

THANK YOU!
