Distributed Systems: From Theory to Practice
As a software system grows, a single server's capacity eventually runs out. That is exactly where distributed systems come in: multiple computers (nodes) communicating over a network and behaving as a single system.
In this post we will look at the core concepts of distributed systems, the problems they raise, and the techniques used to solve them.
1. What Is a Distributed System?
A distributed system is a set of independent computers (nodes) that communicate over a network and appear to the user as a single coherent system.
Core Properties
- Concurrency: nodes execute at the same time
- No global clock: each node has its own clock
- Independent failures: nodes fail independently of one another
- Heterogeneity: different hardware and software can coexist
Example Distributed Systems
- Google Search (indexing across thousands of servers)
- Amazon (microservice architecture)
- Bitcoin (blockchain)
- Netflix (content delivery network, CDN)
2. The Fundamental Challenges of Distributed Systems
2.1 Partial Failures
One of the hardest problems: one node fails while the others keep running.
[System]
├── [Node 1] ✓ running
├── [Node 2] ✗ crashed (but the network is fine)
├── [Node 3] ✓ running
└── [Node 4] ? not responding (crashed, or a network problem?)
2.2 Time and Ordering Problems
Different machines can have different clocks:

```python
# Node 1 clock: 10:00:00.100
# Node 2 clock: 09:59:59.900 (100 ms behind)
# Which event happened first?
event1 = "user signed up on Node 1"   # 10:00:00.150
event2 = "email sent on Node 2"       # Node 2's clock says 10:00:00.050
# (actually 09:59:59.950) - did it happen BEFORE event1?
```
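Because wall clocks cannot be trusted across machines, distributed systems usually order events with logical clocks instead. A minimal Lamport clock sketch (illustrative, not from the original text):

```python
class LamportClock:
    """Logical clock: orders events without relying on wall time."""
    def __init__(self):
        self.time = 0

    def tick(self):
        # Local event: advance the clock
        self.time += 1
        return self.time

    def send(self):
        # Attach the current timestamp to an outgoing message
        return self.tick()

    def receive(self, msg_time):
        # On receive: jump ahead of the sender's clock if needed
        self.time = max(self.time, msg_time) + 1
        return self.time

# Node 1 registers a user, then messages Node 2
n1, n2 = LamportClock(), LamportClock()
t_signup = n1.tick()          # event on Node 1
t_msg = n1.send()             # message carries timestamp 2
t_email = n2.receive(t_msg)   # Node 2's clock jumps past the sender's

print(t_signup, t_msg, t_email)  # 1 2 3
```

The clocks now agree that the signup happened before the email, regardless of what the machines' wall clocks say.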
2.3 Network Problems
The network is not always reliable:
- Packet loss
- Latency
- Network partition: the cluster splits into groups that cannot reach each other
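A node that stops answering may have crashed, or may just be cut off by a partition; all a peer can observe is missing heartbeats. A hedged sketch of a timeout-based failure detector (class and names are illustrative):

```python
import time

class HeartbeatDetector:
    """Marks a peer as SUSPECT after a period of silence.
    'Suspect' is the honest verdict: we cannot distinguish a crashed
    node from an unreachable or merely slow one."""
    def __init__(self, timeout=0.5):
        self.timeout = timeout
        self.last_seen = {}

    def heartbeat(self, node_id, now=None):
        self.last_seen[node_id] = now if now is not None else time.time()

    def status(self, node_id, now=None):
        now = now if now is not None else time.time()
        if node_id not in self.last_seen:
            return "UNKNOWN"
        if now - self.last_seen[node_id] > self.timeout:
            return "SUSPECT"   # crashed? partitioned? slow? can't tell
        return "ALIVE"

detector = HeartbeatDetector(timeout=0.5)
detector.heartbeat("node4", now=100.0)
print(detector.status("node4", now=100.3))  # ALIVE
print(detector.status("node4", now=101.0))  # SUSPECT
```

The `now` parameter is injected here only to make the example deterministic; real detectors read the local clock.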
3. The CAP Theorem (Brewer's Theorem)
The most fundamental theorem of distributed systems: of the three properties below, a system can guarantee at most two at the same time.
| Property | Description |
|---|---|
| Consistency | Every read sees the most recent write |
| Availability | Every request receives a response, even while some nodes are down |
| Partition Tolerance | The system keeps working despite network partitions |
CAP Combinations
- CP (Consistency + Partition Tolerance): e.g. banking. During a partition the system rejects requests rather than risk inconsistency.
- AP (Availability + Partition Tolerance): e.g. social media. The system keeps answering and accepts temporary inconsistency.
- CA (Consistency + Availability): impossible in practice, because real networks will partition and you cannot opt out of that.
Examples

```python
# CP system (Consistency + Partition Tolerance) - banking
class BankAccount:
    def transfer(self, amount):
        # Must always be consistent
        with distributed_lock(self.account_id):
            if self.balance >= amount:
                self.balance -= amount
                return True
            return False
# Fails during a network partition (not available)

# AP system (Availability + Partition Tolerance) - social media
class SocialMediaPost:
    def like(self):
        # Consistency is secondary; always answer
        self.likes_count += 1  # eventual consistency
        return "Liked!"
# Different nodes may temporarily show different like counts
```
4. Consistency Models
4.1 Strong Consistency
All nodes see the same data at the same time.
```python
# With a consensus algorithm such as Raft or Paxos
def write_with_consensus(key, value):
    # Require acknowledgement from a majority of nodes
    majority = len(nodes) // 2 + 1
    acks = 0
    for node in nodes:
        try:
            node.write(key, value)
            acks += 1
            if acks >= majority:
                return "OK"
        except Exception:
            continue
    raise Exception("Consensus failed")
```
4.2 Eventual Consistency
All nodes converge to the same data, given enough time.
```python
# Dynamo-style (as in Amazon DynamoDB)
class EventuallyConsistentDB:
    def write(self, key, value):
        # Enqueue the write for every node; don't wait
        for node in self.nodes:
            self.queue.put((node, key, value))
        return "OK (async)"

    def read(self, key):
        # Read from every node, keep the freshest timestamp
        results = []
        for node in self.nodes:
            results.append(node.read(key))
        # Return the value with the highest timestamp
        return max(results, key=lambda x: x.timestamp)
```
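Dynamo-style stores usually pair this highest-timestamp read with read repair: stale replicas get the winning value pushed back to them on the read path. A self-contained sketch under the same assumptions as the class above (each node exposes `read`/`write` and every value carries a `timestamp`; the `Node`/`Versioned` helpers are illustrative):

```python
from dataclasses import dataclass

@dataclass
class Versioned:
    value: str
    timestamp: int

class Node:
    def __init__(self):
        self.store = {}
    def write(self, key, versioned):
        self.store[key] = versioned
    def read(self, key):
        return self.store.get(key, Versioned(None, -1))

def read_with_repair(nodes, key):
    # Collect all replica versions and pick the newest
    results = [n.read(key) for n in nodes]
    winner = max(results, key=lambda v: v.timestamp)
    # Read repair: push the winner back to stale replicas
    for node, seen in zip(nodes, results):
        if seen.timestamp < winner.timestamp:
            node.write(key, winner)
    return winner.value

nodes = [Node(), Node(), Node()]
nodes[0].write("x", Versioned("old", 1))
nodes[1].write("x", Versioned("new", 2))   # nodes[2] never saw any write

print(read_with_repair(nodes, "x"))        # new
print(nodes[2].read("x").value)            # new (repaired as a side effect)
```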
4.3 Read-Your-Writes Consistency
A user immediately sees their own writes.
```python
class SessionConsistency:
    def __init__(self):
        self.session_writes = {}

    def write(self, session_id, key, value):
        # Perform the write
        node = self.route_to_node(key)
        node.write(key, value)
        # Remember which node this session wrote to
        if session_id not in self.session_writes:
            self.session_writes[session_id] = {}
        self.session_writes[session_id][key] = node

    def read(self, session_id, key):
        # If this session wrote the key, read from the same node
        if session_id in self.session_writes and key in self.session_writes[session_id]:
            node = self.session_writes[session_id][key]
            return node.read(key)
        else:
            # Otherwise any replica will do (load balance)
            node = self.route_to_node(key)
            return node.read(key)
```
5. Consensus Algorithms
Algorithms that let a group of nodes agree on the same decision.
5.1 Paxos (the classic)
```python
# Simplified Paxos acceptor
class PaxosNode:
    def __init__(self, node_id):
        self.node_id = node_id
        self.proposed_value = None
        self.accepted_value = None
        self.promised_round = 0

    def prepare(self, round_num):
        # Prepare phase: promise only rounds higher than any seen so far
        if round_num > self.promised_round:
            self.promised_round = round_num
            return ("PROMISE", self.accepted_value)
        return ("REJECT", None)

    def accept(self, round_num, value):
        # Accept phase: honour the promise
        if round_num >= self.promised_round:
            self.accepted_value = value
            return "ACCEPTED"
        return "REJECTED"

# Usage:
# 1. Prepare: collect PROMISE from a majority
# 2. Accept: send the value, collect ACCEPTED from a majority
```
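Those two phases can be driven by a proposer loop. A minimal single-round sketch (no failure handling; the acceptor is condensed here so the example runs on its own, with the same logic as above):

```python
class PaxosNode:
    # condensed copy of the acceptor above
    def __init__(self):
        self.promised_round = 0
        self.accepted_value = None
    def prepare(self, round_num):
        if round_num > self.promised_round:
            self.promised_round = round_num
            return ("PROMISE", self.accepted_value)
        return ("REJECT", None)
    def accept(self, round_num, value):
        if round_num >= self.promised_round:
            self.accepted_value = value
            return "ACCEPTED"
        return "REJECTED"

def propose(acceptors, round_num, value):
    majority = len(acceptors) // 2 + 1
    # Phase 1: collect promises from a majority
    promises = [a.prepare(round_num) for a in acceptors]
    granted = [p for p in promises if p[0] == "PROMISE"]
    if len(granted) < majority:
        return None
    # Key safety rule: if any acceptor already accepted a value,
    # the proposer must adopt it instead of its own
    prior = [v for _, v in granted if v is not None]
    chosen = prior[0] if prior else value
    # Phase 2: ask the majority to accept
    acks = sum(1 for a in acceptors if a.accept(round_num, chosen) == "ACCEPTED")
    return chosen if acks >= majority else None

acceptors = [PaxosNode() for _ in range(5)]
print(propose(acceptors, 1, "v1"))  # v1
# A later proposer with a higher round must keep the chosen value:
print(propose(acceptors, 2, "v2"))  # v1
```

The second call shows the point of the protocol: once "v1" is chosen, no later round can overwrite it.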
5.2 Raft (designed to be understandable)
Raft consists of three main components:
- Leader Election
- Log Replication
- Safety
```python
import enum
import time
import random

class NodeState(enum.Enum):
    FOLLOWER = 1
    CANDIDATE = 2
    LEADER = 3

class RaftNode:
    def __init__(self, node_id, nodes):
        self.node_id = node_id
        self.nodes = nodes  # the other nodes
        self.state = NodeState.FOLLOWER
        self.current_term = 0
        self.voted_for = None
        self.log = []
        self.commit_index = 0
        self.last_applied = 0
        # Election timeout: random 150-300 ms
        self.election_timeout = random.uniform(0.15, 0.3)
        self.last_heartbeat = time.time()

    def tick(self):
        # Runs on every tick
        if self.state == NodeState.LEADER:
            # The leader sends heartbeats to everyone
            self.send_heartbeats()
        else:
            # Follower: check the election timeout
            if time.time() - self.last_heartbeat > self.election_timeout:
                self.start_election()

    def start_election(self):
        self.state = NodeState.CANDIDATE
        self.current_term += 1
        self.voted_for = self.node_id
        # Ask the other nodes for their vote
        votes = 1  # own vote
        for node in self.nodes:
            if node.request_vote(self.current_term, self.node_id):
                votes += 1
        # Become leader on a majority of the FULL cluster (self + others)
        if votes > (len(self.nodes) + 1) // 2:
            self.state = NodeState.LEADER
            print(f"Node {self.node_id} became leader (term: {self.current_term})")

    def request_vote(self, term, candidate_id):
        # Voting logic
        if term > self.current_term:
            self.current_term = term
            self.state = NodeState.FOLLOWER
            self.voted_for = None
        if (term == self.current_term and
                (self.voted_for is None or self.voted_for == candidate_id)):
            self.voted_for = candidate_id
            self.last_heartbeat = time.time()  # reset the election timeout
            return True
        return False
```
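The class above covers only the first Raft component, leader election. A hedged sketch of the second, log replication, showing the follower-side AppendEntries consistency check (simplified: no terms on the RPC itself, no persistence):

```python
class RaftLog:
    """Follower side of AppendEntries: reject gaps and term mismatches,
    truncate conflicting suffixes, then append (simplified)."""
    def __init__(self):
        self.entries = []  # list of (term, command)

    def append_entries(self, prev_index, prev_term, new_entries):
        # Consistency check: do we hold the leader's previous entry?
        if prev_index >= 0:
            if prev_index >= len(self.entries):
                return False                 # gap: leader must back up
            if self.entries[prev_index][0] != prev_term:
                return False                 # term conflict at prev_index
        # Drop any conflicting suffix, then append the new entries
        self.entries = self.entries[:prev_index + 1] + list(new_entries)
        return True

follower = RaftLog()
assert follower.append_entries(-1, 0, [(1, "x=1")])     # first entry
assert follower.append_entries(0, 1, [(1, "y=2")])      # prev matches
assert not follower.append_entries(5, 1, [(2, "z=3")])  # gap -> rejected
print(follower.entries)  # [(1, 'x=1'), (1, 'y=2')]
```

On rejection a real leader decrements its `nextIndex` for that follower and retries, which is how diverged logs converge.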
6. Distributed Transactions
Transactions that span more than one node.
6.1 Two-Phase Commit (2PC)
```python
class TwoPhaseCommit:
    def __init__(self, participants):
        self.participants = participants  # participating nodes

    def execute_transaction(self, transaction):
        # Phase 1: Prepare
        print("Phase 1: PREPARE")
        all_ready = True
        for participant in self.participants:
            try:
                participant.prepare(transaction)
                print(f"  {participant.id}: READY")
            except Exception as e:
                print(f"  {participant.id}: ABORT ({e})")
                all_ready = False
        # Phase 2: Commit or Abort
        if all_ready:
            print("Phase 2: COMMIT")
            for participant in self.participants:
                participant.commit(transaction)
        else:
            print("Phase 2: ABORT")
            for participant in self.participants:
                participant.abort(transaction)

# Problem: what happens if the coordinator crashes?
# One answer: Three-Phase Commit (3PC)
```
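On the participant side, surviving a crash between PREPARE and COMMIT requires logging the vote durably before replying. A sketch (the in-memory `wal` list stands in for a real write-ahead log on disk; the `Participant` class is illustrative):

```python
class Participant:
    def __init__(self, pid):
        self.id = pid
        self.wal = []        # stand-in for a durable write-ahead log
        self.state = "IDLE"

    def prepare(self, txn):
        # Force-write the vote BEFORE answering: after a crash we can
        # recover into PREPARED and ask the coordinator for the outcome.
        self.wal.append(("PREPARED", txn))
        self.state = "PREPARED"

    def commit(self, txn):
        self.wal.append(("COMMITTED", txn))
        self.state = "COMMITTED"

    def abort(self, txn):
        self.wal.append(("ABORTED", txn))
        self.state = "ABORTED"

    def recover(self):
        # After a restart: replay the log. A PREPARED txn with no
        # recorded outcome is 'in doubt' and must block until the
        # coordinator answers - this is exactly 2PC's weakness.
        if self.wal and self.wal[-1][0] == "PREPARED":
            return "IN_DOUBT"
        return self.state if self.wal else "IDLE"

p = Participant("p1")
p.prepare("txn42")
print(p.recover())   # IN_DOUBT  (crashed after voting yes)
p.commit("txn42")
print(p.recover())   # COMMITTED
```

The `IN_DOUBT` state is the blocking problem that motivates 3PC and, more commonly in practice, sagas.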
6.2 Saga Pattern (long-lived transactions)
```python
class SagaOrchestrator:
    def __init__(self):
        self.steps = []
        self.compensations = []

    def add_step(self, action, compensation):
        self.steps.append(action)
        self.compensations.append(compensation)

    def execute(self):
        completed = []
        try:
            for i, step in enumerate(self.steps):
                print(f"Executing step {i}")
                step()
                completed.append(i)
            print("Saga completed successfully!")
        except Exception as e:
            print(f"Error at step {len(completed)}: {e}")
            print("Starting compensation...")
            # Run compensations in reverse order
            for i in reversed(completed):
                self.compensations[i]()
            print("Compensation completed")

# Usage: order -> payment -> stock
# (create_order, process_payment etc. are application functions)
saga = SagaOrchestrator()
saga.add_step(lambda: create_order(), lambda: cancel_order())
saga.add_step(lambda: process_payment(), lambda: refund_payment())
saga.add_step(lambda: reserve_stock(), lambda: release_stock())
saga.execute()
```
7. Distributed Locking
7.1 Distributed Lock with Redis
(This is the single-instance lock; the Redlock algorithm extends the same idea across multiple independent Redis instances.)
```python
import redis
import time
import uuid

class RedisDistributedLock:
    def __init__(self, redis_client, lock_name, ttl=10):
        self.redis = redis_client
        self.lock_name = f"lock:{lock_name}"
        self.lock_value = str(uuid.uuid4())
        self.ttl = ttl

    def acquire(self, blocking=True, timeout=10):
        start_time = time.time()
        while True:
            # SET with NX: set only if the key does not exist yet
            # PX: expiry in milliseconds, so a crashed holder can't block forever
            acquired = self.redis.set(
                self.lock_name,
                self.lock_value,
                nx=True,
                px=self.ttl * 1000
            )
            if acquired:
                return True
            if not blocking:
                return False
            if timeout and (time.time() - start_time) > timeout:
                return False
            time.sleep(0.1)

    def release(self):
        # Atomic release via a Lua script:
        # only the holder (matching lock_value) may delete the key
        lua_script = """
        if redis.call("get", KEYS[1]) == ARGV[1] then
            return redis.call("del", KEYS[1])
        else
            return 0
        end
        """
        self.redis.eval(lua_script, 1, self.lock_name, self.lock_value)

    def __enter__(self):
        self.acquire()
        return self

    def __exit__(self, *args):
        self.release()

# Usage
redis_client = redis.Redis(host='localhost', port=6379)
with RedisDistributedLock(redis_client, "resource-1"):
    # Critical section: only one node at a time gets here
    update_database()
```
7.2 Distributed Lock with ZooKeeper
```python
import time
from kazoo.client import KazooClient
from kazoo.recipe.lock import Lock

class ZooKeeperDistributedLock:
    def __init__(self, zk_hosts, lock_path):
        self.zk = KazooClient(hosts=zk_hosts)
        self.zk.start()
        self.lock = Lock(self.zk, lock_path)

    def execute_with_lock(self, func, *args, **kwargs):
        with self.lock:
            # Critical section
            return func(*args, **kwargs)

    def __del__(self):
        self.zk.stop()

# Usage
zk_lock = ZooKeeperDistributedLock('localhost:2181', '/myapp/locks/resource1')

def critical_operation(user_id):
    print(f"User {user_id} in critical section")
    time.sleep(2)
    return "OK"

# 10 threads may run for different users,
# but critical_operation executes only once at a time
```
8. Distributed Caching
8.1 Consistent Hashing
```python
import hashlib
import bisect

class ConsistentHashRing:
    def __init__(self, nodes=None, replicas=3):
        self.replicas = replicas  # virtual nodes per physical node
        self.ring = {}
        self.sorted_keys = []
        if nodes:
            for node in nodes:
                self.add_node(node)

    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)

    def add_node(self, node):
        for i in range(self.replicas):
            virtual_key = f"{node}:{i}"
            hash_key = self._hash(virtual_key)
            self.ring[hash_key] = node
            bisect.insort(self.sorted_keys, hash_key)

    def remove_node(self, node):
        for i in range(self.replicas):
            virtual_key = f"{node}:{i}"
            hash_key = self._hash(virtual_key)
            del self.ring[hash_key]
            self.sorted_keys.remove(hash_key)

    def get_node(self, key):
        if not self.ring:
            return None
        hash_key = self._hash(key)
        # Binary search: first ring position after the key's hash
        idx = bisect.bisect(self.sorted_keys, hash_key)
        if idx == len(self.sorted_keys):
            idx = 0  # wrap around
        return self.ring[self.sorted_keys[idx]]

# Usage
cache_nodes = ['cache1', 'cache2', 'cache3']
ring = ConsistentHashRing(cache_nodes, replicas=100)

# Which node should a key go to?
node = ring.get_node('user:12345')
print(f"user:12345 -> {node}")

# Add/remove nodes
ring.remove_node('cache2')
# Only ~1/3 of the keys are remapped (the ideal case)
```
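That "~1/3 of keys" claim can be checked empirically. Since this example has to run on its own, the ring class is condensed here (same hashing logic as above):

```python
import hashlib
import bisect

class ConsistentHashRing:
    # condensed copy of the ring above, same logic
    def __init__(self, nodes, replicas=100):
        self.replicas = replicas
        self.ring = {}
        self.sorted_keys = []
        for node in nodes:
            self.add_node(node)
    def _hash(self, key):
        return int(hashlib.md5(key.encode()).hexdigest(), 16)
    def add_node(self, node):
        for i in range(self.replicas):
            h = self._hash(f"{node}:{i}")
            self.ring[h] = node
            bisect.insort(self.sorted_keys, h)
    def remove_node(self, node):
        for i in range(self.replicas):
            h = self._hash(f"{node}:{i}")
            del self.ring[h]
            self.sorted_keys.remove(h)
    def get_node(self, key):
        h = self._hash(key)
        idx = bisect.bisect(self.sorted_keys, h)
        if idx == len(self.sorted_keys):
            idx = 0
        return self.ring[self.sorted_keys[idx]]

keys = [f"user:{i}" for i in range(10_000)]
ring = ConsistentHashRing(['cache1', 'cache2', 'cache3'])
before = {k: ring.get_node(k) for k in keys}
ring.remove_node('cache2')
after = {k: ring.get_node(k) for k in keys}
moved = sum(1 for k in keys if before[k] != after[k])
print(f"moved: {moved / len(keys):.0%}")  # roughly a third of the keys
```

Crucially, only keys that lived on `cache2` move; with naive `hash(key) % N` sharding, removing a node would remap almost all keys.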
8.2 Distributed Cache with Redis Cluster
```python
from rediscluster import RedisCluster

class DistributedCache:
    def __init__(self, startup_nodes):
        # Redis Cluster: automatic sharding across nodes
        self.rc = RedisCluster(
            startup_nodes=startup_nodes,
            decode_responses=True,
            skip_full_coverage_check=True
        )

    def set_user_session(self, user_id, session_data):
        # Redis Cluster routes the key to the right node automatically
        key = f"session:{user_id}"
        self.rc.hset(key, mapping=session_data)  # hmset is deprecated
        self.rc.expire(key, 3600)  # 1 hour

    def get_user_session(self, user_id):
        key = f"session:{user_id}"
        return self.rc.hgetall(key)

    def invalidate_user_session(self, user_id):
        key = f"session:{user_id}"
        self.rc.delete(key)

# Usage
startup_nodes = [
    {'host': 'redis-node1', 'port': 7000},
    {'host': 'redis-node2', 'port': 7000},
    {'host': 'redis-node3', 'port': 7000},
]
cache = DistributedCache(startup_nodes)
cache.set_user_session(12345, {'name': 'Ahmet', 'cart': '3 items'})
```
9. Distributed Messaging
9.1 Event Streaming with Apache Kafka
```python
import json
import time
from kafka import KafkaProducer, KafkaConsumer

class EventBus:
    def __init__(self, bootstrap_servers):
        self.bootstrap_servers = bootstrap_servers
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )

    def publish(self, topic, event):
        future = self.producer.send(topic, event)
        # Synchronous wait (optional)
        result = future.get(timeout=10)
        print(f"Published to {topic}: partition={result.partition}, offset={result.offset}")
        return result

    def create_consumer(self, topic, group_id):
        consumer = KafkaConsumer(
            topic,
            bootstrap_servers=self.bootstrap_servers,
            group_id=group_id,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='earliest',
            enable_auto_commit=True
        )
        return consumer

# Order Service (producer)
event_bus = EventBus(['localhost:9092'])

def order_created(order):
    event = {
        'type': 'ORDER_CREATED',
        'order_id': order.id,
        'user_id': order.user_id,
        'total': order.total,
        'timestamp': time.time()
    }
    event_bus.publish('orders', event)

# Inventory Service (consumer)
consumer = event_bus.create_consumer('orders', 'inventory-service')
for message in consumer:
    event = message.value
    if event['type'] == 'ORDER_CREATED':
        print(f"Updating stock for order {event['order_id']}")
        update_inventory(event['order_id'])
```
9.2 Message Queue with RabbitMQ
```python
import json
import time
import pika

class MessageQueue:
    def __init__(self, host='localhost'):
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(host)
        )
        self.channel = self.connection.channel()

    def declare_queue(self, queue_name, durable=True):
        self.channel.queue_declare(queue=queue_name, durable=durable)

    def publish(self, queue_name, message):
        self.channel.basic_publish(
            exchange='',
            routing_key=queue_name,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # persistent message
            )
        )

    def consume(self, queue_name, callback):
        self.channel.basic_consume(
            queue=queue_name,
            on_message_callback=callback,
            auto_ack=False  # manual ack
        )
        print('Waiting for messages...')
        self.channel.start_consuming()

    def close(self):
        self.connection.close()

# Callback function
def process_email(ch, method, properties, body):
    message = json.loads(body)
    print(f"Sending email to {message['email']}")
    # Send the email here
    time.sleep(1)
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Usage
mq = MessageQueue()
mq.declare_queue('email_queue')

# Publisher
mq.publish('email_queue', {
    'email': 'user@example.com',
    'subject': 'Welcome',
    'body': '...'
})

# Consumer (in a separate process)
mq.consume('email_queue', process_email)
```
10. Distributed Tracing
```python
import requests
from opentelemetry import trace
from opentelemetry.exporter.jaeger.thrift import JaegerExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

class DistributedTracer:
    def __init__(self, service_name):
        trace.set_tracer_provider(TracerProvider())
        self.tracer = trace.get_tracer(service_name)
        # Jaeger exporter
        jaeger_exporter = JaegerExporter(
            agent_host_name='localhost',
            agent_port=6831
        )
        span_processor = BatchSpanProcessor(jaeger_exporter)
        trace.get_tracer_provider().add_span_processor(span_processor)

    def trace_request(self, name):
        return self.tracer.start_as_current_span(name)

    def http_call_with_tracing(self, url, headers=None):
        with self.tracer.start_as_current_span(f"http:{url}") as span:
            # Put the trace context into the outgoing headers
            ctx = trace.get_current_span().get_span_context()
            if headers is None:
                headers = {}
            # W3C Trace Context header (the ids must be hex-encoded)
            headers['traceparent'] = f"00-{ctx.trace_id:032x}-{ctx.span_id:016x}-01"
            # The HTTP call itself
            response = requests.get(url, headers=headers)
            span.set_attribute("http.status_code", response.status_code)
            return response

# Service A
tracer = DistributedTracer("order-service")
with tracer.trace_request("create_order"):
    # Call Service B
    response = tracer.http_call_with_tracing(
        "http://payment-service/api/pay"
    )
```
11. A Real-World Scenario: an E-Commerce Microservice Architecture
```python
# complete_ecommerce_system.py
import asyncio
import json
import random
import uuid
from dataclasses import dataclass
from typing import Dict, List

# ============== EVENT BUS ==============
class EventBus:
    """Minimal in-process pub/sub; stands in for Kafka/RabbitMQ here."""
    def __init__(self):
        self.subscribers = {}

    def subscribe(self, event_type, handler):
        if event_type not in self.subscribers:
            self.subscribers[event_type] = []
        self.subscribers[event_type].append(handler)

    async def publish(self, event_type, data):
        print(f"\n[EVENT] {event_type}: {json.dumps(data)}")
        if event_type in self.subscribers:
            for handler in self.subscribers[event_type]:
                # Dispatch handlers asynchronously
                asyncio.create_task(handler(data))

# ============== MICROSERVICES ==============
@dataclass
class Order:
    id: str
    user_id: str
    items: List[Dict]
    total: float
    status: str

class OrderService:
    def __init__(self, event_bus):
        self.orders = {}
        self.event_bus = event_bus  # shared bus, so the saga sees our events
        self.cache = {}             # stand-in for the distributed cache

    async def create_order(self, user_id, items):
        # 1. Generate an order ID
        order_id = str(uuid.uuid4())
        # 2. Calculate the total
        total = sum(item['price'] * item['quantity'] for item in items)
        # 3. Create the order
        order = Order(id=order_id, user_id=user_id, items=items,
                      total=total, status='PENDING')
        self.orders[order_id] = order
        # 4. Write to the cache
        self.cache[f"order:{order_id}"] = order.__dict__
        # 5. Publish the event (this starts the saga)
        await self.event_bus.publish('order.created', {
            'order_id': order_id,
            'user_id': user_id,
            'items': items,
            'total': total
        })
        return order

class PaymentService:
    def __init__(self, event_bus):
        self.event_bus = event_bus

    async def process_payment(self, order_id, user_id, amount):
        # Simulate payment processing
        await asyncio.sleep(random.uniform(0.1, 0.3))
        # 90% success rate
        success = random.random() < 0.9
        if success:
            await self.event_bus.publish('payment.succeeded', {
                'order_id': order_id,
                'user_id': user_id,
                'amount': amount
            })
        else:
            await self.event_bus.publish('payment.failed', {
                'order_id': order_id,
                'user_id': user_id,
                'reason': 'Insufficient funds'
            })

class InventoryService:
    def __init__(self, event_bus):
        self.event_bus = event_bus
        self.stock = {
            'product1': 10,
            'product2': 5,
            'product3': 0
        }
        # In production this would be a distributed lock (see section 7);
        # inside a single process an asyncio.Lock suffices for the simulation
        self.lock = asyncio.Lock()

    async def reserve_stock(self, order_id, items):
        async with self.lock:
            for item in items:
                product_id = item['product_id']
                quantity = item['quantity']
                if self.stock.get(product_id, 0) < quantity:
                    # Out of stock: signal the saga to compensate
                    await self.event_bus.publish('inventory.failed', {
                        'order_id': order_id,
                        'product_id': product_id,
                        'reason': 'Out of stock'
                    })
                    return False
                self.stock[product_id] -= quantity
        # Stock reserved successfully
        await self.event_bus.publish('inventory.reserved', {
            'order_id': order_id,
            'items': items
        })
        return True

# ============== SAGA ORCHESTRATOR ==============
class OrderSagaOrchestrator:
    def __init__(self, order_svc, payment_svc, inventory_svc, event_bus):
        self.order_svc = order_svc
        self.payment_svc = payment_svc
        self.inventory_svc = inventory_svc
        self.event_bus = event_bus
        self.saga_state = {}
        # Event handlers
        event_bus.subscribe('order.created', self.on_order_created)
        event_bus.subscribe('inventory.reserved', self.on_inventory_reserved)
        event_bus.subscribe('payment.succeeded', self.on_payment_succeeded)
        event_bus.subscribe('payment.failed', self.on_payment_failed)
        event_bus.subscribe('inventory.failed', self.on_inventory_failed)

    async def on_order_created(self, data):
        order_id = data['order_id']
        self.saga_state[order_id] = {
            'status': 'INVENTORY_RESERVING',
            'data': data
        }
        # Step 1: reserve inventory
        await self.inventory_svc.reserve_stock(order_id, data['items'])

    async def on_inventory_reserved(self, data):
        order_id = data['order_id']
        self.saga_state[order_id]['status'] = 'PAYMENT_PROCESSING'
        # Step 2: process the payment
        await self.payment_svc.process_payment(
            order_id,
            self.saga_state[order_id]['data']['user_id'],
            self.saga_state[order_id]['data']['total']
        )

    async def on_payment_succeeded(self, data):
        order_id = data['order_id']
        self.saga_state[order_id]['status'] = 'COMPLETED'
        print(f"\n✅ Order {order_id} completed successfully!")
        # Final step: update the order status (in a real system: the database)

    async def on_payment_failed(self, data):
        order_id = data['order_id']
        self.saga_state[order_id]['status'] = 'FAILED'
        print(f"\n❌ Order {order_id} failed: {data['reason']}")
        # COMPENSATION: release the reserved inventory
        await self.event_bus.publish('inventory.release', {
            'order_id': order_id,
            'reason': 'payment_failed'
        })

    async def on_inventory_failed(self, data):
        order_id = data['order_id']
        print(f"\n❌ Order {order_id} failed: out of stock")
        # No compensation needed (nothing has happened yet)
        self.saga_state[order_id]['status'] = 'FAILED'

# ============== LOAD BALANCER ==============
class LoadBalancer:
    def __init__(self, nodes):
        self.nodes = nodes
        self.current = 0

    def round_robin(self):
        # Round-robin load balancing
        node = self.nodes[self.current]
        self.current = (self.current + 1) % len(self.nodes)
        return node

    def least_connections(self, connections):
        # Pick the node with the fewest open connections
        return min(self.nodes, key=lambda n: connections.get(n, 0))

    def weighted_random(self, weights):
        # Weighted random selection
        total = sum(weights.values())
        r = random.uniform(0, total)
        upto = 0
        for node, weight in weights.items():
            if upto + weight >= r:
                return node
            upto += weight

# ============== MAIN SIMULATION ==============
async def simulate_ecommerce():
    print("Starting the distributed e-commerce system...\n")
    # Initialize all services on a SHARED event bus
    # (if each service created its own bus, the saga would never see any events)
    event_bus = EventBus()
    order_svc = OrderService(event_bus)
    payment_svc = PaymentService(event_bus)
    inventory_svc = InventoryService(event_bus)
    # Initialize the saga orchestrator
    saga = OrderSagaOrchestrator(
        order_svc, payment_svc, inventory_svc, event_bus
    )
    # Simulate multiple users
    users = ['user1', 'user2', 'user3', 'user4', 'user5']
    products = [
        {'product_id': 'product1', 'price': 100, 'quantity': 1},
        {'product_id': 'product2', 'price': 50, 'quantity': 2},
        {'product_id': 'product3', 'price': 200, 'quantity': 1},  # out of stock
    ]
    # Concurrent orders
    tasks = []
    for user in users:
        # Random product selection
        items = random.sample(products, random.randint(1, 2))
        tasks.append(order_svc.create_order(user, items))
        await asyncio.sleep(0.1)  # stagger the orders
    # Wait for all orders to be created
    orders = await asyncio.gather(*tasks)
    print(f"\n{len(orders)} orders created")
    print("Sagas are running...")
    # Let the sagas finish
    await asyncio.sleep(3)
    # Final status
    completed = sum(1 for o in orders
                    if saga.saga_state.get(o.id, {}).get('status') == 'COMPLETED')
    failed = len(orders) - completed
    print("\nResults:")
    print(f"  ✅ Succeeded: {completed}")
    print(f"  ❌ Failed: {failed}")

# Run the simulation
if __name__ == "__main__":
    asyncio.run(simulate_ecommerce())
```
12. Best Practices for Distributed Systems
12.1 Circuit Breaker Pattern
```python
import time
import requests

class CircuitBreaker:
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure_time = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN

    def call(self, func, *args, **kwargs):
        if self.state == 'OPEN':
            if time.time() - self.last_failure_time > self.recovery_timeout:
                self.state = 'HALF_OPEN'
            else:
                raise Exception("Circuit breaker is OPEN")
        try:
            result = func(*args, **kwargs)
            if self.state == 'HALF_OPEN':
                self.state = 'CLOSED'
                self.failure_count = 0
            return result
        except Exception:
            self.failure_count += 1
            self.last_failure_time = time.time()
            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
            raise

# Usage
cb = CircuitBreaker(failure_threshold=3)

def call_payment_service():
    # Risky external call
    response = requests.get('http://payment-service/api/pay')
    return response.json()

try:
    result = cb.call(call_payment_service)
except Exception:
    result = fallback_payment_method()
```
12.2 Retry with Exponential Backoff
```python
import time
import random

def retry_with_backoff(func, max_retries=5, base_delay=1):
    for attempt in range(max_retries):
        try:
            return func()
        except Exception:
            if attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter
            delay = base_delay * (2 ** attempt) + random.uniform(0, 1)
            print(f"Attempt {attempt + 1} failed. Retrying in {delay:.2f}s...")
            time.sleep(delay)

# Usage
def call_database():
    # Risky operation
    return db.query("SELECT * FROM users")

result = retry_with_backoff(call_database)
```
12.3 Idempotency
```python
class IdempotentAPI:
    def __init__(self):
        self.processed_requests = {}  # in production: keep this in Redis

    def handle_request(self, request_id, func):
        # Has this request_id been processed before?
        if request_id in self.processed_requests:
            print(f"Request {request_id} already processed")
            return self.processed_requests[request_id]
        # First time: process and remember the result
        result = func()
        self.processed_requests[request_id] = result
        return result

# Usage
api = IdempotentAPI()

# If the same request arrives twice
result1 = api.handle_request('req123', process_payment)
result2 = api.handle_request('req123', process_payment)  # returns the same result
```
13. Testing Strategies for Distributed Systems
13.1 Chaos Engineering (Netflix's Chaos Monkey)
```python
import random
import time

class ChaosMonkey:
    def __init__(self, services, chaos_level=0.1):
        self.services = services
        self.chaos_level = chaos_level

    def inject_failure(self):
        if random.random() < self.chaos_level:
            target = random.choice(self.services)
            print(f"Chaos Monkey attacking {target}!")
            failure_type = random.choice([
                'kill_service',
                'network_latency',
                'packet_loss',
                'disk_full'
            ])
            if failure_type == 'kill_service':
                self.kill_service(target)
            elif failure_type == 'network_latency':
                self.add_latency(target, 2000)  # 2 second delay
            elif failure_type == 'packet_loss':
                self.drop_packets(target, 0.3)  # 30% packet loss

    def kill_service(self, service):
        print(f"Killing {service}...")
        # In production: actually kill the service!
        # service.stop()

    def add_latency(self, service, latency_ms):
        print(f"Adding {latency_ms}ms latency to {service}")
        # Add network latency here

    def drop_packets(self, service, drop_rate):
        print(f"Dropping {drop_rate*100:.0f}% of packets to {service}")

# During tests
chaos = ChaosMonkey(['order-service', 'payment-service', 'inventory-service'])
while True:
    chaos.inject_failure()
    time.sleep(10)  # every 10 seconds
```
13.2 Jepsen-style Tests
```python
# Consistency test (start_cluster / get_leader are test-harness helpers)
def test_consistency():
    nodes = start_cluster(5)
    # Write to the leader
    leader = get_leader(nodes)
    leader.write('x', 1)
    # Kill the leader
    leader.stop()
    # Read from the surviving nodes
    values = []
    for node in nodes:
        if node is not leader:
            values.append(node.read('x'))
    # Every surviving node should eventually return x = 1
    assert all(v == 1 for v in values), "Consistency violation!"
```
14. Monitoring Distributed Systems
```python
import asyncio
import time
from contextlib import contextmanager
from prometheus_client import Counter, Histogram, Gauge

# Metrics (label names must be declared where .labels() is used below)
request_count = Counter('http_requests_total', 'Total HTTP requests', ['endpoint'])
request_duration = Histogram('http_request_duration_seconds', 'Request duration', ['endpoint'])
active_requests = Gauge('active_requests', 'Active requests')
service_health = Gauge('service_health', 'Service health (1=up, 0=down)', ['service'])

class DistributedMonitoring:
    def __init__(self, service_name):
        self.service_name = service_name
        self.heartbeat_interval = 5  # seconds

    async def send_heartbeat(self):
        while True:
            service_health.labels(service=self.service_name).set(1)
            await asyncio.sleep(self.heartbeat_interval)

    @contextmanager
    def monitor_request(self, endpoint):
        start = time.time()
        active_requests.inc()
        request_count.labels(endpoint=endpoint).inc()
        try:
            yield
        finally:
            duration = time.time() - start
            request_duration.labels(endpoint=endpoint).observe(duration)
            active_requests.dec()

    def check_health(self):
        # Health check endpoint
        return {
            'service': self.service_name,
            'status': 'healthy',
            'timestamp': time.time(),
        }
```
Summary: a Distributed Systems Checklist
At design time
- [ ] Understand the CAP theorem: consistency or availability?
- [ ] Choose a consensus algorithm: Raft (understandable) or Paxos (complex)?
- [ ] Data partitioning strategy: consistent hashing or range partitioning?
- [ ] Replication factor: how many copies? (usually 3)
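The replication-factor choice above usually comes with quorum sizes. A quick sanity check of the overlap rule: with N replicas, write quorum W and read quorum R, a read is guaranteed to intersect the latest write only when R + W > N (illustrative helper):

```python
def quorums_overlap(n, w, r):
    # R + W > N guarantees every read quorum intersects every write quorum
    return r + w > n

# the common N=3 setup
print(quorums_overlap(3, 2, 2))  # True  -> reads see the latest write
print(quorums_overlap(3, 1, 1))  # False -> stale reads are possible
```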
During development
- [ ] Idempotency: what happens if the same request arrives twice?
- [ ] Retry logic: exponential backoff + jitter
- [ ] Circuit breaker: protect the system when dependencies fail
- [ ] Distributed tracing: instrument with OpenTelemetry
- [ ] Health checks: every service should expose a health endpoint
In operation
- [ ] Monitoring: Prometheus + Grafana
- [ ] Logging: centralized logging (ELK, Loki)
- [ ] Alerting: get notified on critical conditions
- [ ] Chaos engineering: inject failures regularly
- [ ] Backup & recovery: regular backups and a disaster-recovery plan
Common Mistakes
- ❌ Assuming the network is reliable
- ❌ Trusting clock synchronization (even NTP is not enough)
- ❌ Ignoring split-brain scenarios
- ❌ Trying to solve every distributed transaction with 2PC
- ❌ Leaving monitoring for later
Remember: in a distributed system everything can fail - the network, disks, CPU, memory, even time itself. Design for the worst case and assume anything can break.